From eed2ece373aede00f43e2655dca451266d4e9c37 Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Wed, 24 Feb 2016 23:04:14 +0100 Subject: DB Access Mutex Addition of a mutex to prevent mutiple, simultaneous DB accesses as SQLite is single threaded Change-Id: Iad2cd94015f7fb604dc6b60636db939099f52757 JIRA: STORPERF-32 Signed-off-by: Mark Beierl --- storperf/db/configuration_db.py | 115 +++++++++-------- storperf/db/job_db.py | 264 +++++++++++++++++++++------------------- 2 files changed, 206 insertions(+), 173 deletions(-) diff --git a/storperf/db/configuration_db.py b/storperf/db/configuration_db.py index 649c186..700588d 100644 --- a/storperf/db/configuration_db.py +++ b/storperf/db/configuration_db.py @@ -8,74 +8,88 @@ ############################################################################## from _sqlite3 import OperationalError +from threading import Lock import logging import sqlite3 +db_mutex = Lock() + class ConfigurationDB(object): - db_name = "StorPerf.db" + db_name = "StorPerfConfig.db" def __init__(self): """ - Creates the StorPerf.db and configuration tables on demand + Creates the StorPerfConfig.db and configuration tables on demand """ self.logger = logging.getLogger(__name__) self.logger.debug("Connecting to " + ConfigurationDB.db_name) - db = sqlite3.connect(ConfigurationDB.db_name) - - cursor = db.cursor() - try: - cursor.execute('''CREATE TABLE configuration - (configuration_name text, - key text, - value text)''') - self.logger.debug("Created configuration table") - except OperationalError: - self.logger.debug("Configuration table exists") - - cursor.execute('SELECT * FROM configuration') + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE configuration + (configuration_name text, + key text, + value text)''') + self.logger.debug("Created configuration table") + except OperationalError: + self.logger.debug("Configuration table exists") + + cursor.execute('SELECT * FROM configuration') + db.commit() + db.close() def delete_configuration_value(self, configuration_name, key): """Deletes the value associated with the given key """ - db = sqlite3.connect(ConfigurationDB.db_name) - cursor = db.cursor() + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() - cursor.execute( - "delete from configuration where configuration_name=? and key=?", - (configuration_name, key)) + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) - self.logger.debug("Deleted " + configuration_name + ":" + key) + self.logger.debug("Deleted " + configuration_name + ":" + key) - db.commit() + db.commit() + db.close() def get_configuration_value(self, configuration_name, key): """Returns a string representation of the value stored with this key under the given configuration name. """ - db = sqlite3.connect(ConfigurationDB.db_name) - cursor = db.cursor() + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() - cursor.execute( - """select value from configuration - where configuration_name = ? - and key = ?""", - (configuration_name, key,)) + cursor.execute( + """select value from configuration + where configuration_name = ? + and key = ?""", + (configuration_name, key,)) - row = cursor.fetchone() + row = cursor.fetchone() - if (row is None): - self.logger.debug( - configuration_name + ":" + key + " does not exist") - return None - else: - self.logger.debug( - configuration_name + ":" + key + " is " + str(row[0])) - return str(row[0]) + return_value = None + + if (row is None): + self.logger.debug( + configuration_name + ":" + key + " does not exist") + else: + self.logger.debug( + configuration_name + ":" + key + " is " + str(row[0])) + return_value = str(row[0]) + + db.close() + + return return_value def set_configuration_value(self, configuration_name, key, value): """Updates or creates the key under the given configuration @@ -85,19 +99,22 @@ class ConfigurationDB(object): if (value is None): return self.delete_configuration_value(configuration_name, key) - value = str(value) + with db_mutex: + value = str(value) - db = sqlite3.connect(ConfigurationDB.db_name) - cursor = db.cursor() + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() - cursor.execute( - "delete from configuration where configuration_name=? and key=?", - (configuration_name, key)) + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) - cursor.execute( - """insert into configuration(configuration_name, key, value) - values (?,?,?)""", (configuration_name, key, value)) + cursor.execute( + """insert into configuration(configuration_name, key, value) + values (?,?,?)""", (configuration_name, key, value)) - self.logger.debug(configuration_name + ":" + key + " set to " + value) + self.logger.debug( + configuration_name + ":" + key + " set to " + value) - db.commit() + db.commit() + db.close() diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index bec8d3f..f24ccf4 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -8,6 +8,7 @@ ############################################################################## from _sqlite3 import OperationalError +from threading import Lock import calendar import logging import sqlite3 @@ -17,140 +18,152 @@ import uuid import requests +db_mutex = Lock() + + class JobDB(object): - db_name = "StorPerf.db" + db_name = "StorPerfJob.db" def __init__(self): """ - Creates the StorPerf.db and jobs tables on demand + Creates the StorPerfJob.db and jobs tables on demand """ self.logger = logging.getLogger(__name__) self.logger.debug("Connecting to " + JobDB.db_name) self.job_id = None - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - try: - cursor.execute('''CREATE TABLE jobs - (job_id text, - workload text, - start text, - end text)''') - self.logger.debug("Created job table") - except OperationalError: - self.logger.debug("Job table exists") - - cursor.execute('SELECT * FROM jobs') + with db_mutex: + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE jobs + (job_id text, + workload text, + start text, + end text)''') + self.logger.debug("Created job table") + except OperationalError: + self.logger.debug("Job table exists") + + cursor.execute('SELECT * FROM jobs') + db.commit() + db.close() def create_job_id(self): """ Returns a job id that is guaranteed to be unique in this StorPerf instance. """ - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() + with db_mutex: + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() - self.job_id = str(uuid.uuid4()) - row = cursor.execute( - "select * from jobs where job_id = ?", (self.job_id,)) - - while (row.fetchone() is not None): - self.logger.info("Duplicate job id found, regenerating") self.job_id = str(uuid.uuid4()) row = cursor.execute( "select * from jobs where job_id = ?", (self.job_id,)) - cursor.execute( - "insert into jobs(job_id) values (?)", (self.job_id,)) - self.logger.debug("Reserved job id " + self.job_id) - db.commit() + while (row.fetchone() is not None): + self.logger.info("Duplicate job id found, regenerating") + self.job_id = str(uuid.uuid4()) + row = cursor.execute( + "select * from jobs where job_id = ?", (self.job_id,)) + + cursor.execute( + "insert into jobs(job_id) values (?)", (self.job_id,)) + self.logger.debug("Reserved job id " + self.job_id) + db.commit() + db.close() def start_workload(self, workload_name): """ Records the start time for the given workload """ - if (self.job_id is None): - self.create_job_id() - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - - now = str(calendar.timegm(time.gmtime())) + with db_mutex: + if (self.job_id is None): + self.create_job_id() - row = cursor.execute( - """select * from jobs - where job_id = ? - and workload = ?""", - (self.job_id, workload_name,)) + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() - if (row.fetchone() is None): - cursor.execute( - """insert into jobs - (job_id, - workload, - start) - values (?, ?, ?)""", - (self.job_id, - workload_name, - now,)) - else: - self.logger.warn("Duplicate start time for workload " - + workload_name) - cursor.execute( - """update jobs set - job_id = ?, - start = ? - where workload = ?""", - (self.job_id, - now, - workload_name,)) + now = str(calendar.timegm(time.gmtime())) - db.commit() + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + cursor.execute( + """insert into jobs + (job_id, + workload, + start) + values (?, ?, ?)""", + (self.job_id, + workload_name, + now,)) + else: + self.logger.warn("Duplicate start time for workload " + + workload_name) + cursor.execute( + """update jobs set + job_id = ?, + start = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + db.commit() + db.close() def end_workload(self, workload_name): """ Records the end time for the given workload """ - if (self.job_id is None): - self.create_job_id() - - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - now = str(calendar.timegm(time.gmtime())) - - row = cursor.execute( - """select * from jobs - where job_id = ? - and workload = ?""", - (self.job_id, workload_name,)) - - if (row.fetchone() is None): - self.logger.warn("No start time recorded for workload " - + workload_name) - cursor.execute( - """insert into jobs - (job_id, - workload, - start, - end) - values (?, ?, ?, ?)""", - (self.job_id, - workload_name, - now, - now)) - else: - cursor.execute( - """update jobs set - job_id = ?, - end = ? - where workload = ?""", - (self.job_id, - now, - workload_name,)) + with db_mutex: + if (self.job_id is None): + self.create_job_id() - db.commit() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + now = str(calendar.timegm(time.gmtime())) + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + self.logger.warn("No start time recorded for workload " + + workload_name) + cursor.execute( + """insert into jobs + (job_id, + workload, + start, + end) + values (?, ?, ?, ?)""", + (self.job_id, + workload_name, + now, + now)) + else: + cursor.execute( + """update jobs set + job_id = ?, + end = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + db.commit() + db.close() def fetch_results(self, workload_prefix=""): if (workload_prefix is None): @@ -165,36 +178,39 @@ class JobDB(object): self.logger.debug("Workload like: " + workload_prefix) - db = sqlite3.connect(JobDB.db_name) - cursor = db.cursor() - cursor.execute("""select start, end, workload - from jobs where workload like ?""", - (workload_prefix,)) + with db_mutex: - while (True): - row = cursor.fetchone() - if (row is None): - break + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + cursor.execute("""select start, end, workload + from jobs where workload like ?""", + (workload_prefix,)) - start_time = str(row[0]) - end_time = str(row[1]) - workload = str(row[2]) + while (True): + row = cursor.fetchone() + if (row is None): + break - # for most of these stats, we just want the final one - # as that is cumulative average or whatever for the whole - # run + start_time = str(row[0]) + end_time = str(row[1]) + workload = str(row[2]) - self.logger.info("workload=" + workload + - "start=" + start_time + " end=" + end_time) + # for most of these stats, we just want the final one + # as that is cumulative average or whatever for the whole + # run - request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \ - '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \ - start_time + "&until=" + end_time + self.logger.info("workload=" + workload + + "start=" + start_time + " end=" + end_time) - response = requests.get(request) + request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \ + '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \ + start_time + "&until=" + end_time - if (response.status_code == 200): - data = response.json() - print data - else: - pass + response = requests.get(request) + + if (response.status_code == 200): + data = response.json() + print data + else: + pass + db.close() -- cgit 1.2.3-korg