summaryrefslogtreecommitdiffstats
path: root/dashboard/dashboard/mongo2elastic/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'dashboard/dashboard/mongo2elastic/main.py')
-rw-r--r--dashboard/dashboard/mongo2elastic/main.py243
1 files changed, 243 insertions, 0 deletions
diff --git a/dashboard/dashboard/mongo2elastic/main.py b/dashboard/dashboard/mongo2elastic/main.py
new file mode 100644
index 0000000..25b5320
--- /dev/null
+++ b/dashboard/dashboard/mongo2elastic/main.py
@@ -0,0 +1,243 @@
+#! /usr/bin/env python
+
+import datetime
+import json
+import os
+import subprocess
+import traceback
+import urlparse
+import uuid
+
+import argparse
+
+from common import logger_utils, elastic_access
+from conf import testcases
+from conf.config import APIConfig
+from mongo2elastic import format
+
+logger = logger_utils.DashboardLogger('mongo2elastic').get
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-c", "--config-file",
+ dest='config_file',
+ help="Config file location")
+parser.add_argument('-ld', '--latest-days',
+ default=0,
+ type=int,
+ metavar='N',
+ help='get entries old at most N days from mongodb and'
+ ' parse those that are not already in elasticsearch.'
+ ' If not present, will get everything from mongodb, which is the default')
+
+args = parser.parse_args()
+CONF = APIConfig().parse(args.config_file)
+
+
+tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
+
+
+class DocumentPublisher:
+
+ def __init__(self, doc, fmt, exist_docs, creds, to):
+ self.doc = doc
+ self.fmt = fmt
+ self.creds = creds
+ self.exist_docs = exist_docs
+ self.to = to
+ self.is_formatted = True
+
+ def format(self):
+ try:
+ if self._verify_document() and self.fmt:
+ self.is_formatted = vars(format)[self.fmt](self.doc)
+ else:
+ self.is_formatted = False
+ except Exception:
+ logger.error("Fail in format testcase[%s]\nerror message: %s" %
+ (self.doc, traceback.format_exc()))
+ self.is_formatted = False
+ finally:
+ return self
+
+ def publish(self):
+ if self.is_formatted and self.doc not in self.exist_docs:
+ self._publish()
+
+ def _publish(self):
+ status, data = elastic_access.publish_json(self.doc, self.creds, self.to)
+ if status > 300:
+ logger.error('Publish record[{}] failed, due to [{}]'
+ .format(self.doc, json.loads(data)['error']['reason']))
+
+ def _fix_date(self, date_string):
+ if isinstance(date_string, dict):
+ return date_string['$date']
+ else:
+ return date_string[:-3].replace(' ', 'T') + 'Z'
+
+ def _verify_document(self):
+ """
+ Mandatory fields:
+ installer
+ pod_name
+ version
+ case_name
+ date
+ project
+ details
+
+ these fields must be present and must NOT be None
+
+ Optional fields:
+ description
+
+ these fields will be preserved if the are NOT None
+ """
+ mandatory_fields = ['installer',
+ 'pod_name',
+ 'version',
+ 'case_name',
+ 'project_name',
+ 'details']
+ mandatory_fields_to_modify = {'start_date': self._fix_date}
+ fields_to_swap_or_add = {'scenario': 'version'}
+ if '_id' in self.doc:
+ mongo_id = self.doc['_id']
+ else:
+ mongo_id = None
+ optional_fields = ['description']
+ for key, value in self.doc.items():
+ if key in mandatory_fields:
+ if value is None:
+ # empty mandatory field, invalid input
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
+ " for mandatory field '{}'".format(mongo_id, key))
+ return False
+ else:
+ mandatory_fields.remove(key)
+ elif key in mandatory_fields_to_modify:
+ if value is None:
+ # empty mandatory field, invalid input
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing value"
+ " for mandatory field '{}'".format(mongo_id, key))
+ return False
+ else:
+ self.doc[key] = mandatory_fields_to_modify[key](value)
+ del mandatory_fields_to_modify[key]
+ elif key in fields_to_swap_or_add:
+ if value is None:
+ swapped_key = fields_to_swap_or_add[key]
+ swapped_value = self.doc[swapped_key]
+ logger.info("Swapping field '{}' with value None for '{}' with value '{}'.".format(key, swapped_key,
+ swapped_value))
+ self.doc[key] = swapped_value
+ del fields_to_swap_or_add[key]
+ else:
+ del fields_to_swap_or_add[key]
+ elif key in optional_fields:
+ if value is None:
+ # empty optional field, remove
+ del self.doc[key]
+ optional_fields.remove(key)
+ else:
+ # unknown field
+ del self.doc[key]
+
+ if len(mandatory_fields) > 0:
+ # some mandatory fields are missing
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
+ " mandatory field(s) '{}'".format(mongo_id, mandatory_fields))
+ return False
+ elif len(mandatory_fields_to_modify) > 0:
+ # some mandatory fields are missing
+ logger.info("Skipping testcase with mongo _id '{}' because the testcase was missing"
+ " mandatory field(s) '{}'".format(mongo_id, mandatory_fields_to_modify.keys()))
+ return False
+ else:
+ if len(fields_to_swap_or_add) > 0:
+ for key, swap_key in fields_to_swap_or_add.iteritems():
+ self.doc[key] = self.doc[swap_key]
+
+ return True
+
+
+class DocumentsPublisher:
+
+ def __init__(self, project, case, fmt, days, elastic_url, creds, to):
+ self.project = project
+ self.case = case
+ self.fmt = fmt
+ self.days = days
+ self.elastic_url = elastic_url
+ self.creds = creds
+ self.to = to
+ self.existed_docs = []
+
+ def export(self):
+ if self.days > 0:
+ past_time = datetime.datetime.today() - datetime.timedelta(days=self.days)
+ query = '''{{
+ "project_name": "{}",
+ "case_name": "{}",
+ "start_date": {{"$gt" : "{}"}}
+ }}'''.format(self.project, self.case, past_time)
+ else:
+ query = '''{{
+ "project_name": "{}",
+ "case_name": "{}"
+ }}'''.format(self.project, self.case)
+ cmd = ['mongoexport',
+ '--db', 'test_results_collection',
+ '--collection', 'results',
+ '--query', '{}'.format(query),
+ '--out', '{}'.format(tmp_docs_file)]
+ try:
+ subprocess.check_call(cmd)
+ return self
+ except Exception, err:
+ logger.error("export mongodb failed: %s" % err)
+ self._remove()
+ exit(-1)
+
+ def get_existed_docs(self):
+ self.existed_docs = elastic_access.get_elastic_docs_by_days(self.elastic_url, self.creds, self.days)
+ return self
+
+ def publish(self):
+ try:
+ with open(tmp_docs_file) as fdocs:
+ for doc_line in fdocs:
+ DocumentPublisher(json.loads(doc_line),
+ self.fmt,
+ self.existed_docs,
+ self.creds,
+ self.to).format().publish()
+ finally:
+ fdocs.close()
+ self._remove()
+
+ def _remove(self):
+ if os.path.exists(tmp_docs_file):
+ os.remove(tmp_docs_file)
+
+
+def main():
+ base_elastic_url = urlparse.urljoin(CONF.elastic_url, '/test_results/mongo2elastic')
+ to = CONF.destination
+ days = args.latest_days
+ es_creds = CONF.elastic_creds
+
+ if to == 'elasticsearch':
+ to = base_elastic_url
+
+ for project, case_dicts in testcases.testcases_yaml.items():
+ for case_dict in case_dicts:
+ case = case_dict.get('name')
+ fmt = testcases.compose_format(case_dict.get('format'))
+ DocumentsPublisher(project,
+ case,
+ fmt,
+ days,
+ base_elastic_url,
+ es_creds,
+ to).export().get_existed_docs().publish()