author    Mark Beierl <mark.beierl@emc.com>  2016-02-24 23:04:14 +0100
committer Mark Beierl <mark.beierl@emc.com>  2016-02-24 23:04:14 +0100
commit    eed2ece373aede00f43e2655dca451266d4e9c37 (patch)
tree      ccac31e435c1268706c1e46a702f08b1fa83b59b
parent    fd4b99b086583bc8dc91950393ffed646f7eee59 (diff)
DB Access Mutex
Addition of a mutex to prevent multiple, simultaneous DB accesses, as SQLite is single threaded.

Change-Id: Iad2cd94015f7fb604dc6b60636db939099f52757
JIRA: STORPERF-32
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
-rw-r--r--  storperf/db/configuration_db.py  115
-rw-r--r--  storperf/db/job_db.py            264
2 files changed, 206 insertions(+), 173 deletions(-)
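
In essence, the patch serializes all SQLite access behind one module-level lock: each method opens its own connection, does its work inside the lock, and closes the connection before releasing it. A minimal sketch of the pattern this commit applies (illustrative only, not the exact StorPerf code):

    import sqlite3
    from threading import Lock

    # One lock shared by every instance in the process: only one thread
    # may touch the SQLite file at a time.
    db_mutex = Lock()

    class ConfigurationDB(object):
        db_name = "StorPerfConfig.db"

        def get_configuration_value(self, configuration_name, key):
            with db_mutex:
                # A fresh connection per call, so no connection object is
                # ever shared between threads.
                db = sqlite3.connect(ConfigurationDB.db_name)
                cursor = db.cursor()
                cursor.execute(
                    """select value from configuration
                       where configuration_name = ? and key = ?""",
                    (configuration_name, key))
                row = cursor.fetchone()
                db.close()
                return str(row[0]) if row is not None else None
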
diff --git a/storperf/db/configuration_db.py b/storperf/db/configuration_db.py
index 649c186..700588d 100644
--- a/storperf/db/configuration_db.py
+++ b/storperf/db/configuration_db.py
@@ -8,74 +8,88 @@
##############################################################################
from _sqlite3 import OperationalError
+from threading import Lock
import logging
import sqlite3
+db_mutex = Lock()
+
class ConfigurationDB(object):
- db_name = "StorPerf.db"
+ db_name = "StorPerfConfig.db"
def __init__(self):
"""
- Creates the StorPerf.db and configuration tables on demand
+ Creates the StorPerfConfig.db and configuration tables on demand
"""
self.logger = logging.getLogger(__name__)
self.logger.debug("Connecting to " + ConfigurationDB.db_name)
- db = sqlite3.connect(ConfigurationDB.db_name)
-
- cursor = db.cursor()
- try:
- cursor.execute('''CREATE TABLE configuration
- (configuration_name text,
- key text,
- value text)''')
- self.logger.debug("Created configuration table")
- except OperationalError:
- self.logger.debug("Configuration table exists")
-
- cursor.execute('SELECT * FROM configuration')
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+
+ cursor = db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE configuration
+ (configuration_name text,
+ key text,
+ value text)''')
+ self.logger.debug("Created configuration table")
+ except OperationalError:
+ self.logger.debug("Configuration table exists")
+
+ cursor.execute('SELECT * FROM configuration')
+ db.commit()
+ db.close()
def delete_configuration_value(self, configuration_name, key):
"""Deletes the value associated with the given key
"""
- db = sqlite3.connect(ConfigurationDB.db_name)
- cursor = db.cursor()
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
- cursor.execute(
- "delete from configuration where configuration_name=? and key=?",
- (configuration_name, key))
+ cursor.execute(
+ "delete from configuration where configuration_name=? and key=?",
+ (configuration_name, key))
- self.logger.debug("Deleted " + configuration_name + ":" + key)
+ self.logger.debug("Deleted " + configuration_name + ":" + key)
- db.commit()
+ db.commit()
+ db.close()
def get_configuration_value(self, configuration_name, key):
"""Returns a string representation of the value stored
with this key under the given configuration name.
"""
- db = sqlite3.connect(ConfigurationDB.db_name)
- cursor = db.cursor()
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
- cursor.execute(
- """select value from configuration
- where configuration_name = ?
- and key = ?""",
- (configuration_name, key,))
+ cursor.execute(
+ """select value from configuration
+ where configuration_name = ?
+ and key = ?""",
+ (configuration_name, key,))
- row = cursor.fetchone()
+ row = cursor.fetchone()
- if (row is None):
- self.logger.debug(
- configuration_name + ":" + key + " does not exist")
- return None
- else:
- self.logger.debug(
- configuration_name + ":" + key + " is " + str(row[0]))
- return str(row[0])
+ return_value = None
+
+ if (row is None):
+ self.logger.debug(
+ configuration_name + ":" + key + " does not exist")
+ else:
+ self.logger.debug(
+ configuration_name + ":" + key + " is " + str(row[0]))
+ return_value = str(row[0])
+
+ db.close()
+
+ return return_value
def set_configuration_value(self, configuration_name, key, value):
"""Updates or creates the key under the given configuration
@@ -85,19 +99,22 @@ class ConfigurationDB(object):
if (value is None):
return self.delete_configuration_value(configuration_name, key)
- value = str(value)
+ with db_mutex:
+ value = str(value)
- db = sqlite3.connect(ConfigurationDB.db_name)
- cursor = db.cursor()
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
- cursor.execute(
- "delete from configuration where configuration_name=? and key=?",
- (configuration_name, key))
+ cursor.execute(
+ "delete from configuration where configuration_name=? and key=?",
+ (configuration_name, key))
- cursor.execute(
- """insert into configuration(configuration_name, key, value)
- values (?,?,?)""", (configuration_name, key, value))
+ cursor.execute(
+ """insert into configuration(configuration_name, key, value)
+ values (?,?,?)""", (configuration_name, key, value))
- self.logger.debug(configuration_name + ":" + key + " set to " + value)
+ self.logger.debug(
+ configuration_name + ":" + key + " set to " + value)
- db.commit()
+ db.commit()
+ db.close()
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index bec8d3f..f24ccf4 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -8,6 +8,7 @@
##############################################################################
from _sqlite3 import OperationalError
+from threading import Lock
import calendar
import logging
import sqlite3
@@ -17,140 +18,152 @@ import uuid
import requests
+db_mutex = Lock()
+
+
class JobDB(object):
- db_name = "StorPerf.db"
+ db_name = "StorPerfJob.db"
def __init__(self):
"""
- Creates the StorPerf.db and jobs tables on demand
+ Creates the StorPerfJob.db and jobs tables on demand
"""
self.logger = logging.getLogger(__name__)
self.logger.debug("Connecting to " + JobDB.db_name)
self.job_id = None
- db = sqlite3.connect(JobDB.db_name)
- cursor = db.cursor()
- try:
- cursor.execute('''CREATE TABLE jobs
- (job_id text,
- workload text,
- start text,
- end text)''')
- self.logger.debug("Created job table")
- except OperationalError:
- self.logger.debug("Job table exists")
-
- cursor.execute('SELECT * FROM jobs')
+ with db_mutex:
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE jobs
+ (job_id text,
+ workload text,
+ start text,
+ end text)''')
+ self.logger.debug("Created job table")
+ except OperationalError:
+ self.logger.debug("Job table exists")
+
+ cursor.execute('SELECT * FROM jobs')
+ db.commit()
+ db.close()
def create_job_id(self):
"""
Returns a job id that is guaranteed to be unique in this
StorPerf instance.
"""
- db = sqlite3.connect(JobDB.db_name)
- cursor = db.cursor()
+ with db_mutex:
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
- self.job_id = str(uuid.uuid4())
- row = cursor.execute(
- "select * from jobs where job_id = ?", (self.job_id,))
-
- while (row.fetchone() is not None):
- self.logger.info("Duplicate job id found, regenerating")
self.job_id = str(uuid.uuid4())
row = cursor.execute(
"select * from jobs where job_id = ?", (self.job_id,))
- cursor.execute(
- "insert into jobs(job_id) values (?)", (self.job_id,))
- self.logger.debug("Reserved job id " + self.job_id)
- db.commit()
+ while (row.fetchone() is not None):
+ self.logger.info("Duplicate job id found, regenerating")
+ self.job_id = str(uuid.uuid4())
+ row = cursor.execute(
+ "select * from jobs where job_id = ?", (self.job_id,))
+
+ cursor.execute(
+ "insert into jobs(job_id) values (?)", (self.job_id,))
+ self.logger.debug("Reserved job id " + self.job_id)
+ db.commit()
+ db.close()
def start_workload(self, workload_name):
"""
Records the start time for the given workload
"""
- if (self.job_id is None):
- self.create_job_id()
-
- db = sqlite3.connect(JobDB.db_name)
- cursor = db.cursor()
-
- now = str(calendar.timegm(time.gmtime()))
+ with db_mutex:
+ if (self.job_id is None):
+ self.create_job_id()
- row = cursor.execute(
- """select * from jobs
- where job_id = ?
- and workload = ?""",
- (self.job_id, workload_name,))
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
- if (row.fetchone() is None):
- cursor.execute(
- """insert into jobs
- (job_id,
- workload,
- start)
- values (?, ?, ?)""",
- (self.job_id,
- workload_name,
- now,))
- else:
- self.logger.warn("Duplicate start time for workload "
- + workload_name)
- cursor.execute(
- """update jobs set
- job_id = ?,
- start = ?
- where workload = ?""",
- (self.job_id,
- now,
- workload_name,))
+ now = str(calendar.timegm(time.gmtime()))
- db.commit()
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start)
+ values (?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,))
+ else:
+ self.logger.warn("Duplicate start time for workload "
+ + workload_name)
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ start = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ db.commit()
+ db.close()
def end_workload(self, workload_name):
"""
Records the end time for the given workload
"""
- if (self.job_id is None):
- self.create_job_id()
-
- db = sqlite3.connect(JobDB.db_name)
- cursor = db.cursor()
- now = str(calendar.timegm(time.gmtime()))
-
- row = cursor.execute(
- """select * from jobs
- where job_id = ?
- and workload = ?""",
- (self.job_id, workload_name,))
-
- if (row.fetchone() is None):
- self.logger.warn("No start time recorded for workload "
- + workload_name)
- cursor.execute(
- """insert into jobs
- (job_id,
- workload,
- start,
- end)
- values (?, ?, ?, ?)""",
- (self.job_id,
- workload_name,
- now,
- now))
- else:
- cursor.execute(
- """update jobs set
- job_id = ?,
- end = ?
- where workload = ?""",
- (self.job_id,
- now,
- workload_name,))
+ with db_mutex:
+ if (self.job_id is None):
+ self.create_job_id()
- db.commit()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ now = str(calendar.timegm(time.gmtime()))
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ self.logger.warn("No start time recorded for workload "
+ + workload_name)
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start,
+ end)
+ values (?, ?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,
+ now))
+ else:
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ end = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ db.commit()
+ db.close()
def fetch_results(self, workload_prefix=""):
if (workload_prefix is None):
@@ -165,36 +178,39 @@ class JobDB(object):
self.logger.debug("Workload like: " + workload_prefix)
- db = sqlite3.connect(JobDB.db_name)
- cursor = db.cursor()
- cursor.execute("""select start, end, workload
- from jobs where workload like ?""",
- (workload_prefix,))
+ with db_mutex:
- while (True):
- row = cursor.fetchone()
- if (row is None):
- break
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ cursor.execute("""select start, end, workload
+ from jobs where workload like ?""",
+ (workload_prefix,))
- start_time = str(row[0])
- end_time = str(row[1])
- workload = str(row[2])
+ while (True):
+ row = cursor.fetchone()
+ if (row is None):
+ break
- # for most of these stats, we just want the final one
- # as that is cumulative average or whatever for the whole
- # run
+ start_time = str(row[0])
+ end_time = str(row[1])
+ workload = str(row[2])
- self.logger.info("workload=" + workload +
- "start=" + start_time + " end=" + end_time)
+ # for most of these stats, we just want the final one
+ # as that is cumulative average or whatever for the whole
+ # run
- request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
- '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
- start_time + "&until=" + end_time
+ self.logger.info("workload=" + workload +
+ "start=" + start_time + " end=" + end_time)
- response = requests.get(request)
+ request = 'http://127.0.0.1:8000/render/?target=*.' + self.job_id + \
+ '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
+ start_time + "&until=" + end_time
- if (response.status_code == 200):
- data = response.json()
- print data
- else:
- pass
+ response = requests.get(request)
+
+ if (response.status_code == 200):
+ data = response.json()
+ print data
+ else:
+ pass
+ db.close()
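
For context, a hypothetical caller exercising JobDB from several threads is now serialized by the shared lock; something like the following (an illustrative sketch, not part of the patch, with made-up workload names):

    import threading
    from storperf.db.job_db import JobDB

    job_db = JobDB()
    job_db.create_job_id()

    def record(workload):
        # Each call acquires db_mutex internally, so concurrent workloads
        # no longer collide on the SQLite file.
        job_db.start_workload(workload)
        job_db.end_workload(workload)

    threads = [threading.Thread(target=record, args=("wl%d" % i,))
               for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
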