author     mbeierl <mark.beierl@dell.com>  2017-07-11 15:12:35 -0400
committer  mbeierl <mark.beierl@dell.com>  2017-07-11 15:47:46 -0400
commit     7602a54309adbe5c5346ee6befecc2e596976504 (patch)
tree       60f15026780db30b0b8842ba1a1e2cc021e22625 /docker/storperf-master/storperf
parent     fc09b37e95c19f820ec60db19d98c0dc3d670829 (diff)
Change all paths
Changes the paths of all source code so that it exists under the
dockerfile location for each container. This way we can use COPY
instead of git clone, as well as use the existing JJB.

Change-Id: I883b2957d89659c164fff0a1ebc4d677c534796d
JIRA: STORPERF-188
Signed-off-by: mbeierl <mark.beierl@dell.com>
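With the sources now living beside each container's Dockerfile, the image
build can pick them up with COPY instead of cloning at build time. A minimal
sketch of the resulting pattern (the destination path is illustrative, not
taken from the actual Dockerfile):

    # Before: fetch sources from gerrit during the image build
    # RUN git clone https://gerrit.opnfv.org/gerrit/storperf /storperf
    # After: copy them straight from the Dockerfile's own build context
    COPY storperf /storperf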
Diffstat (limited to 'docker/storperf-master/storperf')
-rw-r--r--  docker/storperf-master/storperf/__init__.py                          8
-rw-r--r--  docker/storperf-master/storperf/carbon/__init__.py                   8
-rw-r--r--  docker/storperf-master/storperf/carbon/converter.py                 57
-rw-r--r--  docker/storperf-master/storperf/carbon/emitter.py                   38
-rw-r--r--  docker/storperf-master/storperf/db/__init__.py                       8
-rw-r--r--  docker/storperf-master/storperf/db/configuration_db.py             120
-rw-r--r--  docker/storperf-master/storperf/db/graphite_db.py                   63
-rw-r--r--  docker/storperf-master/storperf/db/job_db.py                       259
-rw-r--r--  docker/storperf-master/storperf/db/test_results_db.py               61
-rw-r--r--  docker/storperf-master/storperf/fio/__init__.py                      8
-rw-r--r--  docker/storperf-master/storperf/fio/fio_invoker.py                 153
-rw-r--r--  docker/storperf-master/storperf/logging.json                        51
-rw-r--r--  docker/storperf-master/storperf/resources/hot/agent-group.yaml    106
-rw-r--r--  docker/storperf-master/storperf/resources/hot/storperf-agent.yaml  99
-rw-r--r--  docker/storperf-master/storperf/resources/ssh/storperf_rsa          27
-rw-r--r--  docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub      1
-rw-r--r--  docker/storperf-master/storperf/storperf_master.py                 448
-rw-r--r--  docker/storperf-master/storperf/test_executor.py                   326
-rw-r--r--  docker/storperf-master/storperf/utilities/__init__.py                8
-rw-r--r--  docker/storperf-master/storperf/utilities/data_handler.py          205
-rw-r--r--  docker/storperf-master/storperf/utilities/data_treatment.py         39
-rw-r--r--  docker/storperf-master/storperf/utilities/dictionary.py             15
-rw-r--r--  docker/storperf-master/storperf/utilities/math.py                  116
-rw-r--r--  docker/storperf-master/storperf/utilities/steady_state.py           54
-rw-r--r--  docker/storperf-master/storperf/utilities/thread_gate.py            67
-rw-r--r--  docker/storperf-master/storperf/workloads/__init__.py                8
-rw-r--r--  docker/storperf-master/storperf/workloads/_base_workload.py         82
-rw-r--r--  docker/storperf-master/storperf/workloads/_ssd_preconditioning.py   17
-rw-r--r--  docker/storperf-master/storperf/workloads/_warm_up.py                17
-rw-r--r--  docker/storperf-master/storperf/workloads/rr.py                      16
-rw-r--r--  docker/storperf-master/storperf/workloads/rs.py                      16
-rw-r--r--  docker/storperf-master/storperf/workloads/rw.py                      18
-rw-r--r--  docker/storperf-master/storperf/workloads/wr.py                      16
-rw-r--r--  docker/storperf-master/storperf/workloads/ws.py                      16
34 files changed, 2551 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/__init__.py b/docker/storperf-master/storperf/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/docker/storperf-master/storperf/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/carbon/__init__.py b/docker/storperf-master/storperf/carbon/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/docker/storperf-master/storperf/carbon/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/carbon/converter.py b/docker/storperf-master/storperf/carbon/converter.py
new file mode 100644
index 0000000..623c144
--- /dev/null
+++ b/docker/storperf-master/storperf/carbon/converter.py
@@ -0,0 +1,57 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import calendar
+import logging
+import time
+
+
+class Converter(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ def convert_json_to_flat(self, json_object, prefix=None):
+ # Use the timestamp reported by fio, or current time if
+ # not present.
+ if 'timestamp' in json_object:
+ timestamp = str(json_object.pop('timestamp'))
+ else:
+ timestamp = str(calendar.timegm(time.gmtime()))
+
+ self.flat_dictionary = {}
+ self.flat_dictionary['timestamp'] = timestamp
+
+ self.resurse_to_flat_dictionary(json_object, prefix)
+ return self.flat_dictionary
+
+ def resurse_to_flat_dictionary(self, json, prefix=None):
+ if type(json) == dict:
+ for k, v in json.items():
+ if prefix is None:
+ key = k.decode("utf-8").replace(" ", "_")
+ else:
+ key = prefix + "." + k.decode("utf-8").replace(" ", "_")
+ if hasattr(v, '__iter__'):
+ self.resurse_to_flat_dictionary(v, key)
+ else:
+ self.flat_dictionary[key] = str(v).replace(" ", "_")
+ elif type(json) == list:
+ index = 0
+ for v in json:
+ index += 1
+ if hasattr(v, '__iter__'):
+ self.resurse_to_flat_dictionary(
+ v, prefix + "." + str(index))
+ else:
+                    if prefix is None:
+                        self.flat_dictionary[str(index)] = \
+                            str(v).replace(" ", "_")
+                    else:
+                        key = prefix + "." + str(index)
+                        self.flat_dictionary[key] = \
+                            str(v).replace(" ", "_")
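For reference, the converter flattens fio's nested JSON into dotted Graphite
keys, recording all values as strings. A minimal sketch of its behaviour (the
fio fragment and prefix are hypothetical):

    from storperf.carbon.converter import Converter

    converter = Converter()
    flat = converter.convert_json_to_flat(
        {"jobs": [{"read": {"iops": 2438.5}}]},
        prefix="job-1234.rw")
    # flat now holds a "timestamp" entry plus
    # {"job-1234.rw.jobs.1.read.iops": "2438.5"}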
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
new file mode 100644
index 0000000..e23dc79
--- /dev/null
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -0,0 +1,38 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import calendar
+import logging
+import socket
+import time
+
+
+class CarbonMetricTransmitter():
+
+ carbon_host = '127.0.0.1'
+ carbon_port = 2003
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ def transmit_metrics(self, metrics):
+        # Use the timestamp reported by fio if present, otherwise now
+        if 'timestamp' in metrics:
+            timestamp = str(metrics.pop('timestamp'))
+        else:
+            timestamp = str(calendar.timegm(time.gmtime()))
+
+ carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ carbon_socket.connect((self.carbon_host, self.carbon_port))
+
+ for key, metric in metrics.items():
+ message = key + " " + metric + " " + timestamp
+ self.logger.debug("Metric: " + message)
+ carbon_socket.send(message + '\n')
+
+ carbon_socket.close()
+ self.logger.info("Sent metrics to carbon with timestamp %s"
+ % timestamp)
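The emitter speaks Carbon's plaintext protocol: one "metric value timestamp"
line per entry, newline-terminated. A usage sketch (the host and metric
values are hypothetical):

    transmitter = CarbonMetricTransmitter()
    transmitter.carbon_host = "graphite.example.com"  # hypothetical host
    transmitter.transmit_metrics({
        "timestamp": "1499786400",
        "job-1234.rw.jobs.1.read.iops": "2438.5"})
    # Sends the line: job-1234.rw.jobs.1.read.iops 2438.5 1499786400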
diff --git a/docker/storperf-master/storperf/db/__init__.py b/docker/storperf-master/storperf/db/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/docker/storperf-master/storperf/db/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/db/configuration_db.py b/docker/storperf-master/storperf/db/configuration_db.py
new file mode 100644
index 0000000..5b996c7
--- /dev/null
+++ b/docker/storperf-master/storperf/db/configuration_db.py
@@ -0,0 +1,120 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from sqlite3 import OperationalError
+from threading import Lock
+import logging
+import sqlite3
+
+db_mutex = Lock()
+
+
+class ConfigurationDB(object):
+
+ db_name = "StorPerfConfig.db"
+
+ def __init__(self):
+ """
+ Creates the StorPerfConfig.db and configuration tables on demand
+ """
+
+ self.logger = logging.getLogger(__name__)
+ self.logger.debug("Connecting to " + ConfigurationDB.db_name)
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+
+ cursor = db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE configuration
+ (configuration_name text,
+ key text,
+ value text)''')
+ self.logger.debug("Created configuration table")
+ except OperationalError:
+ self.logger.debug("Configuration table exists")
+
+ cursor.execute('SELECT * FROM configuration')
+ db.commit()
+ db.close()
+
+ def delete_configuration_value(self, configuration_name, key):
+ """Deletes the value associated with the given key
+ """
+
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute("delete from configuration where "
+ "configuration_name=? and key=?",
+ (configuration_name, key))
+
+ self.logger.debug("Deleted " + configuration_name + ":" + key)
+
+ db.commit()
+ db.close()
+
+ def get_configuration_value(self, configuration_name, key):
+ """Returns a string representation of the value stored
+ with this key under the given configuration name.
+ """
+
+ with db_mutex:
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute(
+ """select value from configuration
+ where configuration_name = ?
+ and key = ?""",
+ (configuration_name, key,))
+
+ row = cursor.fetchone()
+
+ return_value = None
+
+ if (row is None):
+ self.logger.debug(
+ configuration_name + ":" + key + " does not exist")
+ else:
+ self.logger.debug(
+ configuration_name + ":" + key + " is " + str(row[0]))
+ return_value = str(row[0])
+
+ db.close()
+
+ return return_value
+
+ def set_configuration_value(self, configuration_name, key, value):
+ """Updates or creates the key under the given configuration
+ name so that it holds the value specified.
+ """
+
+ if (value is None):
+ return self.delete_configuration_value(configuration_name, key)
+
+ with db_mutex:
+ value = str(value)
+
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute("delete from configuration where "
+ "configuration_name=? and key=?",
+ (configuration_name, key))
+
+ cursor.execute(
+ """insert into configuration(configuration_name, key, value)
+ values (?,?,?)""", (configuration_name, key, value))
+
+ self.logger.debug(
+ configuration_name + ":" + key + " set to " + value)
+
+ db.commit()
+ db.close()
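ConfigurationDB persists (configuration_name, key, value) triples in SQLite,
so values always come back as strings. A brief sketch:

    from storperf.db.configuration_db import ConfigurationDB

    config_db = ConfigurationDB()
    config_db.set_configuration_value('stack', 'agent_count', 4)
    print(config_db.get_configuration_value('stack', 'agent_count'))  # '4'
    config_db.set_configuration_value('stack', 'agent_count', None)  # deletes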
diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py
new file mode 100644
index 0000000..c8a2d35
--- /dev/null
+++ b/docker/storperf-master/storperf/db/graphite_db.py
@@ -0,0 +1,63 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import json
+import logging
+
+import requests
+
+from storperf.db.job_db import JobDB
+
+
+class GraphiteDB(object):
+
+ def __init__(self):
+        """
+        Wraps the local Graphite render API used to fetch metric series
+        """
+ self._job_db = JobDB()
+ self.logger = logging.getLogger(__name__)
+
+ def fetch_series(self, workload, metric, io_type, time, duration):
+
+ series = []
+ end = time
+ start = end - duration
+
+ request = ("http://127.0.0.1:8000/render/?target="
+ "averageSeries(%s.*.jobs.1.%s.%s)"
+ "&format=json"
+ "&from=%s"
+ "&until=%s" %
+ (workload, io_type, metric,
+ start, end))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ series = self._series_results(json.loads(response.content))
+
+ return series
+
+ def _series_results(self, results):
+
+ series = []
+
+ for item in results:
+ datapoints = item['datapoints']
+ for datapoint in datapoints:
+ if datapoint[0] is not None:
+ series.append([datapoint[1], datapoint[0]])
+
+ return series
+
+ def make_fullname_pattern(self, workload):
+ parts = workload.split('.')
+ wildcards_needed = 7 - len(parts)
+ fullname = workload + (".*" * wildcards_needed)
+ return fullname
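fetch_series queries the local Graphite render API and returns the datapoints
as [timestamp, value] pairs, averaged across all slaves. A sketch with a
hypothetical run name and time window:

    import time

    from storperf.db.graphite_db import GraphiteDB

    graphite = GraphiteDB()
    now = int(time.time())
    # Average read IOPS over the last ten minutes of the run
    series = graphite.fetch_series(
        "job-1234.rw.queue-depth.8.block-size.4096",  # hypothetical run
        "iops", "read", now, 600)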
diff --git a/docker/storperf-master/storperf/db/job_db.py b/docker/storperf-master/storperf/db/job_db.py
new file mode 100644
index 0000000..3308fa8
--- /dev/null
+++ b/docker/storperf-master/storperf/db/job_db.py
@@ -0,0 +1,259 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import calendar
+import json
+import logging
+from sqlite3 import OperationalError
+import sqlite3
+from threading import Lock
+import time
+import uuid
+
+
+db_mutex = Lock()
+
+
+class JobDB(object):
+
+ db_name = "StorPerfJob.db"
+
+ def __init__(self):
+ """
+ Creates the StorPerfJob.db and jobs tables on demand
+ """
+
+ self.logger = logging.getLogger(__name__)
+ self.logger.debug("Connecting to " + JobDB.db_name)
+ self.job_id = None
+
+ with db_mutex:
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE jobs
+ (job_id text,
+ workload text,
+ start text,
+ end text)''')
+ self.logger.debug("Created job table")
+ except OperationalError:
+ self.logger.debug("Job table exists")
+
+ try:
+ cursor.execute('''CREATE TABLE job_params
+ (job_id text,
+ param text,
+ value text)''')
+ self.logger.debug("Created job_params table")
+ except OperationalError:
+ self.logger.debug("Job params table exists")
+
+ try:
+ cursor.execute('''CREATE TABLE job_summary
+ (job_id text,
+ summary text)''')
+ self.logger.debug("Created job summary table")
+ except OperationalError:
+ self.logger.debug("Job summary table exists")
+
+ cursor.execute('SELECT * FROM jobs')
+ cursor.execute('SELECT * FROM job_params')
+ db.commit()
+ db.close()
+
+ def create_job_id(self):
+ """
+ Returns a job id that is guaranteed to be unique in this
+ StorPerf instance.
+ """
+ with db_mutex:
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+
+ self.job_id = str(uuid.uuid4())
+ row = cursor.execute(
+ "select * from jobs where job_id = ?", (self.job_id,))
+
+ while (row.fetchone() is not None):
+ self.logger.info("Duplicate job id found, regenerating")
+ self.job_id = str(uuid.uuid4())
+ row = cursor.execute(
+ "select * from jobs where job_id = ?", (self.job_id,))
+
+ cursor.execute(
+ "insert into jobs(job_id) values (?)", (self.job_id,))
+ self.logger.debug("Reserved job id " + self.job_id)
+ db.commit()
+ db.close()
+
+ def start_workload(self, workload):
+ """
+ Records the start time for the given workload
+ """
+
+ workload_name = workload.fullname
+
+ if (self.job_id is None):
+ self.create_job_id()
+
+ with db_mutex:
+
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+
+ now = str(calendar.timegm(time.gmtime()))
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start)
+ values (?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,))
+ else:
+ self.logger.warn("Duplicate start time for workload %s"
+ % workload_name)
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ start = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ db.commit()
+ db.close()
+
+ def end_workload(self, workload):
+ """
+ Records the end time for the given workload
+ """
+ if (self.job_id is None):
+ self.create_job_id()
+
+ workload_name = workload.fullname
+
+ with db_mutex:
+
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ now = str(calendar.timegm(time.gmtime()))
+
+ row = cursor.execute(
+ """select * from jobs
+ where job_id = ?
+ and workload = ?""",
+ (self.job_id, workload_name,))
+
+ if (row.fetchone() is None):
+ self.logger.warn("No start time recorded for workload %s"
+ % workload_name)
+ cursor.execute(
+ """insert into jobs
+ (job_id,
+ workload,
+ start,
+ end)
+ values (?, ?, ?, ?)""",
+ (self.job_id,
+ workload_name,
+ now,
+ now))
+ else:
+ cursor.execute(
+ """update jobs set
+ job_id = ?,
+ end = ?
+ where workload = ?""",
+ (self.job_id,
+ now,
+ workload_name,))
+
+ db.commit()
+ db.close()
+
+ def fetch_workloads(self, workload):
+ workload_prefix = workload + "%"
+ workload_executions = []
+
+ with db_mutex:
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ cursor.execute("""select workload, start, end
+ from jobs where workload like ?""",
+ (workload_prefix,))
+
+ while (True):
+ row = cursor.fetchone()
+ if (row is None):
+ break
+ workload_execution = [row[0], row[1], row[2]]
+ workload_executions.append(workload_execution)
+ db.close()
+
+ return workload_executions
+
+ def record_workload_params(self, params):
+        """
+        Records the given parameters against the current job id
+        """
+ if (self.job_id is None):
+ self.create_job_id()
+
+ with db_mutex:
+
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+ for param, value in params.iteritems():
+ cursor.execute(
+ """insert into job_params
+ (job_id,
+ param,
+ value)
+ values (?, ?, ?)""",
+ (self.job_id,
+ param,
+ value,))
+ db.commit()
+ db.close()
+
+ def fetch_workload_params(self, job_id):
+        """
+        Returns the parameters recorded against the given job id
+        """
+ params = {}
+ with db_mutex:
+
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute(
+ "select param, value from job_params where job_id = ?",
+ (job_id,))
+
+ while (True):
+ row = cursor.fetchone()
+ if (row is None):
+ break
+ try:
+ data = json.loads(row[1])
+                except (TypeError, ValueError):
+ data = row[1]
+ params[row[0]] = data
+ db.close()
+ return params
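JobDB tracks each test run under a UUID job id, bracketing every workload
with start/end times and storing arbitrary parameters as job_params rows. A
lifecycle sketch (a fullname attribute is all start_workload/end_workload
need from a workload object):

    from storperf.db.job_db import JobDB

    class StubWorkload(object):
        fullname = "rw.queue-depth.1.block-size.4096"

    job_db = JobDB()
    job_db.create_job_id()
    job_db.record_workload_params({"agent_count": "2"})
    workload = StubWorkload()
    job_db.start_workload(workload)
    job_db.end_workload(workload)
    print(job_db.fetch_workload_params(job_db.job_id))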
diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py
new file mode 100644
index 0000000..a2f7038
--- /dev/null
+++ b/docker/storperf-master/storperf/db/test_results_db.py
@@ -0,0 +1,61 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import json
+import os
+import requests
+
+
+def get_installer_type(logger=None):
+ """
+ Get installer type (fuel, apex, joid, compass)
+ """
+ try:
+ installer = os.environ['INSTALLER_TYPE']
+ except KeyError:
+ if logger:
+ logger.error("Impossible to retrieve the installer type")
+ installer = "Unknown_installer"
+
+ return installer
+
+
+def push_results_to_db(db_url, project, case_name,
+ test_start, test_stop, logger, pod_name,
+ version, scenario, criteria, build_tag, details):
+ """
+ POST results to the Result target DB
+ """
+ url = db_url + "/results"
+ installer = get_installer_type(logger)
+
+ params = {"project_name": project, "case_name": case_name,
+ "pod_name": pod_name, "installer": installer,
+ "version": version, "scenario": scenario, "criteria": criteria,
+ "build_tag": build_tag, "start_date": test_start,
+ "stop_date": test_stop, "details": details}
+
+ headers = {'Content-Type': 'application/json'}
+ try:
+ if logger:
+ jsonified_params = json.dumps(params)
+ logger.info("Pushing results to %s" % (url))
+ logger.debug("Parameters: %s" % jsonified_params[:1024])
+ r = requests.post(url, data=jsonified_params, headers=headers)
+ if logger:
+ logger.debug(r)
+ logger.debug(r.status_code)
+ logger.debug(r.content)
+ return json.loads(r.content)
+    except Exception, e:
+        if logger:
+            logger.error("Error [push_results_to_db('%s', '%s', '%s', "
+                         "'%s', '%s', '%s', '%s', '%s', '%s')]: %s",
+                         db_url, project, case_name, pod_name, version,
+                         scenario, criteria, build_tag, details, e)
+        return None
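push_results_to_db posts one JSON document per test run to an OPNFV results
collector. A hedged invocation (the URL and all metadata values are
placeholders):

    import logging

    logger = logging.getLogger("storperf")
    push_results_to_db("http://testresults.example.com/api/v1",  # hypothetical
                       "storperf", "snia_steady_state",
                       "2017-07-11 15:12:35", "2017-07-11 15:47:46", logger,
                       "mypod", "master", "os-nosdn-nofeature-ha", "PASS",
                       "jenkins-build-1", {"metrics": {}})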
diff --git a/docker/storperf-master/storperf/fio/__init__.py b/docker/storperf-master/storperf/fio/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/docker/storperf-master/storperf/fio/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
new file mode 100644
index 0000000..106696d
--- /dev/null
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -0,0 +1,153 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import json
+import logging
+from threading import Thread
+import paramiko
+
+
+class FIOInvoker(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+ self.event_listeners = set()
+ self.event_callback_ids = set()
+ self._remote_host = None
+ self.callback_id = None
+ self.terminated = False
+
+ @property
+ def remote_host(self):
+ return self._remote_host
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+ self.logger = logging.getLogger(__name__ + ":" + value)
+
+ def register(self, event_listener):
+ self.event_listeners.add(event_listener)
+
+ def unregister(self, event_listener):
+ self.event_listeners.discard(event_listener)
+
+ def stdout_handler(self, stdout):
+ self.logger.debug("Started")
+ self.json_body = ""
+ try:
+ for line in iter(stdout.readline, b''):
+ if line.startswith("fio"):
+ line = ""
+ continue
+ self.json_body += line
+ try:
+ if line == "}\n":
+ json_metric = json.loads(self.json_body)
+ self.json_body = ""
+
+ if not self.terminated:
+ for event_listener in self.event_listeners:
+ try:
+ self.logger.debug(
+ "Event listener callback")
+ event_listener(
+ self.callback_id, json_metric)
+ except Exception, e:
+ self.logger.exception(
+ "Notifying listener %s: %s",
+ self.callback_id, e)
+ self.logger.debug(
+ "Event listener callback complete")
+ except Exception, e:
+ self.logger.error("Error parsing JSON: %s", e)
+ except IOError:
+ pass # We might have read from the closed socket, ignore it
+
+ stdout.close()
+ self.logger.debug("Finished")
+
+ def stderr_handler(self, stderr):
+ self.logger.debug("Started")
+ for line in iter(stderr.readline, b''):
+ self.logger.error("FIO Error: %s", line.rstrip())
+
+        # Sometimes, fio gets stuck and will give us this message:
+        # fio: job 'sequential_read' hasn't exited in 60 seconds,
+        # it appears to be stuck. Doing forceful exit of this job.
+        # A second killall of fio will release the stuck process.
+
+ if 'it appears to be stuck' in line:
+ self.terminate()
+
+ stderr.close()
+ self.logger.debug("Finished")
+
+ def execute(self, args=[]):
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ command = "sudo ./fio " + ' '.join(args)
+ self.logger.debug("Remote command: %s" % command)
+
+ chan = ssh.get_transport().open_session(timeout=None)
+ chan.settimeout(None)
+ chan.exec_command(command)
+ stdout = chan.makefile('r', -1)
+ stderr = chan.makefile_stderr('r', -1)
+
+ tout = Thread(target=self.stdout_handler, args=(stdout,),
+ name="%s stdout" % self._remote_host)
+ tout.daemon = True
+ tout.start()
+
+ terr = Thread(target=self.stderr_handler, args=(stderr,),
+ name="%s stderr" % self._remote_host)
+ terr.daemon = True
+ terr.start()
+
+ self.logger.info("Started fio on " + self.remote_host)
+ exit_status = chan.recv_exit_status()
+ self.logger.info("Finished fio on %s with exit code %s" %
+ (self.remote_host, exit_status))
+
+ stdout.close()
+ stderr.close()
+
+ self.logger.debug("Joining stderr handler")
+ terr.join()
+ self.logger.debug("Joining stdout handler")
+ tout.join()
+ self.logger.debug("Ended")
+
+ def terminate(self):
+ self.logger.debug("Terminating fio on " + self.remote_host)
+ self.terminated = True
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ command = "sudo killall fio"
+
+ self.logger.debug("Executing on %s: %s" % (self.remote_host, command))
+ (_, stdout, stderr) = ssh.exec_command(command)
+
+ for line in stdout.readlines():
+ self.logger.debug(line.strip())
+ for line in stderr.readlines():
+ self.logger.error(line.strip())
+
+ stdout.close()
+ stderr.close()
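FIOInvoker streams fio's periodic JSON status blocks from a remote slave over
SSH and hands each parsed block to the registered listeners. A sketch (the
slave address and fio arguments are hypothetical; --output-format=json and
--status-interval are what make fio emit parseable blocks):

    from storperf.fio.fio_invoker import FIOInvoker

    def on_metric(callback_id, metric):
        print(callback_id, metric["jobs"][0]["jobname"])

    invoker = FIOInvoker()
    invoker.callback_id = "slave-1"
    invoker.remote_host = "172.16.0.10"  # hypothetical agent IP
    invoker.register(on_metric)
    invoker.execute(["--name=test", "--rw=read", "--output-format=json",
                     "--status-interval=60", "--filename=/dev/vdb"])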
diff --git a/docker/storperf-master/storperf/logging.json b/docker/storperf-master/storperf/logging.json
new file mode 100644
index 0000000..2a0bbce
--- /dev/null
+++ b/docker/storperf-master/storperf/logging.json
@@ -0,0 +1,51 @@
+{
+ "version": 1,
+ "disable_existing_loggers": false,
+ "formatters": {
+ "simple": {
+ "format": "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s"
+ }
+ },
+
+ "handlers": {
+ "console": {
+ "class": "logging.StreamHandler",
+ "level": "DEBUG",
+ "formatter": "simple",
+ "stream": "ext://sys.stdout"
+ },
+
+ "file_handler": {
+ "class": "logging.handlers.RotatingFileHandler",
+ "level": "DEBUG",
+ "formatter": "simple",
+ "filename": "storperf.log",
+ "maxBytes": 10485760,
+ "backupCount": 20,
+ "encoding": "utf8"
+ },
+
+ "error_file_handler": {
+ "class": "logging.handlers.RotatingFileHandler",
+ "level": "ERROR",
+ "formatter": "simple",
+ "filename": "errors.log",
+ "maxBytes": 10485760,
+ "backupCount": 20,
+ "encoding": "utf8"
+ }
+ },
+
+ "loggers": {
+ "": {
+ "level": "INFO",
+ "handlers": ["console", "file_handler", "error_file_handler"]
+ },
+ "storperf": {
+ "level": "DEBUG"
+ },
+ "storperf.carbon.emitter": {
+ "level": "INFO"
+ }
+ }
+}
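This file is in the standard logging dictConfig format; a sketch of how such
a configuration is typically loaded at startup:

    import json
    import logging.config

    with open("storperf/logging.json") as config_file:
        logging.config.dictConfig(json.load(config_file))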
diff --git a/docker/storperf-master/storperf/resources/hot/agent-group.yaml b/docker/storperf-master/storperf/resources/hot/agent-group.yaml
new file mode 100644
index 0000000..c758ecd
--- /dev/null
+++ b/docker/storperf-master/storperf/resources/hot/agent-group.yaml
@@ -0,0 +1,106 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+ public_network:
+ type: string
+ constraints:
+ - custom_constraint: neutron.network
+ agent_flavor:
+ type: string
+ default: "storperf"
+ agent_image:
+ type: string
+ default: 'StorPerf Ubuntu 14.04'
+ volume_size:
+ type: number
+ description: Size of the volume to be created.
+ default: 1
+ constraints:
+ - range: { min: 1, max: 1024 }
+        description: must be between 1 and 1024 GB.
+ agent_count:
+ type: number
+ default: 1
+ constraints:
+ - range: { min: 1, max: 512 }
+ description: must be between 1 and 512 agents.
+
+
+resources:
+ slaves:
+ type: OS::Heat::ResourceGroup
+ depends_on: [storperf_subnet, storperf_network_router_interface,
+ storperf_open_security_group, storperf_key_pair]
+ properties:
+ count: {get_param: agent_count}
+ resource_def: {
+ type: "storperf-agent.yaml",
+ properties: {
+ public_network: {get_param: public_network},
+ agent_network: {get_resource: storperf_network},
+ flavor: {get_param: agent_flavor},
+ image: {get_param: agent_image},
+ storperf_open_security_group: {get_resource: storperf_open_security_group},
+ key_name: {get_resource: storperf_key_pair},
+ volume_size: {get_param: volume_size}
+ }
+ }
+
+ storperf_network:
+ type: OS::Neutron::Net
+ properties:
+ name: storperf-network
+
+ storperf_subnet:
+ type: OS::Neutron::Subnet
+ properties:
+ network_id: { get_resource: storperf_network }
+ cidr: 172.16.0.0/16
+ gateway_ip: 172.16.0.1
+
+ storperf_network_router:
+ type: OS::Neutron::Router
+ properties:
+ external_gateway_info:
+ network: { get_param: public_network }
+
+ storperf_network_router_interface:
+ type: OS::Neutron::RouterInterface
+ properties:
+ router_id: { get_resource: storperf_network_router }
+ subnet_id: { get_resource: storperf_subnet }
+
+ storperf_key_pair:
+ type: OS::Nova::KeyPair
+ properties:
+ save_private_key: true
+ name: storperf_agent_keypair
+
+ storperf_open_security_group:
+ type: OS::Neutron::SecurityGroup
+ properties:
+ description: An open security group to allow all access to the StorPerf slaves
+ rules:
+ - remote_ip_prefix: 0.0.0.0/0
+ protocol: tcp
+ port_range_min: 22
+ port_range_max: 22
+ - remote_ip_prefix: 0.0.0.0/0
+ protocol: icmp
+
+
+
+outputs:
+ slave_ips: {
+ description: "Slave addresses",
+ value: { get_attr: [ slaves, storperf_agent_ip] }
+ }
diff --git a/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml
new file mode 100644
index 0000000..7bf8b4d
--- /dev/null
+++ b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml
@@ -0,0 +1,99 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+ flavor:
+ type: string
+ default: storperf
+ image:
+ type: string
+ default: 'Ubuntu 16.04'
+ key_name:
+ type: string
+ default: StorPerf
+ username:
+ type: string
+ default: storperf
+ storperf_open_security_group:
+ type: string
+ volume_size:
+ type: number
+ description: Size of the volume to be created.
+ default: 1
+ constraints:
+ - range: { min: 1, max: 1024 }
+        description: must be between 1 and 1024 GB.
+ agent_network:
+ type: string
+ constraints:
+ - custom_constraint: neutron.network
+ public_network:
+ type: string
+ constraints:
+ - custom_constraint: neutron.network
+resources:
+
+ storperf_agent:
+ type: "OS::Nova::Server"
+ properties:
+ name: storperf-agent
+ image: { get_param: image }
+ flavor: { get_param: flavor }
+ key_name: { get_param: key_name }
+ networks:
+ - port: { get_resource: storperf_agent_port }
+ user_data: { get_resource: storperf_agent_config }
+ user_data_format: RAW
+
+ storperf_agent_config:
+ type: "OS::Heat::CloudConfig"
+ properties:
+ cloud_config:
+ users:
+ - name: { get_param: username }
+ groups: users
+ shell: /bin/bash
+ sudo: "ALL=(ALL) NOPASSWD:ALL"
+ ssh_authorized_keys:
+ - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07
+ - default
+ package_update: false
+ package_upgrade: false
+ manage_etc_hosts: localhost
+
+ storperf_agent_port:
+ type: "OS::Neutron::Port"
+ properties:
+ network_id: { get_param: agent_network }
+ security_groups:
+ - { get_param: storperf_open_security_group }
+
+ storperf_floating_ip:
+ type: OS::Neutron::FloatingIP
+ properties:
+ floating_network_id: { get_param: public_network }
+ port_id: { get_resource: storperf_agent_port }
+
+ agent_volume:
+ type: OS::Cinder::Volume
+ properties:
+ size: { get_param: volume_size }
+
+ agent_volume_att:
+ type: OS::Cinder::VolumeAttachment
+ properties:
+ instance_uuid: { get_resource: storperf_agent }
+ volume_id: { get_resource: agent_volume}
+
+outputs:
+ storperf_agent_ip:
+ description: The floating IP address of the agent on the public network
+ value: { get_attr: [ storperf_floating_ip, floating_ip_address ] }
diff --git a/docker/storperf-master/storperf/resources/ssh/storperf_rsa b/docker/storperf-master/storperf/resources/ssh/storperf_rsa
new file mode 100644
index 0000000..fb8c714
--- /dev/null
+++ b/docker/storperf-master/storperf/resources/ssh/storperf_rsa
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEAxG5w4qmfEY0CScyT39O+CeNV5WBA/s2Ac2lGeVJJFKrl88ps
+lJqFmhQVyT/lz7uDbOOFP0vOtePicRpPerY1HSZSIy9AIPBz+ii7UzQmm7nCnBAE
+qYCSFVqibxNxPbqWSyBIkxrBfCPEuu09qqtSdbr7ccnmZso5uzTGpo8hVxOd3YUd
+hB6krrciitQiu+DZEWPcK96llCLd+ZC3SklUn66+IDjonAwC2yEbu+7JByL647fs
+CtVVqYQXTIOsrbquZiBOqY1gFNxsJeGk8TlNM+NwEcwi6vB1X/LP3So2DI/KZTam
+iH3HjG2hFKqTR7/8W77RjRuzdGgf2uKZFEC9OwIDAQABAoIBAQC7J6byrz5Z0Io/
+mmXCOtK0RSAQHfePTml2jPWSnm32/SV/dHyj0d49gamISBNEK5r64oSQXEAlWWzk
+6naTqotMrLhRwbFOMQuva6OfkO7ALOtZGoH2pgOJyQM+5b1dXSvZhHbhgfqbttC4
+cVXyCK3WckMklYOqqD79OTmUdIcFmILoYuMbIuMBtPukLgOel0wwTboegqDGg3aq
+Kmnq+Y79Z2FG2Xn1NGllSCLPMAKJpODNW1E1H8JqvV7gmC/SUItRsvcLSg6pr2vB
+eRhjLlczpnOK9pzHoul/qe6GR9RG/FoWl9c5uVeq3OFxqUQplwmxNVTQ+vJplJM5
+WFbsDZRhAoGBAPw5cpa2d4AuPxFJPnsrlhAAxtK/+Jl1Hi/1ioA5lggSe55xHdvP
+dxeh4XJ8u4jnjCDdrmqHF0K578DoaCCRz9UT7P2JV2QCECPQgCvpkgEG344ovHwA
+VI2j6+m1zKA6yJ2BEr/t3pvDrjoOtBHZaPMdJBHi53YleZvecxDlODT5AoGBAMdf
+MDP6np1Jr/3NTJSYcFQGeqNA3S++aNVJAM+rsQk8rZLttvyiinIK+NUgMbGVMtsH
++t4HbJrCpX4cLzmDKDuRXGK8z4n3/jyjID7WDRPm5te+yqD2Zrv6oAal4DoxlGYj
+EHftbWKwGwgktWKQbdV+7R0ZHwNgPK3cn3twnrTTAoGAIrDsF9qk+RZjSkdetqY6
+D51ru1T4JnM7YbFOCXDiN94C7rn0N2WDpdZ4Ib0SNjRSy7px4OkPw/e7CDdvCvvD
+MDV7ZSvcvz6hZaup1WBc2pNNcEoeEpghCRJAwnZk3Kz5JuC36XoYIih58DZRghr7
+GmUpruQcnd1tqoigHvTIFFECgYEAk67dJAPHrrdA2H5U9dWdj4BlI70Omykuup8j
+LLH/p9n8sVPJMoasiyG5hRYd+W+NhlX47LMPLex1bl8uVCnliYp5puI50feTqhMj
+9afVdCKcaL/5lRYwr5pNI9+Ho1PKm4Xp0wxa9LmCrJuUiPh3g6hLuDw9juCg0iEV
+OfkIduUCgYEA+924bfPhC7mNURBsQHaGSqlRbCX1KmU8EnmMJuVsPW8N9mN3/KU9
+0QFulljh1QNiByK5Iq4J0Q62OFbb1k8POB0ensZwfdIX/QoyhWGXz+DoUOLSDY6N
+tlIG1mTYJCltc1rajPAtKdLv/PZeJS6NWp1y6T23IVQ08+HzCfxYwRA=
+-----END RSA PRIVATE KEY-----
diff --git a/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub b/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub
new file mode 100644
index 0000000..cdc8cb4
--- /dev/null
+++ b/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 jenkins@storperf-openstack
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
new file mode 100644
index 0000000..fb3e269
--- /dev/null
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -0,0 +1,448 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from datetime import datetime
+import logging
+import os
+import socket
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+from threading import Thread
+from time import sleep
+
+from cinderclient import client as cinderclient
+import heatclient.client as heatclient
+from keystoneauth1 import loading
+from keystoneauth1 import session
+import paramiko
+from scp import SCPClient
+
+
+class ParameterError(Exception):
+    """Raised when a configuration parameter is missing or invalid"""
+
+
+class StorPerfMaster(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ self.configuration_db = ConfigurationDB()
+ self.job_db = JobDB()
+
+ template_file = open("storperf/resources/hot/agent-group.yaml")
+ self._agent_group_hot = template_file.read()
+ template_file = open("storperf/resources/hot/storperf-agent.yaml")
+ self._agent_resource_hot = template_file.read()
+ self._hot_files = {
+ 'storperf-agent.yaml': self._agent_resource_hot
+ }
+ self.logger.debug(
+ "Loaded agent-group template as: " + self._agent_group_hot)
+ self.logger.debug(
+ "Loaded agent-resource template as: " + self._agent_resource_hot)
+
+ self._cinder_client = None
+ self._heat_client = None
+ self._test_executor = TestExecutor()
+ self._last_openstack_auth = datetime.now()
+
+ @property
+ def volume_size(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'volume_size')
+ if (value is None):
+ self.volume_size = 1
+ value = 1
+ return int(value)
+
+ @volume_size.setter
+ def volume_size(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change volume size after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'volume_size',
+ value)
+
+ @property
+ def agent_count(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_count')
+
+ if (value is None):
+ self.agent_count = 1
+ value = 1
+ return int(value)
+
+ @agent_count.setter
+ def agent_count(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent count after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_count',
+ value)
+
+ @property
+ def agent_image(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_image')
+
+ if (value is None):
+ value = 'Ubuntu 14.04'
+ self.agent_image = value
+
+ return value
+
+ @agent_image.setter
+ def agent_image(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent image after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_image',
+ value)
+
+ @property
+ def public_network(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'public_network')
+
+ @public_network.setter
+ def public_network(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change public network after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'public_network',
+ value)
+
+ @property
+ def agent_flavor(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_flavor')
+
+ @agent_flavor.setter
+ def agent_flavor(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change flavor after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_flavor',
+ value)
+
+ @property
+ def stack_id(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'stack_id')
+
+ @stack_id.setter
+ def stack_id(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'stack_id',
+ value)
+
+ @property
+ def volume_quota(self):
+ self._attach_to_openstack()
+ quotas = self._cinder_client.quotas.get(
+ os.environ.get('OS_TENANT_ID'))
+ return int(quotas.volumes)
+
+ @property
+ def filename(self):
+ return self._test_executor.filename
+
+ @filename.setter
+ def filename(self, value):
+ self._test_executor.filename = value
+
+ @property
+ def deadline(self):
+ return self._test_executor.deadline
+
+ @deadline.setter
+ def deadline(self, value):
+ self._test_executor.deadline = value
+
+ @property
+ def steady_state_samples(self):
+ return self._test_executor.steady_state_samples
+
+ @steady_state_samples.setter
+ def steady_state_samples(self, value):
+ self._test_executor.steady_state_samples = value
+
+ @property
+ def queue_depths(self):
+ return self._test_executor.queue_depths
+
+ @queue_depths.setter
+ def queue_depths(self, value):
+ self._test_executor.queue_depths = value
+
+ @property
+ def block_sizes(self):
+ return self._test_executor.block_sizes
+
+ @block_sizes.setter
+ def block_sizes(self, value):
+ self._test_executor.block_sizes = value
+
+ @property
+ def is_stack_created(self):
+ if (self.stack_id is not None):
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+
+ self.logger.info("Status=" + status)
+ if (status == u'CREATE_COMPLETE'):
+ return True
+
+ return False
+
+ @property
+ def workloads(self):
+ return self.configuration_db.get_configuration_value(
+ 'workload',
+ 'workloads')
+
+ @workloads.setter
+ def workloads(self, value):
+ self._test_executor.register_workloads(value)
+
+ self.configuration_db.set_configuration_value(
+ 'workload',
+ 'workloads',
+ str(self._test_executor.workload_modules))
+
+ def create_stack(self):
+ if (self.stack_id is not None):
+ raise ParameterError("ERROR: Stack has already been created")
+
+ self._attach_to_openstack()
+ volume_quota = self.volume_quota
+ if (volume_quota > 0 and self.agent_count > volume_quota):
+ message = "ERROR: Volume quota too low: " + \
+ str(self.agent_count) + " > " + str(self.volume_quota)
+ raise ParameterError(message)
+
+ self.logger.debug("Creating stack")
+ stack = self._heat_client.stacks.create(
+ stack_name="StorPerfAgentGroup",
+ template=self._agent_group_hot,
+ files=self._hot_files,
+ parameters=self._make_parameters())
+
+ self.stack_id = stack['stack']['id']
+
+ while True:
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+ self.logger.debug("Stack status=%s" % (status,))
+ if (status == u'CREATE_COMPLETE'):
+ return True
+ if (status == u'DELETE_COMPLETE'):
+ self.stack_id = None
+ return True
+ if (status == u'CREATE_FAILED'):
+ sleep(5)
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ sleep(2)
+
+ def delete_stack(self):
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+ while True:
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+ self.logger.debug("Stack status=%s" % (status,))
+ if (status == u'CREATE_COMPLETE'):
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ if (status == u'DELETE_COMPLETE'):
+ self.stack_id = None
+ return True
+ if (status == u'DELETE_FAILED'):
+ sleep(5)
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ sleep(2)
+
+ def execute_workloads(self, metadata={}):
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ outputs = getattr(stack, 'outputs')
+ slaves = outputs[0]['output_value']
+
+ setup_threads = []
+
+ for slave in slaves:
+ t = Thread(target=self._setup_slave, args=(slave,))
+ setup_threads.append(t)
+ t.start()
+
+ for thread in setup_threads:
+ thread.join()
+
+ self._test_executor.slaves = slaves
+
+ params = metadata
+ params['agent_count'] = self.agent_count
+ params['public_network'] = self.public_network
+ params['volume_size'] = self.volume_size
+ job_id = self._test_executor.execute(params)
+
+ return job_id
+
+ def terminate_workloads(self):
+ return self._test_executor.terminate()
+
+ def fetch_results(self, job_id):
+ if self._test_executor.job_db.job_id == job_id:
+ return self._test_executor.metadata['metrics']
+
+ workload_params = self.job_db.fetch_workload_params(job_id)
+ if 'report' in workload_params:
+ report = workload_params['report']
+ return report['metrics']
+ return {}
+
+ def fetch_metadata(self, job_id):
+ return self.job_db.fetch_workload_params(job_id)
+
+ def fetch_job_status(self, job_id):
+ return self._test_executor.execution_status(job_id)
+
+ def _setup_slave(self, slave):
+ logger = logging.getLogger(__name__ + ":" + slave)
+
+ logger.info("Initializing slave at " + slave)
+
+ logger.debug("Checking if slave " + slave + " is alive")
+
+ alive = False
+ timer = 10
+ while not alive:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ result = s.connect_ex((slave, 22))
+ s.close()
+
+ if result:
+ alive = False
+ sleep(1)
+ timer -= 1
+ if timer == 0:
+ logger.debug("Still waiting for slave " + slave)
+ timer = 10
+ else:
+ alive = True
+ logger.debug("Slave " + slave + " is alive and ready")
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(slave, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ scp = SCPClient(ssh.get_transport())
+ logger.debug("Transferring libaio.so.1 to %s" % slave)
+ scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/')
+ logger.debug("Transferring fio to %s" % slave)
+ scp.put('/usr/local/bin/fio', '~/')
+
+ cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
+ logger.debug("Executing on %s: %s" % (slave, cmd))
+ (_, stdout, stderr) = ssh.exec_command(cmd)
+
+ for line in stdout.readlines():
+ logger.debug(line.strip())
+ for line in stderr.readlines():
+ logger.error(line.strip())
+
+ def _make_parameters(self):
+ heat_parameters = {}
+ heat_parameters['public_network'] = self.public_network
+ heat_parameters['agent_count'] = self.agent_count
+ heat_parameters['volume_size'] = self.volume_size
+ heat_parameters['agent_image'] = self.agent_image
+ heat_parameters['agent_flavor'] = self.agent_flavor
+ return heat_parameters
+
+ def _attach_to_openstack(self):
+
+ time_since_last_auth = datetime.now() - self._last_openstack_auth
+
+ if (self._heat_client is None or
+ time_since_last_auth.total_seconds() > 600):
+ self._last_openstack_auth = datetime.now()
+
+ creds = {
+ "username": os.environ.get('OS_USERNAME'),
+ "password": os.environ.get('OS_PASSWORD'),
+ "auth_url": os.environ.get('OS_AUTH_URL'),
+ "project_domain_id":
+ os.environ.get('OS_PROJECT_DOMAIN_ID'),
+ "project_domain_name":
+ os.environ.get('OS_PROJECT_DOMAIN_NAME'),
+ "project_id": os.environ.get('OS_PROJECT_ID'),
+ "project_name": os.environ.get('OS_PROJECT_NAME'),
+ "tenant_name": os.environ.get('OS_TENANT_NAME'),
+ "tenant_id": os.environ.get("OS_TENANT_ID"),
+ "user_domain_id": os.environ.get('OS_USER_DOMAIN_ID'),
+ "user_domain_name": os.environ.get('OS_USER_DOMAIN_NAME')
+ }
+
+ self.logger.debug("Creds: %s" % creds)
+
+ loader = loading.get_plugin_loader('password')
+ auth = loader.load_from_options(**creds)
+ sess = session.Session(auth=auth)
+
+ self.logger.debug("Looking up orchestration endpoint")
+ heat_endpoint = sess.get_endpoint(auth=auth,
+ service_type="orchestration",
+ endpoint_type='publicURL')
+
+ self.logger.debug("Orchestration endpoint is %s" % heat_endpoint)
+ token = sess.get_token(auth=auth)
+
+ self._heat_client = heatclient.Client(
+ "1",
+ endpoint=heat_endpoint,
+ token=token)
+
+ self.logger.debug("Creating cinder client")
+ self._cinder_client = cinderclient.Client("2", session=sess)
+ self.logger.debug("OpenStack authentication complete")
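StorPerfMaster ties the pieces together: Heat builds the agent VMs from the
HOT templates above, _setup_slave pushes fio to each one over SCP, and
TestExecutor drives the runs. A hedged end-to-end sketch (the network name is
an assumption, and the usual OS_* environment variables must already be set):

    from storperf.storperf_master import StorPerfMaster

    master = StorPerfMaster()
    master.public_network = "ext-net"  # hypothetical external network
    master.agent_count = 2
    master.volume_size = 10
    master.create_stack()

    job_id = master.execute_workloads({"build_tag": "local-test"})
    print(master.fetch_job_status(job_id))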
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
new file mode 100644
index 0000000..b2d5914
--- /dev/null
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -0,0 +1,326 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import copy
+import imp
+import json
+import logging
+from os import listdir
+import os
+from os.path import isfile, join
+import sched
+from storperf.carbon.converter import Converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db.job_db import JobDB
+from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
+from storperf.utilities.thread_gate import ThreadGate
+from threading import Thread
+import time
+
+
+class UnknownWorkload(Exception):
+ pass
+
+
+class TestExecutor(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+ self.workload_modules = []
+ self.filename = None
+ self.deadline = None
+ self.steady_state_samples = 10
+ self.metadata = {}
+ self.start_time = None
+ self.end_time = None
+ self.current_workload = None
+ self.workload_status = {}
+ self.result_url = None
+ self._queue_depths = [1, 4, 8]
+ self._block_sizes = [512, 4096, 16384]
+ self.event_listeners = set()
+ self.metrics_converter = Converter()
+ self.metrics_emitter = CarbonMetricTransmitter()
+ self.prefix = None
+ self.job_db = JobDB()
+ self._slaves = []
+ self._terminated = False
+ self._workload_executors = []
+ self._workload_thread = None
+ self._thread_gate = None
+
+ @property
+ def slaves(self):
+ return self._slaves
+
+ @slaves.setter
+ def slaves(self, slaves):
+ self.logger.debug("Set slaves to: " + str(slaves))
+ self._slaves = slaves
+
+ @property
+ def queue_depths(self):
+        return ','.join(str(qd) for qd in self._queue_depths)
+
+ @queue_depths.setter
+ def queue_depths(self, queue_depths):
+ self.logger.debug("Set queue_depths to: " + str(queue_depths))
+ self._queue_depths = queue_depths.split(',')
+
+ @property
+ def block_sizes(self):
+        return ','.join(str(bs) for bs in self._block_sizes)
+
+ @property
+ def terminated(self):
+ return self._terminated
+
+ @block_sizes.setter
+ def block_sizes(self, block_sizes):
+ self.logger.debug("Set block_sizes to: " + str(block_sizes))
+ self._block_sizes = block_sizes.split(',')
+
+ def register(self, event_listener):
+ self.event_listeners.add(event_listener)
+
+ def unregister(self, event_listener):
+ self.event_listeners.discard(event_listener)
+
+ def event(self, callback_id, metric):
+ carbon_metrics = self.metrics_converter.convert_json_to_flat(
+ metric,
+ callback_id)
+
+ self.metrics_emitter.transmit_metrics(carbon_metrics)
+
+ if self._thread_gate.report(callback_id):
+ self.broadcast_event()
+
+ def broadcast_event(self):
+ for event_listener in self.event_listeners:
+ try:
+ self.logger.debug("Notifying event listener %s",
+ event_listener)
+ event_listener(self)
+ except Exception, e:
+ self.logger.exception("While notifying listener %s", e)
+
+ def register_workloads(self, workloads):
+ self.workload_modules = []
+
+ if (workloads is None or len(workloads) == 0):
+ workload_dir = os.path.normpath(
+ os.path.join(os.path.dirname(__file__), "workloads"))
+
+ workload_files = [
+ f for f in listdir(workload_dir)
+ if isfile(join(workload_dir, f))]
+
+ workloads = []
+
+ for filename in workload_files:
+ mname, _ = os.path.splitext(filename)
+ if (not mname.startswith('_')):
+ workloads.append(mname)
+ else:
+ workloads = workloads.split(',')
+
+ for workload in workloads:
+ try:
+ workload_module = self.load_from_file("workloads/" +
+ workload + ".py")
+ self.logger.debug("Found: " + str(workload_module))
+ if(workload_module is None):
+ raise UnknownWorkload(
+ "ERROR: Unknown workload: " + workload)
+ if workload_module not in self.workload_modules:
+ self.workload_modules.append(workload_module)
+ except ImportError, err:
+ raise UnknownWorkload("ERROR: " + str(err))
+
+ def load_from_file(self, uri):
+ uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
+ path, fname = os.path.split(uri)
+ mname, _ = os.path.splitext(fname)
+ no_ext = os.path.join(path, mname)
+ self.logger.debug("Looking for: " + no_ext)
+ if os.path.exists(no_ext + '.pyc'):
+ self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
+ return imp.load_compiled(mname, no_ext + '.pyc')
+ if os.path.exists(no_ext + '.py'):
+ self.logger.debug("Compiling: " + mname + " from " + no_ext)
+ return imp.load_source(mname, no_ext + '.py')
+ return None
+
+ def execute(self, metadata):
+ self.job_db.create_job_id()
+ self.job_db.record_workload_params(metadata)
+ self.metadata = metadata
+ self.metadata['metrics'] = {}
+ self._workload_thread = Thread(target=self.execute_workloads,
+ args=(),
+ name="Workload thread")
+ self._workload_thread.start()
+ return self.job_db.job_id
+
+ def terminate(self):
+ self._terminated = True
+ self.end_time = time.time()
+ return self.terminate_current_run()
+
+ def terminate_current_run(self):
+ self.logger.info("Terminating current run")
+ terminated_hosts = []
+ for workload in self._workload_executors:
+ workload.terminate()
+ terminated_hosts.append(workload.remote_host)
+ return terminated_hosts
+
+ def execution_status(self, job_id):
+
+ result = {}
+ status = "Completed"
+
+ if self.job_db.job_id == job_id and self._terminated is False:
+ status = "Running"
+
+ result['Status'] = status
+ result['Workloads'] = self.workload_status
+ result['TestResultURL'] = self.result_url
+
+ return result
+
+ def execute_workloads(self):
+ self._terminated = False
+ self.logger.info("Starting job %s" % (self.job_db.job_id))
+ self.event_listeners.clear()
+ data_handler = DataHandler()
+ self.register(data_handler.data_event)
+
+ self.start_time = time.time()
+
+ self.workload_status = {}
+ # Prepare stats list
+ for workload_module in self.workload_modules:
+ workload_name = getattr(workload_module, "__name__")
+ blocksizes = self._block_sizes
+ iodepths = self._queue_depths
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth, blocksize)
+ self.workload_status[name] = "Pending"
+
+ for workload_module in self.workload_modules:
+ workload_name = getattr(workload_module, "__name__")
+ self.logger.info("Starting workload %s" % (workload_name))
+
+ constructorMethod = getattr(workload_module, workload_name)
+ workload = constructorMethod()
+ if (self.filename is not None):
+ workload.filename = self.filename
+
+ if (workload_name.startswith("_")):
+ iodepths = [8, ]
+ blocksizes = [16384, ]
+ else:
+ iodepths = self._queue_depths
+ blocksizes = self._block_sizes
+
+ workload.id = self.job_db.job_id
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
+
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
+
+ if self._terminated:
+ return
+ self.current_workload = (
+ "%s.%s.queue-depth.%s.block-size.%s" %
+ (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
+
+ self.logger.info("Starting run %s" % self.current_workload)
+ self.workload_status[self.current_workload] = "Running"
+
+ scheduler = sched.scheduler(time.time, time.sleep)
+ if self.deadline is not None \
+ and not workload_name.startswith("_"):
+ event = scheduler.enter(self.deadline * 60, 1,
+ self.terminate_current_run,
+ ())
+ t = Thread(target=scheduler.run, args=())
+ t.start()
+
+ workload.options['iodepth'] = str(iodepth)
+ workload.options['bs'] = str(blocksize)
+
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(workload)
+ slave_workload.remote_host = slave
+
+ self._workload_executors.append(slave_workload)
+
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,),
+ name="%s worker" % slave)
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
+ slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
+
+ if not scheduler.empty():
+ try:
+ scheduler.cancel(event)
+                    except ValueError:
+                        # The deadline event has already fired; nothing
+                        # to cancel
+                        pass
+
+ self.logger.info("Completed run %s" %
+ self.current_workload)
+ self.workload_status[self.current_workload] = "Completed"
+ self._workload_executors = []
+ self.current_workload = None
+
+ self.logger.info("Completed workload %s" % (workload_name))
+ self.logger.info("Completed job %s" % (self.job_db.job_id))
+
+ if self.result_url is not None:
+ self.logger.info("Results can be found at %s" % self.result_url)
+
+ self.end_time = time.time()
+ self._terminated = True
+ report = {'report': json.dumps(self.metadata)}
+ self.job_db.record_workload_params(report)
+ self.broadcast_event()
+ self.unregister(data_handler.data_event)
+ self.job_db.job_id = None
+
+ def execute_on_node(self, workload):
+
+ invoker = FIOInvoker()
+ invoker.register(self.event)
+ workload.invoker = invoker
+
+ self.logger.info("Starting " + workload.fullname)
+
+ self.job_db.start_workload(workload)
+ workload.execute()
+ self.job_db.end_workload(workload)
+ invoker.unregister(self.event)
+
+ self.logger.info("Ended " + workload.fullname)
diff --git a/docker/storperf-master/storperf/utilities/__init__.py b/docker/storperf-master/storperf/utilities/__init__.py
new file mode 100644
index 0000000..73444b6
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..1da869c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -0,0 +1,205 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
+from time import sleep
+import time
+
+
+class DataHandler(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+ self.job_db = JobDB()
+
+ """
+ """
+
+ def data_event(self, executor):
+ self.logger.debug("Event received")
+
+ if executor.terminated:
+ self._push_to_db(executor)
+ else:
+ workload = '.'.join(executor.current_workload.split('.')[1:6])
+ if 'metrics' not in executor.metadata:
+ executor.metadata['metrics'] = {}
+
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ series = self._convert_timestamps_to_samples(
+ executor, series)
+ steady = self._evaluate_prior_data(
+ series, executor.steady_state_samples)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ average = math.average(treated_data['average_data'])
+ metrics[metric][io_type]['average'] = average
+
+ metrics_key = '%s.%s.%s' % (workload, io_type, metric)
+ executor.metadata['metrics'][metrics_key] = average
+
+ if not steady:
+ steady_state = False
+
+ if 'report_data' not in executor.metadata:
+ executor.metadata['report_data'] = {}
+
+ if 'steady_state' not in executor.metadata:
+ executor.metadata['steady_state'] = {}
+
+ executor.metadata['report_data'][workload] = metrics
+ executor.metadata['steady_state'][workload] = steady_state
+
+ workload_name = executor.current_workload.split('.')[1]
+
+ if steady_state and not workload_name.startswith('_'):
+ executor.terminate_current_run()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (executor.steady_state_samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _convert_timestamps_to_samples(self, executor, series):
+ workload_record = self.job_db.fetch_workloads(
+ executor.current_workload)
+ start_time = int(workload_record[0][1])
+
+ normalized_series = []
+
+ for item in series:
+ elapsed = (item[0] - start_time)
+ sample_number = int(round(float(elapsed) / 60))
+ normalized_series.append([sample_number, item[1]])
+
+ return normalized_series
+
+ def _evaluate_prior_data(self, data_series, samples):
+ self.logger.debug("Data series: %s" % data_series)
+ number_of_samples = len(data_series)
+
+ if number_of_samples == 0:
+ return False
+ if (number_of_samples < samples):
+ self.logger.debug("Only %s samples, ignoring" % number_of_samples)
+ return False
+
+ return SteadyState.steady_state(data_series)
+
+ def _push_to_db(self, executor):
+ pod_name = dictionary.get_key_from_dict(executor.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(executor.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(executor.metadata,
+ 'scenario_name',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(executor.metadata,
+ 'build_tag',
+ 'Unknown')
+ test_case = dictionary.get_key_from_dict(executor.metadata,
+ 'test_case',
+ 'Unknown')
+ duration = executor.end_time - executor.start_time
+
+ payload = executor.metadata
+
+ steady_state = True
+ for _, value in executor.metadata['steady_state'].items():
+ steady_state = steady_state and value
+
+ payload['timestart'] = executor.start_time
+ payload['duration'] = duration
+
+ if steady_state:
+ criteria = 'PASS'
+ else:
+ criteria = 'FAIL'
+
+ start_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.start_time))
+
+ end_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.end_time))
+
+ test_db = os.environ.get('TEST_DB_URL')
+ if test_db is not None:
+ self.logger.info("Pushing results to %s" % (test_db))
+ try:
+ response = test_results_db.push_results_to_db(test_db,
+ "storperf",
+ test_case,
+ start_time,
+ end_time,
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ executor.result_url = response['href']
+            except Exception:
+ self.logger.exception("Error pushing results into Database")
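data_event above recovers the workload description by stripping the job id off the front of the run name, then appends the io_type and metric to form the metrics key. A sketch reusing the hypothetical run name from the earlier example:

    current_workload = '0a1b2c3d.rw.queue-depth.8.block-size.4096'
    workload = '.'.join(current_workload.split('.')[1:6])
    print(workload)  # rw.queue-depth.8.block-size.4096
    print('%s.%s.%s' % (workload, 'read', 'iops'))
    # rw.queue-depth.8.block-size.4096.read.iops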
diff --git a/docker/storperf-master/storperf/utilities/data_treatment.py b/docker/storperf-master/storperf/utilities/data_treatment.py
new file mode 100644
index 0000000..2368fd9
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_treatment.py
@@ -0,0 +1,39 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+def data_treatment(data_series):
+ """
+ This function aims at performing any necessary pre treatment on the
+ data_series passed to the steady_state function before being passed
+ under to the different math utilities (slope, range and average)
+ so the data can match the requirements of each algorithm.
+ The function returns a dictionary composed of three values that can be
+ accessed with the following keys : 'slope_data', 'range_data' and
+ 'average_data'.
+ The data_series is currently assumed to follow the pattern :
+ [[x1,y1], [x2,y2], ..., [xm,ym]]. If this pattern were to change, or
+ the input data pattern of one of the math module, this data_treatment
+ function should be the only part of the Steady State detection module
+ that would need to be modified too.
+ """
+
+ x_values = []
+ y_values = []
+    for point in data_series:
+        x_values.append(point[0])
+        y_values.append(point[1])
+
+ treated_data = {
+ 'slope_data': data_series, # No treatment necessary so far
+ 'range_data': y_values, # The y_values only
+ 'average_data': y_values
+ }
+
+ return treated_data
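A quick sketch of the treatment on a made-up three-sample series, assuming the storperf package from this change is importable:

    from storperf.utilities import data_treatment

    series = [[1, 1000], [2, 1050], [3, 995]]  # (minute, IOPS) pairs
    treated = data_treatment.data_treatment(series)
    print(treated['slope_data'])    # [[1, 1000], [2, 1050], [3, 995]]
    print(treated['range_data'])    # [1000, 1050, 995]
    print(treated['average_data'])  # [1000, 1050, 995]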
diff --git a/docker/storperf-master/storperf/utilities/dictionary.py b/docker/storperf-master/storperf/utilities/dictionary.py
new file mode 100644
index 0000000..95f625c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/dictionary.py
@@ -0,0 +1,15 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+def get_key_from_dict(dictionary, key, default_value=None):
+ if key in dictionary:
+ return dictionary[key]
+ else:
+ return default_value
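get_key_from_dict mirrors the built-in dict.get; a one-line sketch with hypothetical metadata:

    from storperf.utilities import dictionary

    metadata = {'pod_name': 'pod-a'}
    print(dictionary.get_key_from_dict(metadata, 'version', 'Unknown'))
    # Unknown -- equivalent to metadata.get('version', 'Unknown')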
diff --git a/docker/storperf-master/storperf/utilities/math.py b/docker/storperf-master/storperf/utilities/math.py
new file mode 100644
index 0000000..8e04134
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/math.py
@@ -0,0 +1,116 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+
+
+def slope(data_series):
+ """
+ This function implements the linear least squares algorithm described in
+ the following wikipedia article :
+ https://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)
+ in the case of m equations (provided by m data points) and 2 unknown
+ variables (x and y, which represent the time and the Volume performance
+ variable being tested e.g. IOPS, latency...).
+ The data_series is currently assumed to follow the pattern :
+ [[x1,y1], [x2,y2], ..., [xm,ym]].
+ If this data pattern were to change, the data_treatement function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Detection module.
+ """
+
+ # In the particular case of an empty data series
+ if len(data_series) == 0:
+ beta2 = None
+
+ else: # The general case
+ data_series = copy.deepcopy(data_series)
+ m = len(data_series)
+        # Make sure at least one element is a float so the result of the
+        # computation is a float rather than being truncated by integer
+        # division
+ data_series[0][0] = float(data_series[0][0])
+
+ """
+ It consists in solving the normal equations system (2 equations,
+ 2 unknowns) by calculating the value of beta2 (slope).
+ The formula of beta1 (the y-intercept) is given as a comment in
+ case it is needed later.
+ """
+ sum_xi = 0
+ sum_xi_sq = 0
+ sum_yi_xi = 0
+ sum_yi = 0
+ for i in range(0, m):
+ xi = data_series[i][0]
+ yi = data_series[i][1]
+
+ sum_xi += xi
+ sum_xi_sq += xi**2
+ sum_yi_xi += xi * yi
+ sum_yi += yi
+
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
+ # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
+ # needed
+
+ return beta2
+
+
+def range_value(data_series):
+ """
+ This function implements a range algorithm that returns a float number
+ representing the range of the data_series that is passed to it.
+ The data_series being passed is assumed to follow the following data
+ pattern : [y1, y2, y3, ..., ym] where yi represents the ith
+ measuring point of the y variable. The y variable represents the
+ Volume performance being tested (e.g. IOPS, latency...).
+ If this data pattern were to change, the data_treatment function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Dectection module.
+ The conversion of the data series from the original pattern to the
+ [y1, y2, y3, ..., ym] pattern is done outside this function
+ so the original pattern can be changed without breaking this function.
+ """
+
+ # In the particular case of an empty data series
+ if len(data_series) == 0:
+ range_value = None
+
+ else: # The general case
+ max_value = max(data_series)
+ min_value = min(data_series)
+ range_value = max_value - min_value
+
+ return range_value
+
+
+def average(data_series):
+ """
+ This function seeks to calculate the average value of the data series
+ given a series following the pattern : [y1, y2, y3, ..., ym].
+ If this data pattern were to change, the data_treatment function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Dectection module.
+ The function returns a float number corresponding to the average of the yi.
+ """
+ m = len(data_series)
+
+ if m == 0: # In the particular case of an empty data series
+ average = None
+
+ else:
+ data_sum = 0
+ for value in data_series:
+ data_sum += value
+ average = data_sum / float(m)
+
+ return average
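A worked sketch of the three helpers on a hypothetical five-sample series; the expected values follow directly from the formulas above:

    from storperf.utilities import math as storperf_math

    series = [[0, 100], [1, 102], [2, 98], [3, 101], [4, 99]]
    y_values = [y for _, y in series]

    print(storperf_math.slope(series))          # -0.3
    print(storperf_math.range_value(y_values))  # 4
    print(storperf_math.average(y_values))      # 100.0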
diff --git a/docker/storperf-master/storperf/utilities/steady_state.py b/docker/storperf-master/storperf/utilities/steady_state.py
new file mode 100644
index 0000000..13f609d
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/steady_state.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import logging
+
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import math as math
+
+
+def steady_state(data_series):
+ """
+ This function seeks to detect steady state given on a measurement
+ window given the data series of that measurement window following
+ the pattern : [[x1,y1], [x2,y2], ..., [xm,ym]]. m represents the number
+ of points recorded in the measurement window, x which represents the
+ time, and y which represents the Volume performance variable being
+ tested e.g. IOPS, latency...
+ The function returns a boolean describing wether or not steady state
+ has been reached with the data that is passed to it.
+ """
+
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
+ # Pre conditioning the data to match the algorithms
+ treated_data = DataTreatment.data_treatment(data_series)
+
+ # Calculating useful values invoking dedicated functions
+ slope_value = math.slope(treated_data['slope_data'])
+ range_value = math.range_value(treated_data['range_data'])
+ average_value = math.average(treated_data['average_data'])
+
+ if (slope_value is not None and range_value is not None and
+ average_value is not None):
+ # Verification of the Steady State conditions following the SNIA
+ # definition
+ range_condition = abs(range_value) <= 0.20 * abs(average_value)
+ slope_condition = abs(slope_value) <= 0.10 * abs(average_value)
+
+ steady_state = range_condition and slope_condition
+
+ logger.debug("Range %s <= %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s <= %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
+ else:
+ steady_state = False
+
+ return steady_state
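Under the SNIA thresholds above (range within 20% and slope within 10% of the average), a flat series passes while a steadily climbing one fails. A sketch with made-up values:

    from storperf.utilities import steady_state as SteadyState

    flat = [[1, 100], [2, 101], [3, 99], [4, 100]]
    ramp = [[1, 10], [2, 40], [3, 70], [4, 100]]
    print(SteadyState.steady_state(flat))  # True
    print(SteadyState.steady_state(ramp))  # False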
diff --git a/docker/storperf-master/storperf/utilities/thread_gate.py b/docker/storperf-master/storperf/utilities/thread_gate.py
new file mode 100644
index 0000000..38acbb1
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/thread_gate.py
@@ -0,0 +1,67 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+Creates a gate object that allows synchronization between an arbitrary
+number of callers.
+"""
+import logging
+import time
+from threading import Lock
+
+
+class FailureToReportException(Exception):
+ pass
+
+
+class ThreadGate(object):
+
+ def __init__(self, size, timeout=60):
+ self.logger = logging.getLogger(__name__)
+ self._gate_size = size
+ self._timeout = timeout
+ self._registrants = {}
+ self._creation_time = time.time()
+ self._lock = Lock()
+
+ """
+ Calling this method returns a true or false, indicating that enough
+ of the other registrants have reported in
+ """
+
+ def report(self, gate_id):
+ with self._lock:
+ now = time.time()
+ self._registrants[gate_id] = now
+ ready = True
+ self.logger.debug("Gate report for %s", gate_id)
+
+ total_missing = self._gate_size - len(self._registrants)
+ if total_missing > 0:
+ self.logger.debug("Not all registrants have reported in")
+ time_since_creation = now - self._creation_time
+ if (time_since_creation > (self._timeout * 2)):
+ self.logger.error(
+ "%s registrant(s) have never reported in",
+ total_missing)
+ raise FailureToReportException
+ return False
+
+ for k, v in self._registrants.items():
+ time_since_last_report = now - v
+ if time_since_last_report > self._timeout:
+ self.logger.debug("Registrant %s last reported %s ago",
+ k, time_since_last_report)
+ ready = False
+
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
+ return ready
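A sketch of the gate with two registrants: the first report alone cannot open the gate, while the second can, provided both arrived within the timeout:

    from storperf.utilities.thread_gate import ThreadGate

    gate = ThreadGate(2, timeout=60)
    print(gate.report('worker-1'))  # False: worker-2 has not reported yet
    print(gate.report('worker-2'))  # True: both reported within the timeout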
diff --git a/docker/storperf-master/storperf/workloads/__init__.py b/docker/storperf-master/storperf/workloads/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
new file mode 100644
index 0000000..936c839
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -0,0 +1,82 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+
+
+class _base_workload(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.default_filesize = "1G"
+ self.filename = '/dev/vdb'
+ self.options = {
+ 'ioengine': 'libaio',
+ 'direct': '1',
+ 'rw': 'read',
+ 'bs': '64k',
+ 'iodepth': '1',
+ 'numjobs': '1',
+ 'loops': '20',
+ 'output-format': 'json',
+ 'status-interval': '60'
+ }
+ self.invoker = None
+ self.remote_host = None
+ self.id = None
+
+ def execute(self):
+ if self.invoker is None:
+ raise ValueError("No invoker has been set")
+
+ args = []
+ self.invoker.remote_host = self.remote_host
+ self.invoker.callback_id = self.fullname
+
+ if self.filename.startswith("/dev"):
+ self.options['size'] = "100%"
+ self.logger.debug(
+ "Profiling a device, using 100% of " + self.filename)
+ else:
+ self.options['size'] = self.default_filesize
+ self.logger.debug("Profiling a filesystem, using " +
+ self.default_filesize + " file")
+
+ self.options['filename'] = self.filename
+
+ self.setup()
+
+ for key, value in self.options.iteritems():
+ args.append('--' + key + "=" + value)
+
+ self.invoker.execute(args)
+
+ def terminate(self):
+ if self.invoker is not None:
+ self.invoker.terminate()
+
+ def setup(self):
+ pass
+
+ @property
+ def remote_host(self):
+ return str(self._remote_host)
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+
+ @property
+ def fullname(self):
+ return ("%s.%s.queue-depth.%s.block-size.%s.%s" %
+ (str(self.id),
+ self.__class__.__name__,
+ str(self.options['iodepth']),
+ str(self.options['bs']),
+ str(self.remote_host).replace(".", "-")))
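A sketch of how a concrete workload assembles its fio arguments, assuming Python 2 to match the code above; FakeInvoker is a hypothetical stand-in for FIOInvoker:

    from storperf.workloads.rr import rr

    class FakeInvoker(object):
        def execute(self, args):
            print(' '.join(sorted(args)))

    workload = rr()
    workload.id = 'job-1'
    workload.remote_host = '10.0.0.5'
    workload.invoker = FakeInvoker()
    workload.options['iodepth'] = '8'
    print(workload.fullname)
    # job-1.rr.queue-depth.8.block-size.64k.10-0-0-5
    workload.execute()
    # --bs=64k --direct=1 --filename=/dev/vdb ... --rw=randread --size=100%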
diff --git a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py
new file mode 100644
index 0000000..cce3c31
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py
@@ -0,0 +1,17 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class _ssd_preconditioning(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'ssd_preconditioning'
+ self.options['rw'] = 'randwrite'
+ self.options['loops'] = '1'
diff --git a/docker/storperf-master/storperf/workloads/_warm_up.py b/docker/storperf-master/storperf/workloads/_warm_up.py
new file mode 100644
index 0000000..9cd268e
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/_warm_up.py
@@ -0,0 +1,17 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class _warm_up(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'warm_up'
+ self.options['rw'] = 'write'
+ self.options['loops'] = '1'
diff --git a/docker/storperf-master/storperf/workloads/rr.py b/docker/storperf-master/storperf/workloads/rr.py
new file mode 100644
index 0000000..3823a4c
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/rr.py
@@ -0,0 +1,16 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class rr(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'random_read'
+ self.options['rw'] = 'randread'
diff --git a/docker/storperf-master/storperf/workloads/rs.py b/docker/storperf-master/storperf/workloads/rs.py
new file mode 100644
index 0000000..511888e
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/rs.py
@@ -0,0 +1,16 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class rs(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'sequential_read'
+ self.options['rw'] = 'read'
diff --git a/docker/storperf-master/storperf/workloads/rw.py b/docker/storperf-master/storperf/workloads/rw.py
new file mode 100644
index 0000000..f4b6979
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/rw.py
@@ -0,0 +1,18 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class rw(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'random_readwrite'
+ self.options['rwmixread'] = '70'
+ self.options['rw'] = 'rw'
+ self.logger.debug(self.options)
diff --git a/docker/storperf-master/storperf/workloads/wr.py b/docker/storperf-master/storperf/workloads/wr.py
new file mode 100644
index 0000000..457a29a
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/wr.py
@@ -0,0 +1,16 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class wr(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'random_write'
+ self.options['rw'] = 'randwrite'
diff --git a/docker/storperf-master/storperf/workloads/ws.py b/docker/storperf-master/storperf/workloads/ws.py
new file mode 100644
index 0000000..f37079e
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/ws.py
@@ -0,0 +1,16 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from storperf.workloads import _base_workload
+
+
+class ws(_base_workload._base_workload):
+
+ def setup(self):
+ self.options['name'] = 'sequential_write'
+ self.options['rw'] = 'write'