diff options
Diffstat (limited to 'utils/test/scripts/mongo_to_elasticsearch.py')
-rw-r--r-- | utils/test/scripts/mongo_to_elasticsearch.py | 333 |
1 files changed, 190 insertions, 143 deletions
diff --git a/utils/test/scripts/mongo_to_elasticsearch.py b/utils/test/scripts/mongo_to_elasticsearch.py index b722793b3..3af7c0fa8 100644 --- a/utils/test/scripts/mongo_to_elasticsearch.py +++ b/utils/test/scripts/mongo_to_elasticsearch.py @@ -1,4 +1,5 @@ #! /usr/bin/env python + import datetime import json import os @@ -38,148 +39,187 @@ args = parser.parse_args() tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4()) -def _fix_date(date_string): - if isinstance(date_string, dict): - return date_string['$date'] - else: - return date_string[:-3].replace(' ', 'T') + 'Z' - - -def verify_document(testcase): - """ - Mandatory fields: - installer - pod_name - version - case_name - date - project - details - - these fields must be present and must NOT be None - - Optional fields: - description - - these fields will be preserved if the are NOT None - """ - mandatory_fields = ['installer', - 'pod_name', - 'version', - 'case_name', - 'project_name', - 'details'] - mandatory_fields_to_modify = {'start_date': _fix_date} - fields_to_swap_or_add = {'scenario': 'version'} - if '_id' in testcase: - mongo_id = testcase['_id'] - else: - mongo_id = None - optional_fields = ['description'] - for key, value in testcase.items(): - if key in mandatory_fields: - if value is None: - # empty mandatory field, invalid input - logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value" - " for mandatory field '{}'".format(mongo_id, key)) - return False - else: - mandatory_fields.remove(key) - elif key in mandatory_fields_to_modify: - if value is None: - # empty mandatory field, invalid input - logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value" - " for mandatory field '{}'".format(mongo_id, key)) - return False +class DocumentPublisher: + + def __init__(self, doc, fmt, exist_docs, creds, to): + self.doc = doc + self.fmt = fmt + self.creds = creds + self.exist_docs = exist_docs + self.to = to + self.is_formatted = True + + def format(self): + try: + if self._verify_document() and self.fmt: + self.is_formatted = vars(mongo2elastic_format)[self.fmt](self.doc) else: - testcase[key] = mandatory_fields_to_modify[key](value) - del mandatory_fields_to_modify[key] - elif key in fields_to_swap_or_add: - if value is None: - swapped_key = fields_to_swap_or_add[key] - swapped_value = testcase[swapped_key] - logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, swapped_value)) - testcase[key] = swapped_value - del fields_to_swap_or_add[key] + self.is_formatted = False + except Exception: + logger.error("Fail in format testcase[%s]\nerror message: %s" % + (self.doc, traceback.format_exc())) + self.is_formatted = False + finally: + return self + + def publish(self): + if self.is_formatted and self.doc not in self.exist_docs: + self._publish() + + def _publish(self): + status, data = shared_utils.publish_json(self.doc, self.creds, self.to) + if status > 300: + logger.error('Publish record[{}] failed, due to [{}]' + .format(self.doc, json.loads(data)['error']['reason'])) + + def _fix_date(self, date_string): + if isinstance(date_string, dict): + return date_string['$date'] + else: + return date_string[:-3].replace(' ', 'T') + 'Z' + + def _verify_document(self): + """ + Mandatory fields: + installer + pod_name + version + case_name + date + project + details + + these fields must be present and must NOT be None + + Optional fields: + description + + these fields will be preserved if the are NOT None + """ + mandatory_fields = ['installer', + 'pod_name', + 'version', + 'case_name', + 'project_name', + 'details'] + mandatory_fields_to_modify = {'start_date': self._fix_date} + fields_to_swap_or_add = {'scenario': 'version'} + if '_id' in self.doc: + mongo_id = self.doc['_id'] + else: + mongo_id = None + optional_fields = ['description'] + for key, value in self.doc.items(): + if key in mandatory_fields: + if value is None: + # empty mandatory field, invalid input + logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value" + " for mandatory field '{}'".format(mongo_id, key)) + return False + else: + mandatory_fields.remove(key) + elif key in mandatory_fields_to_modify: + if value is None: + # empty mandatory field, invalid input + logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value" + " for mandatory field '{}'".format(mongo_id, key)) + return False + else: + self.doc[key] = mandatory_fields_to_modify[key](value) + del mandatory_fields_to_modify[key] + elif key in fields_to_swap_or_add: + if value is None: + swapped_key = fields_to_swap_or_add[key] + swapped_value = self.doc[swapped_key] + logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key, + swapped_value)) + self.doc[key] = swapped_value + del fields_to_swap_or_add[key] + else: + del fields_to_swap_or_add[key] + elif key in optional_fields: + if value is None: + # empty optional field, remove + del self.doc[key] + optional_fields.remove(key) else: - del fields_to_swap_or_add[key] - elif key in optional_fields: - if value is None: - # empty optional field, remove - del testcase[key] - optional_fields.remove(key) + # unknown field + del self.doc[key] + + if len(mandatory_fields) > 0: + # some mandatory fields are missing + logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing" + " mandatory field(s) '{}'".format(mongo_id, mandatory_fields)) + return False + elif len(mandatory_fields_to_modify) > 0: + # some mandatory fields are missing + logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing" + " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys())) + return False + else: + if len(fields_to_swap_or_add) > 0: + for key, swap_key in fields_to_swap_or_add.iteritems(): + self.doc[key] = self.doc[swap_key] + + return True + + +class DocumentsPublisher: + + def __init__(self, project, case, fmt, days, elastic_url, creds, to): + self.project = project + self.case = case + self.fmt = fmt + self.days = days + self.elastic_url = elastic_url + self.creds = creds + self.to = to + self.existed_docs = [] + + def export(self): + if days > 0: + past_time = datetime.datetime.today() - datetime.timedelta(days=days) + query = '''{{ + "project_name": "{}", + "case_name": "{}", + "start_date": {{"$gt" : "{}"}} + }}'''.format(self.project, self.case, past_time) else: - # unknown field - del testcase[key] - - if len(mandatory_fields) > 0: - # some mandatory fields are missing - logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing" - " mandatory field(s) '{}'".format(mongo_id, mandatory_fields)) - return False - elif len(mandatory_fields_to_modify) > 0: - # some mandatory fields are missing - logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing" - " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys())) - return False - else: - if len(fields_to_swap_or_add) > 0: - for key, swap_key in fields_to_swap_or_add.iteritems(): - testcase[key] = testcase[swap_key] - - return True - - -def format_document(testcase): - # 1. verify and identify the testcase - # 2. if modification is implemented, then use that - # 3. if not, try to use default - # 4. if 2 or 3 is successful, return True, otherwise return False - if verify_document(testcase): - project = testcase['project_name'] - case_name = testcase['case_name'] - fmt = conf_utils.get_format(project, case_name) - if fmt: - try: - logger.info("Processing %s/%s using format %s" % (project, case_name, fmt)) - return vars(mongo2elastic_format)[fmt](testcase) - except Exception: - logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc())) - return False - else: - return False - - -def export_documents(days): - cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results'] - if days > 0: - past_time = datetime.datetime.today() - datetime.timedelta(days=days) - cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)] - cmd += [ '--out', '{}'.format(tmp_docs_file)] - - try: - subprocess.check_call(cmd) - except Exception, err: - logger.error("export mongodb failed: %s" % err) - exit(-1) - - -def publish_document(document, es_creds, to): - status, data = shared_utils.publish_json(document, es_creds, to) - if status > 300: - logger.error('Publish record[{}] failed, due to [{}]' - .format(document, json.loads(data)['error']['reason'])) - - -def publish_nonexist_documents(elastic_docs, es_creds, to): - try: - with open(tmp_docs_file) as fdocs: - for doc_line in fdocs: - doc = json.loads(doc_line) - if format_document(doc) and doc not in elastic_docs: - publish_document(doc, es_creds, to) - finally: - fdocs.close() + query = '''{{ + "project_name": "{}", + "case_name": "{}" + }}'''.format(self.project, self.case) + cmd = ['mongoexport', + '--db', 'test_results_collection', + '--collection', 'results', + '--query', '{}'.format(query), + '--out', '{}'.format(tmp_docs_file)] + try: + subprocess.check_call(cmd) + return self + except Exception, err: + logger.error("export mongodb failed: %s" % err) + self._remove() + exit(-1) + + def get_existed_docs(self): + self.existed_docs = shared_utils.get_elastic_docs_by_days(self.elastic_url, self.creds, days) + return self + + def publish(self): + try: + with open(tmp_docs_file) as fdocs: + for doc_line in fdocs: + DocumentPublisher(json.loads(doc_line), + self.fmt, + self.existed_docs, + self.creds, + self.to).format().publish() + finally: + fdocs.close() + self._remove() + + def _remove(self): if os.path.exists(tmp_docs_file): os.remove(tmp_docs_file) @@ -193,7 +233,14 @@ if __name__ == '__main__': if to == 'elasticsearch': to = base_elastic_url - export_documents(days) - elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days) - logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs))) - publish_nonexist_documents(elastic_docs, es_creds, to) + for project, case_dicts in conf_utils.testcases_yaml.items(): + for case_dict in case_dicts: + case = case_dict.get('name') + fmt = conf_utils.compose_format(case_dict.get('format')) + DocumentsPublisher(project, + case, + fmt, + days, + base_elastic_url, + es_creds, + to).export().get_existed_docs().publish() |