summaryrefslogtreecommitdiffstats
path: root/utils/test/scripts/mongo_to_elasticsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'utils/test/scripts/mongo_to_elasticsearch.py')
-rw-r--r--utils/test/scripts/mongo_to_elasticsearch.py333
1 files changed, 190 insertions, 143 deletions
diff --git a/utils/test/scripts/mongo_to_elasticsearch.py b/utils/test/scripts/mongo_to_elasticsearch.py
index b722793b3..3af7c0fa8 100644
--- a/utils/test/scripts/mongo_to_elasticsearch.py
+++ b/utils/test/scripts/mongo_to_elasticsearch.py
@@ -1,4 +1,5 @@
#! /usr/bin/env python
+
import datetime
import json
import os
@@ -38,148 +39,187 @@ args = parser.parse_args()
tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
-def _fix_date(date_string):
- if isinstance(date_string, dict):
- return date_string['$date']
- else:
- return date_string[:-3].replace(' ', 'T') + 'Z'
-
-
-def verify_document(testcase):
- """
- Mandatory fields:
- installer
- pod_name
- version
- case_name
- date
- project
- details
-
- these fields must be present and must NOT be None
-
- Optional fields:
- description
-
- these fields will be preserved if the are NOT None
- """
- mandatory_fields = ['installer',
- 'pod_name',
- 'version',
- 'case_name',
- 'project_name',
- 'details']
- mandatory_fields_to_modify = {'start_date': _fix_date}
- fields_to_swap_or_add = {'scenario': 'version'}
- if '_id' in testcase:
- mongo_id = testcase['_id']
- else:
- mongo_id = None
- optional_fields = ['description']
- for key, value in testcase.items():
- if key in mandatory_fields:
- if value is None:
- # empty mandatory field, invalid input
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
- " for mandatory field '{}'".format(mongo_id, key))
- return False
- else:
- mandatory_fields.remove(key)
- elif key in mandatory_fields_to_modify:
- if value is None:
- # empty mandatory field, invalid input
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
- " for mandatory field '{}'".format(mongo_id, key))
- return False
+class DocumentPublisher:
+
+ def __init__(self, doc, fmt, exist_docs, creds, to):
+ self.doc = doc
+ self.fmt = fmt
+ self.creds = creds
+ self.exist_docs = exist_docs
+ self.to = to
+ self.is_formatted = True
+
+ def format(self):
+ try:
+ if self._verify_document() and self.fmt:
+ self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc)
else:
- testcase[key] = mandatory_fields_to_modify[key](value)
- del mandatory_fields_to_modify[key]
- elif key in fields_to_swap_or_add:
- if value is None:
- swapped_key = fields_to_swap_or_add[key]
- swapped_value = testcase[swapped_key]
- logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value))
- testcase[key] = swapped_value
- del fields_to_swap_or_add[key]
+ self.is_formatted = False
+ except Exception:
+ logger.error("Fail in format testcase[%s]\nerror message: %s" %
+ (self.doc, traceback.format_exc()))
+ self.is_formatted = False
+ finally:
+ return self
+
+ def publish(self):
+ if self.is_formatted and self.doc not in self.exist_docs:
+ self._publish()
+
+ def _publish(self):
+ status, data = shared_utils.publish_json(self.doc, self.creds, self.to)
+ if status > 300:
+ logger.error('Publish record[{}] failed, due to [{}]'
+ .format(self.doc, json.loads(data)['error']['reason']))
+
+ def _fix_date(self, date_string):
+ if isinstance(date_string, dict):
+ return date_string['$date']
+ else:
+ return date_string[:-3].replace(' ', 'T') + 'Z'
+
+ def _verify_document(self):
+ """
+ Mandatory fields:
+ installer
+ pod_name
+ version
+ case_name
+ date
+ project
+ details
+
+ these fields must be present and must NOT be None
+
+ Optional fields:
+ description
+
+ these fields will be preserved if the are NOT None
+ """
+ mandatory_fields = ['installer',
+ 'pod_name',
+ 'version',
+ 'case_name',
+ 'project_name',
+ 'details']
+ mandatory_fields_to_modify = {'start_date': self._fix_date}
+ fields_to_swap_or_add = {'scenario': 'version'}
+ if '_id' in self.doc:
+ mongo_id = self.doc['_id']
+ else:
+ mongo_id = None
+ optional_fields = ['description']
+ for key, value in self.doc.items():
+ if key in mandatory_fields:
+ if value is None:
+ # empty mandatory field, invalid input
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
+ " for mandatory field '{}'".format(mongo_id, key))
+ return False
+ else:
+ mandatory_fields.remove(key)
+ elif key in mandatory_fields_to_modify:
+ if value is None:
+ # empty mandatory field, invalid input
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
+ " for mandatory field '{}'".format(mongo_id, key))
+ return False
+ else:
+ self.doc[key] = mandatory_fields_to_modify[key](value)
+ del mandatory_fields_to_modify[key]
+ elif key in fields_to_swap_or_add:
+ if value is None:
+ swapped_key = fields_to_swap_or_add[key]
+ swapped_value = self.doc[swapped_key]
+ logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
+ swapped_value))
+ self.doc[key] = swapped_value
+ del fields_to_swap_or_add[key]
+ else:
+ del fields_to_swap_or_add[key]
+ elif key in optional_fields:
+ if value is None:
+ # empty optional field, remove
+ del self.doc[key]
+ optional_fields.remove(key)
else:
- del fields_to_swap_or_add[key]
- elif key in optional_fields:
- if value is None:
- # empty optional field, remove
- del testcase[key]
- optional_fields.remove(key)
+ # unknown field
+ del self.doc[key]
+
+ if len(mandatory_fields) > 0:
+ # some mandatory fields are missing
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
+ " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
+ return False
+ elif len(mandatory_fields_to_modify) > 0:
+ # some mandatory fields are missing
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
+ " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
+ return False
+ else:
+ if len(fields_to_swap_or_add) > 0:
+ for key, swap_key in fields_to_swap_or_add.iteritems():
+ self.doc[key] = self.doc[swap_key]
+
+ return True
+
+
+class DocumentsPublisher:
+
+ def __init__(self, project, case, fmt, days, elastic_url, creds, to):
+ self.project = project
+ self.case = case
+ self.fmt = fmt
+ self.days = days
+ self.elastic_url = elastic_url
+ self.creds = creds
+ self.to = to
+ self.existed_docs = []
+
+ def export(self):
+ if days > 0:
+ past_time = datetime.datetime.today() - datetime.timedelta(days=days)
+ query = '''{{
+ "project_name": "{}",
+ "case_name": "{}",
+ "start_date": {{"$gt" : "{}"}}
+ }}'''.format(self.project, self.case, past_time)
else:
- # unknown field
- del testcase[key]
-
- if len(mandatory_fields) > 0:
- # some mandatory fields are missing
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
- " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
- return False
- elif len(mandatory_fields_to_modify) > 0:
- # some mandatory fields are missing
- logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
- " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
- return False
- else:
- if len(fields_to_swap_or_add) > 0:
- for key, swap_key in fields_to_swap_or_add.iteritems():
- testcase[key] = testcase[swap_key]
-
- return True
-
-
-def format_document(testcase):
- # 1. verify and identify the testcase
- # 2. if modification is implemented, then use that
- # 3. if not, try to use default
- # 4. if 2 or 3 is successful, return True, otherwise return False
- if verify_document(testcase):
- project = testcase['project_name']
- case_name = testcase['case_name']
- fmt = conf_utils.get_format(project, case_name)
- if fmt:
- try:
- logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
- return vars(mongo2elastic_format)[fmt](testcase)
- except Exception:
- logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
- return False
- else:
- return False
-
-
-def export_documents(days):
- cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
- if days > 0:
- past_time = datetime.datetime.today() - datetime.timedelta(days=days)
- cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
- cmd += [ '--out', '{}'.format(tmp_docs_file)]
-
- try:
- subprocess.check_call(cmd)
- except Exception, err:
- logger.error("export mongodb failed: %s" % err)
- exit(-1)
-
-
-def publish_document(document, es_creds, to):
- status, data = shared_utils.publish_json(document, es_creds, to)
- if status > 300:
- logger.error('Publish record[{}] failed, due to [{}]'
- .format(document, json.loads(data)['error']['reason']))
-
-
-def publish_nonexist_documents(elastic_docs, es_creds, to):
- try:
- with open(tmp_docs_file) as fdocs:
- for doc_line in fdocs:
- doc = json.loads(doc_line)
- if format_document(doc) and doc not in elastic_docs:
- publish_document(doc, es_creds, to)
- finally:
- fdocs.close()
+ query = '''{{
+ "project_name": "{}",
+ "case_name": "{}"
+ }}'''.format(self.project, self.case)
+ cmd = ['mongoexport',
+ '--db', 'test_results_collection',
+ '--collection', 'results',
+ '--query', '{}'.format(query),
+ '--out', '{}'.format(tmp_docs_file)]
+ try:
+ subprocess.check_call(cmd)
+ return self
+ except Exception, err:
+ logger.error("export mongodb failed: %s" % err)
+ self._remove()
+ exit(-1)
+
+ def get_existed_docs(self):
+ self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, days)
+ return self
+
+ def publish(self):
+ try:
+ with open(tmp_docs_file) as fdocs:
+ for doc_line in fdocs:
+ DocumentPublisher(json.loads(doc_line),
+ self.fmt,
+ self.existed_docs,
+ self.creds,
+ self.to).format().publish()
+ finally:
+ fdocs.close()
+ self._remove()
+
+ def _remove(self):
if os.path.exists(tmp_docs_file):
os.remove(tmp_docs_file)
@@ -193,7 +233,14 @@ if __name__ == '__main__':
if to == 'elasticsearch':
to = base_elastic_url
- export_documents(days)
- elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
- logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs)))
- publish_nonexist_documents(elastic_docs, es_creds, to)
+ for project, case_dicts in conf_utils.testcases_yaml.items():
+ for case_dict in case_dicts:
+ case = case_dict.get('name')
+ fmt = conf_utils.compose_format(case_dict.get('format'))
+ DocumentsPublisher(project,
+ case,
+ fmt,
+ days,
+ base_elastic_url,
+ es_creds,
+ to).export().get_existed_docs().publish()