diff options
Diffstat (limited to 'docker/storperf-master/storperf/db/job_db.py')
-rw-r--r-- | docker/storperf-master/storperf/db/job_db.py | 259 |
1 file changed, 259 insertions, 0 deletions
##############################################################################
# Copyright (c) 2015 EMC and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import calendar
import json
import logging
from sqlite3 import OperationalError
import sqlite3
from threading import Lock
import time
import uuid


# Serializes all access to the SQLite file.  Each method opens its own
# connection under this lock because sqlite3 connections must not be
# shared across threads.
db_mutex = Lock()


class JobDB(object):
    """Stores job ids, per-workload start/end timestamps, and job
    parameters in a local SQLite database (StorPerfJob.db)."""

    db_name = "StorPerfJob.db"

    def __init__(self):
        """
        Creates the StorPerfJob.db and jobs tables on demand
        """

        self.logger = logging.getLogger(__name__)
        self.logger.debug("Connecting to " + JobDB.db_name)
        self.job_id = None

        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()

                # CREATE TABLE raises OperationalError when the table
                # already exists; that just means a previous run
                # initialized the schema.
                try:
                    cursor.execute('''CREATE TABLE jobs
                        (job_id text,
                        workload text,
                        start text,
                        end text)''')
                    self.logger.debug("Created job table")
                except OperationalError:
                    self.logger.debug("Job table exists")

                try:
                    cursor.execute('''CREATE TABLE job_params
                        (job_id text,
                        param text,
                        value text)''')
                    self.logger.debug("Created job_params table")
                except OperationalError:
                    self.logger.debug("Job params table exists")

                try:
                    cursor.execute('''CREATE TABLE job_summary
                        (job_id text,
                        summary text)''')
                    self.logger.debug("Created job summary table")
                except OperationalError:
                    self.logger.debug("Job summary table exists")

                # Sanity check that the tables are readable.
                cursor.execute('SELECT * FROM jobs')
                cursor.execute('SELECT * FROM job_params')
                db.commit()
            finally:
                # Always release the connection, even if schema setup or
                # the sanity queries raise.
                db.close()

    def create_job_id(self):
        """
        Returns a job id that is guaranteed to be unique in this
        StorPerf instance.
        """
        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()

                self.job_id = str(uuid.uuid4())
                row = cursor.execute(
                    "select * from jobs where job_id = ?", (self.job_id,))

                # uuid4 collisions are astronomically unlikely, but the
                # loop keeps the uniqueness guarantee explicit.
                while row.fetchone() is not None:
                    self.logger.info("Duplicate job id found, regenerating")
                    self.job_id = str(uuid.uuid4())
                    row = cursor.execute(
                        "select * from jobs where job_id = ?",
                        (self.job_id,))

                cursor.execute(
                    "insert into jobs(job_id) values (?)", (self.job_id,))
                self.logger.debug("Reserved job id " + self.job_id)
                db.commit()
            finally:
                db.close()

    def start_workload(self, workload):
        """
        Records the start time for the given workload.

        workload must expose a 'fullname' attribute, which is used as
        the workload's key in the jobs table.
        """
        workload_name = workload.fullname

        if self.job_id is None:
            self.create_job_id()

        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()

                # Seconds since the epoch (UTC), stored as text.
                now = str(calendar.timegm(time.gmtime()))

                row = cursor.execute(
                    """select * from jobs
                       where job_id = ?
                       and workload = ?""",
                    (self.job_id, workload_name,))

                if row.fetchone() is None:
                    cursor.execute(
                        """insert into jobs
                           (job_id,
                           workload,
                           start)
                           values (?, ?, ?)""",
                        (self.job_id,
                         workload_name,
                         now,))
                else:
                    # logger.warn is deprecated; warning() is the
                    # documented spelling.
                    self.logger.warning("Duplicate start time for workload %s"
                                        % workload_name)
                    cursor.execute(
                        """update jobs set
                           job_id = ?,
                           start = ?
                           where workload = ?""",
                        (self.job_id,
                         now,
                         workload_name,))

                db.commit()
            finally:
                db.close()

    def end_workload(self, workload):
        """
        Records the end time for the given workload.

        If no start time was recorded, a row is inserted with the end
        time used as the start time as well.
        """
        if self.job_id is None:
            self.create_job_id()

        workload_name = workload.fullname

        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()
                now = str(calendar.timegm(time.gmtime()))

                row = cursor.execute(
                    """select * from jobs
                       where job_id = ?
                       and workload = ?""",
                    (self.job_id, workload_name,))

                if row.fetchone() is None:
                    self.logger.warning("No start time recorded for "
                                        "workload %s" % workload_name)
                    cursor.execute(
                        """insert into jobs
                           (job_id,
                           workload,
                           start,
                           end)
                           values (?, ?, ?, ?)""",
                        (self.job_id,
                         workload_name,
                         now,
                         now))
                else:
                    cursor.execute(
                        """update jobs set
                           job_id = ?,
                           end = ?
                           where workload = ?""",
                        (self.job_id,
                         now,
                         workload_name,))

                db.commit()
            finally:
                db.close()

    def fetch_workloads(self, workload):
        """
        Returns a list of [workload, start, end] rows for every
        execution whose workload name begins with the given prefix.
        """
        workload_prefix = workload + "%"
        workload_executions = []

        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()
                cursor.execute("""select workload, start, end
                    from jobs where workload like ?""",
                               (workload_prefix,))

                for row in cursor.fetchall():
                    workload_executions.append([row[0], row[1], row[2]])
            finally:
                db.close()

        return workload_executions

    def record_workload_params(self, params):
        """
        Stores each (param, value) pair from the params dict against
        the current job id.
        """
        if self.job_id is None:
            self.create_job_id()

        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()
                # dict.items() works on both Python 2 and 3; the
                # original iteritems() is Python 2 only.
                for param, value in params.items():
                    cursor.execute(
                        """insert into job_params
                           (job_id,
                           param,
                           value)
                           values (?, ?, ?)""",
                        (self.job_id,
                         param,
                         value,))
                db.commit()
            finally:
                db.close()

    def fetch_workload_params(self, job_id):
        """
        Returns the stored parameters for the given job id as a dict.
        Values that parse as JSON are decoded; anything else is
        returned as the raw stored string.
        """
        params = {}
        with db_mutex:
            db = sqlite3.connect(JobDB.db_name)
            try:
                cursor = db.cursor()

                cursor.execute(
                    "select param, value from job_params where job_id = ?",
                    (job_id,))

                for row in cursor.fetchall():
                    try:
                        data = json.loads(row[1])
                    except (ValueError, TypeError):
                        # Not JSON: keep the raw value.  Narrowed from the
                        # original bare except, which hid real errors.
                        data = row[1]
                    params[row[0]] = data
            finally:
                db.close()
        return params