diff options
author | SerenaFeng <feng.xiaowei@zte.com.cn> | 2016-09-13 13:53:46 +0800 |
---|---|---|
committer | SerenaFeng <feng.xiaowei@zte.com.cn> | 2016-09-14 00:34:42 +0800 |
commit | b176405bc9c60989493483a9d1f11aea2f2cb5d1 (patch) | |
tree | 4151d905e0f5359ed5f038036c004fbe03afdac9 | |
parent | f8c3682b609b5fa238385d69e34e397ec6ec1bfd (diff) |
merge N days and whole database process and not publish existed data any way
JIRA: FUNCTEST-473
Change-Id: I2631fa04b0922afda2746cfd6be6f35587d534ba
Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
-rw-r--r-- | scripts/create_kibana_dashboards.py | 2 | ||||
-rw-r--r-- | scripts/kibana_cleanup.py | 2 | ||||
-rw-r--r-- | scripts/mongo_to_elasticsearch.py | 146 | ||||
-rw-r--r-- | scripts/shared_utils.py | 37 |
4 files changed, 92 insertions, 95 deletions
diff --git a/scripts/create_kibana_dashboards.py b/scripts/create_kibana_dashboards.py index 59666c1..efa6e17 100644 --- a/scripts/create_kibana_dashboards.py +++ b/scripts/create_kibana_dashboards.py @@ -273,7 +273,7 @@ def _get_pods_and_scenarios(project_name, case_name, installer): } }) - elastic_data = shared_utils.get_elastic_data(urlparse.urljoin(base_elastic_url, '/test_results/mongo2elastic'), + elastic_data = shared_utils.get_elastic_docs(urlparse.urljoin(base_elastic_url, '/test_results/mongo2elastic'), es_creds, query_json) pods_and_scenarios = {} diff --git a/scripts/kibana_cleanup.py b/scripts/kibana_cleanup.py index e8d452a..e699db4 100644 --- a/scripts/kibana_cleanup.py +++ b/scripts/kibana_cleanup.py @@ -13,7 +13,7 @@ logger.addHandler(file_handler) def delete_all(url, es_creds): - ids = shared_utils.get_elastic_data(url, es_creds, body=None, field='_id') + ids = shared_utils.get_elastic_docs(url, es_creds, body=None, field='_id') for id in ids: del_url = '/'.join([url, id]) shared_utils.delete_request(del_url, es_creds) diff --git a/scripts/mongo_to_elasticsearch.py b/scripts/mongo_to_elasticsearch.py index ea515bc..b722793 100644 --- a/scripts/mongo_to_elasticsearch.py +++ b/scripts/mongo_to_elasticsearch.py @@ -16,6 +16,27 @@ import shared_utils logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get +parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch') +parser.add_argument('-od', '--output-destination', + default='elasticsearch', + choices=('elasticsearch', 'stdout'), + help='defaults to elasticsearch') + +parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N', + help='get entries old at most N days from mongodb and' + ' parse those that are not already in elasticsearch.' + ' If not present, will get everything from mongodb, which is the default') + +parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200', + help='the url of elasticsearch, defaults to http://localhost:9200') + +parser.add_argument('-u', '--elasticsearch-username', default=None, + help='The username with password for elasticsearch in format username:password') + +args = parser.parse_args() + +tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4()) + def _fix_date(date_string): if isinstance(date_string, dict): @@ -24,7 +45,7 @@ def _fix_date(date_string): return date_string[:-3].replace(' ', 'T') + 'Z' -def verify_mongo_entry(testcase): +def verify_document(testcase): """ Mandatory fields: installer @@ -109,12 +130,12 @@ def verify_mongo_entry(testcase): return True -def modify_mongo_entry(testcase): +def format_document(testcase): # 1. verify and identify the testcase # 2. if modification is implemented, then use that # 3. if not, try to use default # 4. if 2 or 3 is successful, return True, otherwise return False - if verify_mongo_entry(testcase): + if verify_document(testcase): project = testcase['project_name'] case_name = testcase['case_name'] fmt = conf_utils.get_format(project, case_name) @@ -123,103 +144,56 @@ def modify_mongo_entry(testcase): logger.info("Processing %s/%s using format %s" % (project, case_name, fmt)) return vars(mongo2elastic_format)[fmt](testcase) except Exception: - logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc())) + logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc())) + return False else: return False -def publish_mongo_data(output_destination): - tmp_filename = 'mongo-{}.log'.format(uuid.uuid4()) - try: - subprocess.check_call(['mongoexport', - '--db', 'test_results_collection', - '-c', 'results', - '--out', tmp_filename]) - with open(tmp_filename) as fobj: - for mongo_json_line in fobj: - test_result = json.loads(mongo_json_line) - if modify_mongo_entry(test_result): - status, data = shared_utils.publish_json(test_result, es_creds, output_destination) - if status > 300: - project = test_result['project_name'] - case_name = test_result['case_name'] - logger.info('project {} case {} publish failed, due to [{}]' - .format(project, case_name, json.loads(data)['error']['reason'])) - finally: - if os.path.exists(tmp_filename): - os.remove(tmp_filename) - +def export_documents(days): + cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results'] + if days > 0: + past_time = datetime.datetime.today() - datetime.timedelta(days=days) + cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)] + cmd += [ '--out', '{}'.format(tmp_docs_file)] -def get_mongo_data(days): - past_time = datetime.datetime.today() - datetime.timedelta(days=days) - mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results', - '--query', '{{"start_date":{{$gt:"{}"}}}}' - .format(past_time)]).splitlines() - - mongo_data = [] - for mongo_json_line in mongo_json_lines: - test_result = json.loads(mongo_json_line) - if modify_mongo_entry(test_result): - # if the modification could be applied, append the modified result - mongo_data.append(test_result) - return mongo_data + try: + subprocess.check_call(cmd) + except Exception, err: + logger.error("export mongodb failed: %s" % err) + exit(-1) -def publish_difference(mongo_data, elastic_data, output_destination, es_creds): - for elastic_entry in elastic_data: - if elastic_entry in mongo_data: - mongo_data.remove(elastic_entry) +def publish_document(document, es_creds, to): + status, data = shared_utils.publish_json(document, es_creds, to) + if status > 300: + logger.error('Publish record[{}] failed, due to [{}]' + .format(document, json.loads(data)['error']['reason'])) - logger.info('number of parsed test results: {}'.format(len(mongo_data))) - for parsed_test_result in mongo_data: - shared_utils.publish_json(parsed_test_result, es_creds, output_destination) +def publish_nonexist_documents(elastic_docs, es_creds, to): + try: + with open(tmp_docs_file) as fdocs: + for doc_line in fdocs: + doc = json.loads(doc_line) + if format_document(doc) and doc not in elastic_docs: + publish_document(doc, es_creds, to) + finally: + fdocs.close() + if os.path.exists(tmp_docs_file): + os.remove(tmp_docs_file) if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch') - parser.add_argument('-od', '--output-destination', - default='elasticsearch', - choices=('elasticsearch', 'stdout'), - help='defaults to elasticsearch') - - parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N', - help='get entries old at most N days from mongodb and' - ' parse those that are not already in elasticsearch.' - ' If not present, will get everything from mongodb, which is the default') - - parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200', - help='the url of elasticsearch, defaults to http://localhost:9200') - - parser.add_argument('-u', '--elasticsearch-username', default=None, - help='The username with password for elasticsearch in format username:password') - - args = parser.parse_args() base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic') - output_destination = args.output_destination + to = args.output_destination days = args.merge_latest es_creds = args.elasticsearch_username - if output_destination == 'elasticsearch': - output_destination = base_elastic_url - - # parsed_test_results will be printed/sent to elasticsearch - if days == 0: - publish_mongo_data(output_destination) - elif days > 0: - body = '''{{ - "query" : {{ - "range" : {{ - "start_date" : {{ - "gte" : "now-{}d" - }} - }} - }} -}}'''.format(days) - elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body) - logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data))) - mongo_data = get_mongo_data(days) - publish_difference(mongo_data, elastic_data, output_destination, es_creds) - else: - raise Exception('Update must be non-negative') + if to == 'elasticsearch': + to = base_elastic_url + export_documents(days) + elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days) + logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs))) + publish_nonexist_documents(elastic_docs, es_creds, to) diff --git a/scripts/shared_utils.py b/scripts/shared_utils.py index 15c1af8..aa8a65d 100644 --- a/scripts/shared_utils.py +++ b/scripts/shared_utils.py @@ -10,14 +10,14 @@ def delete_request(url, creds, body=None): http.request('DELETE', url, headers=headers, body=body) -def publish_json(json_ojb, creds, output_destination): +def publish_json(json_ojb, creds, to): json_dump = json.dumps(json_ojb) - if output_destination == 'stdout': + if to == 'stdout': print json_dump return 200, None else: headers = urllib3.make_headers(basic_auth=creds) - result = http.request('POST', output_destination, headers=headers, body=json_dump) + result = http.request('POST', to, headers=headers, body=json_dump) return result.status, result.data @@ -25,16 +25,39 @@ def _get_nr_of_hits(elastic_json): return elastic_json['hits']['total'] -def get_elastic_data(elastic_url, creds, body, field='_source'): +def get_elastic_docs(elastic_url, creds, body=None, field = '_source'): + # 1. get the number of results headers = urllib3.make_headers(basic_auth=creds) elastic_json = json.loads(http.request('GET', elastic_url + '/_search?size=0', headers=headers, body=body).data) + print elastic_json nr_of_hits = _get_nr_of_hits(elastic_json) # 2. get all results elastic_json = json.loads(http.request('GET', elastic_url + '/_search?size={}'.format(nr_of_hits), headers=headers, body=body).data) - elastic_data = [] + elastic_docs = [] for hit in elastic_json['hits']['hits']: - elastic_data.append(hit[field]) - return elastic_data + elastic_docs.append(hit[field]) + return elastic_docs + +def get_elastic_docs_by_days(elastic_url, creds, days): + if days == 0: + body = '''{ + "query": { + "match_all": {} + } + }''' + elif days > 0: + body = '''{{ + "query" : {{ + "range" : {{ + "start_date" : {{ + "gte" : "now-{}d" + }} + }} + }} + }}'''.format(days) + else: + raise Exception('Update days must be non-negative') + return get_elastic_docs(elastic_url, creds, body) |