diff options
Diffstat (limited to 'storperf/db/job_db.py')
-rw-r--r-- | storperf/db/job_db.py | 259 |
1 files changed, 0 insertions, 259 deletions
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py deleted file mode 100644 index 3308fa8..0000000 --- a/storperf/db/job_db.py +++ /dev/null @@ -1,259 +0,0 @@ -############################################################################## -# Copyright (c) 2015 EMC and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -import calendar -import json -import logging -from sqlite3 import OperationalError -import sqlite3 -from threading import Lock -import time -import uuid - - -db_mutex = Lock() - - -class JobDB(object): - - db_name = "StorPerfJob.db" - - def __init__(self): - """ - Creates the StorPerfJob.db and jobs tables on demand - """ - - self.logger = logging.getLogger(__name__) - self.logger.debug("Connecting to " + JobDB.db_name) - self.job_id = None - - with db_mutex: - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - try: - cursor.execute('''CREATE TABLE jobs - (job_id text, - workload text, - start text, - end text)''') - self.logger.debug("Created job table") - except OperationalError: - self.logger.debug("Job table exists") - - try: - cursor.execute('''CREATE TABLE job_params - (job_id text, - param text, - value text)''') - self.logger.debug("Created job_params table") - except OperationalError: - self.logger.debug("Job params table exists") - - try: - cursor.execute('''CREATE TABLE job_summary - (job_id text, - summary text)''') - self.logger.debug("Created job summary table") - except OperationalError: - self.logger.debug("Job summary table exists") - - cursor.execute('SELECT * FROM jobs') - cursor.execute('SELECT * FROM job_params') - db.commit() - db.close() - - def create_job_id(self): - """ - Returns a job id that is guaranteed to be unique in this - StorPerf instance. - """ - with db_mutex: - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - - self.job_id = str(uuid.uuid4()) - row = cursor.execute( - "select * from jobs where job_id = ?", (self.job_id,)) - - while (row.fetchone() is not None): - self.logger.info("Duplicate job id found, regenerating") - self.job_id = str(uuid.uuid4()) - row = cursor.execute( - "select * from jobs where job_id = ?", (self.job_id,)) - - cursor.execute( - "insert into jobs(job_id) values (?)", (self.job_id,)) - self.logger.debug("Reserved job id " + self.job_id) - db.commit() - db.close() - - def start_workload(self, workload): - """ - Records the start time for the given workload - """ - - workload_name = workload.fullname - - if (self.job_id is None): - self.create_job_id() - - with db_mutex: - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - - now = str(calendar.timegm(time.gmtime())) - - row = cursor.execute( - """select * from jobs - where job_id = ? - and workload = ?""", - (self.job_id, workload_name,)) - - if (row.fetchone() is None): - cursor.execute( - """insert into jobs - (job_id, - workload, - start) - values (?, ?, ?)""", - (self.job_id, - workload_name, - now,)) - else: - self.logger.warn("Duplicate start time for workload %s" - % workload_name) - cursor.execute( - """update jobs set - job_id = ?, - start = ? - where workload = ?""", - (self.job_id, - now, - workload_name,)) - - db.commit() - db.close() - - def end_workload(self, workload): - """ - Records the end time for the given workload - """ - if (self.job_id is None): - self.create_job_id() - - workload_name = workload.fullname - - with db_mutex: - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - now = str(calendar.timegm(time.gmtime())) - - row = cursor.execute( - """select * from jobs - where job_id = ? - and workload = ?""", - (self.job_id, workload_name,)) - - if (row.fetchone() is None): - self.logger.warn("No start time recorded for workload %s" - % workload_name) - cursor.execute( - """insert into jobs - (job_id, - workload, - start, - end) - values (?, ?, ?, ?)""", - (self.job_id, - workload_name, - now, - now)) - else: - cursor.execute( - """update jobs set - job_id = ?, - end = ? - where workload = ?""", - (self.job_id, - now, - workload_name,)) - - db.commit() - db.close() - - def fetch_workloads(self, workload): - workload_prefix = workload + "%" - workload_executions = [] - - with db_mutex: - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - cursor.execute("""select workload, start, end - from jobs where workload like ?""", - (workload_prefix,)) - - while (True): - row = cursor.fetchone() - if (row is None): - break - workload_execution = [row[0], row[1], row[2]] - workload_executions.append(workload_execution) - db.close() - - return workload_executions - - def record_workload_params(self, params): - """ - """ - if (self.job_id is None): - self.create_job_id() - - with db_mutex: - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - for param, value in params.iteritems(): - cursor.execute( - """insert into job_params - (job_id, - param, - value) - values (?, ?, ?)""", - (self.job_id, - param, - value,)) - db.commit() - db.close() - - def fetch_workload_params(self, job_id): - """ - """ - params = {} - with db_mutex: - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - - cursor.execute( - "select param, value from job_params where job_id = ?", - (job_id,)) - - while (True): - row = cursor.fetchone() - if (row is None): - break - try: - data = json.loads(row[1]) - except: - data = row[1] - params[row[0]] = data - db.close() - return params |