summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSerenaFeng <feng.xiaowei@zte.com.cn>2016-09-13 13:53:46 +0800
committerSerenaFeng <feng.xiaowei@zte.com.cn>2016-09-14 00:34:42 +0800
commitb176405bc9c60989493483a9d1f11aea2f2cb5d1 (patch)
tree4151d905e0f5359ed5f038036c004fbe03afdac9
parentf8c3682b609b5fa238385d69e34e397ec6ec1bfd (diff)
merge N days and whole database process and not publish existed data any way
JIRA: FUNCTEST-473 Change-Id: I2631fa04b0922afda2746cfd6be6f35587d534ba Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
-rw-r--r--scripts/create_kibana_dashboards.py2
-rw-r--r--scripts/kibana_cleanup.py2
-rw-r--r--scripts/mongo_to_elasticsearch.py146
-rw-r--r--scripts/shared_utils.py37
4 files changed, 92 insertions, 95 deletions
diff --git a/scripts/create_kibana_dashboards.py b/scripts/create_kibana_dashboards.py
index 59666c1..efa6e17 100644
--- a/scripts/create_kibana_dashboards.py
+++ b/scripts/create_kibana_dashboards.py
@@ -273,7 +273,7 @@ def _get_pods_and_scenarios(project_name, case_name, installer):
}
})
- elastic_data = shared_utils.get_elastic_data(urlparse.urljoin(base_elastic_url, '/test_results/mongo2elastic'),
+ elastic_data = shared_utils.get_elastic_docs(urlparse.urljoin(base_elastic_url, '/test_results/mongo2elastic'),
es_creds, query_json)
pods_and_scenarios = {}
diff --git a/scripts/kibana_cleanup.py b/scripts/kibana_cleanup.py
index e8d452a..e699db4 100644
--- a/scripts/kibana_cleanup.py
+++ b/scripts/kibana_cleanup.py
@@ -13,7 +13,7 @@ logger.addHandler(file_handler)
def delete_all(url, es_creds):
- ids = shared_utils.get_elastic_data(url, es_creds, body=None, field='_id')
+ ids = shared_utils.get_elastic_docs(url, es_creds, body=None, field='_id')
for id in ids:
del_url = '/'.join([url, id])
shared_utils.delete_request(del_url, es_creds)
diff --git a/scripts/mongo_to_elasticsearch.py b/scripts/mongo_to_elasticsearch.py
index ea515bc..b722793 100644
--- a/scripts/mongo_to_elasticsearch.py
+++ b/scripts/mongo_to_elasticsearch.py
@@ -16,6 +16,27 @@ import shared_utils
logger = logger_utils.KibanaDashboardLogger('mongo2elastic').get
+parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
+parser.add_argument('-od', '--output-destination',
+ default='elasticsearch',
+ choices=('elasticsearch', 'stdout'),
+ help='defaults to elasticsearch')
+
+parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
+ help='get entries old at most N days from mongodb and'
+ ' parse those that are not already in elasticsearch.'
+ ' If not present, will get everything from mongodb, which is the default')
+
+parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
+ help='the url of elasticsearch, defaults to http://localhost:9200')
+
+parser.add_argument('-u', '--elasticsearch-username', default=None,
+ help='The username with password for elasticsearch in format username:password')
+
+args = parser.parse_args()
+
+tmp_docs_file = './mongo-{}.json'.format(uuid.uuid4())
+
def _fix_date(date_string):
if isinstance(date_string, dict):
@@ -24,7 +45,7 @@ def _fix_date(date_string):
return date_string[:-3].replace(' ', 'T') + 'Z'
-def verify_mongo_entry(testcase):
+def verify_document(testcase):
"""
Mandatory fields:
installer
@@ -109,12 +130,12 @@ def verify_mongo_entry(testcase):
return True
-def modify_mongo_entry(testcase):
+def format_document(testcase):
# 1. verify and identify the testcase
# 2. if modification is implemented, then use that
# 3. if not, try to use default
# 4. if 2 or 3 is successful, return True, otherwise return False
- if verify_mongo_entry(testcase):
+ if verify_document(testcase):
project = testcase['project_name']
case_name = testcase['case_name']
fmt = conf_utils.get_format(project, case_name)
@@ -123,103 +144,56 @@ def modify_mongo_entry(testcase):
logger.info("Processing %s/%s using format %s" % (project, case_name, fmt))
return vars(mongo2elastic_format)[fmt](testcase)
except Exception:
- logger.error("Fail in modify testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
+ logger.error("Fail in format testcase[%s]\nerror message: %s" % (testcase, traceback.format_exc()))
+ return False
else:
return False
-def publish_mongo_data(output_destination):
- tmp_filename = 'mongo-{}.log'.format(uuid.uuid4())
- try:
- subprocess.check_call(['mongoexport',
- '--db', 'test_results_collection',
- '-c', 'results',
- '--out', tmp_filename])
- with open(tmp_filename) as fobj:
- for mongo_json_line in fobj:
- test_result = json.loads(mongo_json_line)
- if modify_mongo_entry(test_result):
- status, data = shared_utils.publish_json(test_result, es_creds, output_destination)
- if status > 300:
- project = test_result['project_name']
- case_name = test_result['case_name']
- logger.info('project {} case {} publish failed, due to [{}]'
- .format(project, case_name, json.loads(data)['error']['reason']))
- finally:
- if os.path.exists(tmp_filename):
- os.remove(tmp_filename)
-
+def export_documents(days):
+ cmd = ['mongoexport', '--db', 'test_results_collection', '-c', 'results']
+ if days > 0:
+ past_time = datetime.datetime.today() - datetime.timedelta(days=days)
+ cmd += ['--query', '{{"start_date":{{$gt:"{}"}}}}'.format(past_time)]
+ cmd += [ '--out', '{}'.format(tmp_docs_file)]
-def get_mongo_data(days):
- past_time = datetime.datetime.today() - datetime.timedelta(days=days)
- mongo_json_lines = subprocess.check_output(['mongoexport', '--db', 'test_results_collection', '-c', 'results',
- '--query', '{{"start_date":{{$gt:"{}"}}}}'
- .format(past_time)]).splitlines()
-
- mongo_data = []
- for mongo_json_line in mongo_json_lines:
- test_result = json.loads(mongo_json_line)
- if modify_mongo_entry(test_result):
- # if the modification could be applied, append the modified result
- mongo_data.append(test_result)
- return mongo_data
+ try:
+ subprocess.check_call(cmd)
+ except Exception, err:
+ logger.error("export mongodb failed: %s" % err)
+ exit(-1)
-def publish_difference(mongo_data, elastic_data, output_destination, es_creds):
- for elastic_entry in elastic_data:
- if elastic_entry in mongo_data:
- mongo_data.remove(elastic_entry)
+def publish_document(document, es_creds, to):
+ status, data = shared_utils.publish_json(document, es_creds, to)
+ if status > 300:
+ logger.error('Publish record[{}] failed, due to [{}]'
+ .format(document, json.loads(data)['error']['reason']))
- logger.info('number of parsed test results: {}'.format(len(mongo_data)))
- for parsed_test_result in mongo_data:
- shared_utils.publish_json(parsed_test_result, es_creds, output_destination)
+def publish_nonexist_documents(elastic_docs, es_creds, to):
+ try:
+ with open(tmp_docs_file) as fdocs:
+ for doc_line in fdocs:
+ doc = json.loads(doc_line)
+ if format_document(doc) and doc not in elastic_docs:
+ publish_document(doc, es_creds, to)
+ finally:
+ fdocs.close()
+ if os.path.exists(tmp_docs_file):
+ os.remove(tmp_docs_file)
if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Modify and filter mongo json data for elasticsearch')
- parser.add_argument('-od', '--output-destination',
- default='elasticsearch',
- choices=('elasticsearch', 'stdout'),
- help='defaults to elasticsearch')
-
- parser.add_argument('-ml', '--merge-latest', default=0, type=int, metavar='N',
- help='get entries old at most N days from mongodb and'
- ' parse those that are not already in elasticsearch.'
- ' If not present, will get everything from mongodb, which is the default')
-
- parser.add_argument('-e', '--elasticsearch-url', default='http://localhost:9200',
- help='the url of elasticsearch, defaults to http://localhost:9200')
-
- parser.add_argument('-u', '--elasticsearch-username', default=None,
- help='The username with password for elasticsearch in format username:password')
-
- args = parser.parse_args()
base_elastic_url = urlparse.urljoin(args.elasticsearch_url, '/test_results/mongo2elastic')
- output_destination = args.output_destination
+ to = args.output_destination
days = args.merge_latest
es_creds = args.elasticsearch_username
- if output_destination == 'elasticsearch':
- output_destination = base_elastic_url
-
- # parsed_test_results will be printed/sent to elasticsearch
- if days == 0:
- publish_mongo_data(output_destination)
- elif days > 0:
- body = '''{{
- "query" : {{
- "range" : {{
- "start_date" : {{
- "gte" : "now-{}d"
- }}
- }}
- }}
-}}'''.format(days)
- elastic_data = shared_utils.get_elastic_data(base_elastic_url, es_creds, body)
- logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_data)))
- mongo_data = get_mongo_data(days)
- publish_difference(mongo_data, elastic_data, output_destination, es_creds)
- else:
- raise Exception('Update must be non-negative')
+ if to == 'elasticsearch':
+ to = base_elastic_url
+ export_documents(days)
+ elastic_docs = shared_utils.get_elastic_docs_by_days(base_elastic_url, es_creds, days)
+ logger.info('number of hits in elasticsearch for now-{}d: {}'.format(days, len(elastic_docs)))
+ publish_nonexist_documents(elastic_docs, es_creds, to)
diff --git a/scripts/shared_utils.py b/scripts/shared_utils.py
index 15c1af8..aa8a65d 100644
--- a/scripts/shared_utils.py
+++ b/scripts/shared_utils.py
@@ -10,14 +10,14 @@ def delete_request(url, creds, body=None):
http.request('DELETE', url, headers=headers, body=body)
-def publish_json(json_ojb, creds, output_destination):
+def publish_json(json_ojb, creds, to):
json_dump = json.dumps(json_ojb)
- if output_destination == 'stdout':
+ if to == 'stdout':
print json_dump
return 200, None
else:
headers = urllib3.make_headers(basic_auth=creds)
- result = http.request('POST', output_destination, headers=headers, body=json_dump)
+ result = http.request('POST', to, headers=headers, body=json_dump)
return result.status, result.data
@@ -25,16 +25,39 @@ def _get_nr_of_hits(elastic_json):
return elastic_json['hits']['total']
-def get_elastic_data(elastic_url, creds, body, field='_source'):
+def get_elastic_docs(elastic_url, creds, body=None, field = '_source'):
+
# 1. get the number of results
headers = urllib3.make_headers(basic_auth=creds)
elastic_json = json.loads(http.request('GET', elastic_url + '/_search?size=0', headers=headers, body=body).data)
+ print elastic_json
nr_of_hits = _get_nr_of_hits(elastic_json)
# 2. get all results
elastic_json = json.loads(http.request('GET', elastic_url + '/_search?size={}'.format(nr_of_hits), headers=headers, body=body).data)
- elastic_data = []
+ elastic_docs = []
for hit in elastic_json['hits']['hits']:
- elastic_data.append(hit[field])
- return elastic_data
+ elastic_docs.append(hit[field])
+ return elastic_docs
+
+def get_elastic_docs_by_days(elastic_url, creds, days):
+ if days == 0:
+ body = '''{
+ "query": {
+ "match_all": {}
+ }
+ }'''
+ elif days > 0:
+ body = '''{{
+ "query" : {{
+ "range" : {{
+ "start_date" : {{
+ "gte" : "now-{}d"
+ }}
+ }}
+ }}
+ }}'''.format(days)
+ else:
+ raise Exception('Update days must be non-negative')
+ return get_elastic_docs(elastic_url, creds, body)