diff options
Diffstat (limited to 'docker/storperf-master')
59 files changed, 4582 insertions, 0 deletions
diff --git a/docker/storperf-master/requirements.pip b/docker/storperf-master/requirements.pip new file mode 100644 index 0000000..5f9b68e --- /dev/null +++ b/docker/storperf-master/requirements.pip @@ -0,0 +1,17 @@ +pyyaml==3.10 +python-neutronclient==2.6.0 +python-heatclient==0.8.0 +python-novaclient==2.28.1 +python-glanceclient==1.1.0 +python-cinderclient==1.6.0 +python-keystoneclient==1.6.0 +keystoneauth1==2.12.1 +flask==0.10 +flask_cors==3.0.2 +flask-restful==0.3.5 +flask-restful-swagger==0.19 +flask-swagger==0.2.12 +html2text==2016.1.8 +paramiko==2.0.2 +requests==2.13.0 +scp==0.10.2 diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py new file mode 100644 index 0000000..cc8d834 --- /dev/null +++ b/docker/storperf-master/rest_server.py @@ -0,0 +1,342 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import logging.config +import os +import sys + +from flask import abort, Flask, request, jsonify +from flask_restful import Resource, Api, fields +from flask_restful_swagger import swagger + +from flask_cors import CORS +from storperf.storperf_master import StorPerfMaster + +app = Flask(__name__, static_url_path="") +CORS(app) +api = swagger.docs(Api(app), apiVersion='1.0') + +storperf = StorPerfMaster() + + +@swagger.model +class ConfigurationRequestModel: + resource_fields = { + 'agent_count': fields.Integer, + 'agent_flavor': fields.String, + 'agent_image': fields.String, + 'public_network': fields.String, + 'volume_size': fields.Integer + } + + +@swagger.model +class ConfigurationResponseModel: + resource_fields = { + 'agent_count': fields.Integer, + 'agent_flavor': fields.String, + 'agent_image': fields.String, + 'public_network': fields.String, + 'stack_created': fields.Boolean, + 'stack_id': fields.String, + 'volume_size': fields.Integer + } + + +class Configure(Resource): + + """Configuration API""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + + @swagger.operation( + notes='Fetch the current agent configuration', + type=ConfigurationResponseModel.__name__ + ) + def get(self): + return jsonify({'agent_count': storperf.agent_count, + 'agent_flavor': storperf.agent_flavor, + 'agent_image': storperf.agent_image, + 'public_network': storperf.public_network, + 'volume_size': storperf.volume_size, + 'stack_created': storperf.is_stack_created, + 'stack_id': storperf.stack_id}) + + @swagger.operation( + notes='''Set the current agent configuration and create a stack in + the controller. Returns once the stack create is completed.''', + parameters=[ + { + "name": "configuration", + "description": '''Configuration to be set. All parameters are + optional, and will retain their previous value if not + specified. Volume size is in GB. + ''', + "required": True, + "type": "ConfigurationRequestModel", + "paramType": "body" + } + ], + type=ConfigurationResponseModel.__name__ + ) + def post(self): + if not request.json: + abort(400, "ERROR: No data specified") + + try: + if ('agent_count' in request.json): + storperf.agent_count = request.json['agent_count'] + if ('agent_flavor' in request.json): + storperf.agent_flavor = request.json['agent_flavor'] + if ('agent_image' in request.json): + storperf.agent_image = request.json['agent_image'] + if ('public_network' in request.json): + storperf.public_network = request.json['public_network'] + if ('volume_size' in request.json): + storperf.volume_size = request.json['volume_size'] + + storperf.create_stack() + if storperf.stack_id is None: + abort(400, "Stack creation failed") + + return jsonify({'agent_count': storperf.agent_count, + 'agent_flavor': storperf.agent_flavor, + 'agent_image': storperf.agent_image, + 'public_network': storperf.public_network, + 'volume_size': storperf.volume_size, + 'stack_id': storperf.stack_id}) + + except Exception as e: + abort(400, str(e)) + + @swagger.operation( + notes='Deletes the agent configuration and the stack' + ) + def delete(self): + try: + storperf.delete_stack() + except Exception as e: + abort(400, str(e)) + + +@swagger.model +class WorkloadModel: + resource_fields = { + 'target': fields.String, + 'deadline': fields.Integer, + "steady_state_samples": fields.Integer, + 'workload': fields.String, + 'queue_depths': fields.String, + 'block_sizes': fields.String + } + + +@swagger.model +class WorkloadResponseModel: + resource_fields = { + 'job_id': fields.String + } + + +class Job(Resource): + + """Job API""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + + @swagger.operation( + notes='Fetch the metrics of the specified workload', + parameters=[ + { + "name": "id", + "description": "The UUID of the workload in the format " + "NNNNNNNN-NNNN-NNNN-NNNN-NNNNNNNNNNNN", + "required": True, + "type": "string", + "allowMultiple": False, + "paramType": "query" + }, + { + "name": "metrics_type", + "description": "The metrics_type of metrics to report. May be " + "metrics (default), or metadata", + "required": False, + "metrics_type": "string", + "allowMultiple": False, + "paramType": "query" + } + ], + responseMessages=[ + { + "code": 200, + "message": "Workload ID found, response in JSON format" + }, + { + "code": 404, + "message": "Workload ID not found" + } + ] + ) + def get(self): + + metrics_type = "metrics" + if request.args.get('type'): + metrics_type = request.args.get('type') + + workload_id = request.args.get('id') + + if metrics_type == "metrics": + return jsonify(storperf.fetch_results(workload_id)) + + if metrics_type == "metadata": + return jsonify(storperf.fetch_metadata(workload_id)) + + if metrics_type == "status": + return jsonify(storperf.fetch_job_status(workload_id)) + + @swagger.operation( + parameters=[ + { + "name": "body", + "description": """Start execution of a workload with the +following parameters: + +"target": The target device to profile", + +"deadline": if specified, the maximum duration in minutes +for any single test iteration. + +"workload":if specified, the workload to run. Defaults to all. + """, + "required": True, + "type": "WorkloadModel", + "paramType": "body" + } + ], + type=WorkloadResponseModel.__name__, + responseMessages=[ + { + "code": 200, + "message": "Job submitted" + }, + { + "code": 400, + "message": "Missing configuration data" + } + ] + ) + def post(self): + if not request.json: + abort(400, "ERROR: Missing configuration data") + + self.logger.info(request.json) + + try: + if ('target' in request.json): + storperf.filename = request.json['target'] + if ('deadline' in request.json): + storperf.deadline = request.json['deadline'] + if ('steady_state_samples' in request.json): + storperf.steady_state_samples = request.json[ + 'steady_state_samples'] + if ('queue_depths' in request.json): + storperf.queue_depths = request.json['queue_depths'] + if ('block_sizes' in request.json): + storperf.block_sizes = request.json['block_sizes'] + if ('workload' in request.json): + storperf.workloads = request.json['workload'] + else: + storperf.workloads = None + if ('metadata' in request.json): + metadata = request.json['metadata'] + else: + metadata = {} + + job_id = storperf.execute_workloads(metadata) + + return jsonify({'job_id': job_id}) + + except Exception as e: + abort(400, str(e)) + + @swagger.operation( + notes='Cancels the currently running workload', + responseMessages=[ + { + "code": 200, + "message": "Wordload ID found, response in JSON format" + }, + ] + ) + def delete(self): + self.logger.info("Threads: %s" % sys._current_frames()) + print sys._current_frames() + try: + return jsonify({'Slaves': storperf.terminate_workloads()}) + except Exception as e: + abort(400, str(e)) + + +@swagger.model +class QuotaModel: + + resource_fields = { + 'quota': fields.Integer + } + + +class Quota(Resource): + """Quota API""" + + @swagger.operation( + notes='''Fetch the current Cinder volume quota. This value limits + the number of volumes that can be created, and by extension, defines + the maximum number of agents that can be created for any given test + scenario''', + type=QuotaModel.__name__ + ) + def get(self): + quota = storperf.volume_quota + return jsonify({'quota': quota}) + + +def setup_logging(default_path='logging.json', + default_level=logging.INFO, env_key='LOG_CFG'): + """Setup logging configuration + """ + + path = default_path + value = os.getenv(env_key, None) + if value: + path = value + if os.path.exists(path): + with open(path, 'rt') as f: + config = json.load(f) + logging.config.dictConfig(config) + else: + logging.basicConfig(level=default_level) + + socketHandler = logging.handlers.DatagramHandler( + 'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT) + rootLogger = logging.getLogger('') + rootLogger.addHandler(socketHandler) + + +api.add_resource(Configure, "/api/v1.0/configurations") +api.add_resource(Quota, "/api/v1.0/quotas") +api.add_resource(Job, "/api/v1.0/jobs") + +if __name__ == "__main__": + setup_logging() + logging.getLogger("storperf").setLevel(logging.DEBUG) + + app.run(host='0.0.0.0', debug=True, threaded=True) diff --git a/docker/storperf-master/storperf/__init__.py b/docker/storperf-master/storperf/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/storperf/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/carbon/__init__.py b/docker/storperf-master/storperf/carbon/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/storperf/carbon/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/carbon/converter.py b/docker/storperf-master/storperf/carbon/converter.py new file mode 100644 index 0000000..623c144 --- /dev/null +++ b/docker/storperf-master/storperf/carbon/converter.py @@ -0,0 +1,57 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import calendar +import logging +import time + + +class Converter(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + def convert_json_to_flat(self, json_object, prefix=None): + # Use the timestamp reported by fio, or current time if + # not present. + if 'timestamp' in json_object: + timestamp = str(json_object.pop('timestamp')) + else: + timestamp = str(calendar.timegm(time.gmtime())) + + self.flat_dictionary = {} + self.flat_dictionary['timestamp'] = timestamp + + self.resurse_to_flat_dictionary(json_object, prefix) + return self.flat_dictionary + + def resurse_to_flat_dictionary(self, json, prefix=None): + if type(json) == dict: + for k, v in json.items(): + if prefix is None: + key = k.decode("utf-8").replace(" ", "_") + else: + key = prefix + "." + k.decode("utf-8").replace(" ", "_") + if hasattr(v, '__iter__'): + self.resurse_to_flat_dictionary(v, key) + else: + self.flat_dictionary[key] = str(v).replace(" ", "_") + elif type(json) == list: + index = 0 + for v in json: + index += 1 + if hasattr(v, '__iter__'): + self.resurse_to_flat_dictionary( + v, prefix + "." + str(index)) + else: + if prefix is None: + self.flat_dictionary[index] = str(v).replace(" ", "_") + + " " + self.timestamp + else: + key = prefix + "." + index + self.flat_dictionary[key] = str(v).replace(" ", "_") diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py new file mode 100644 index 0000000..e23dc79 --- /dev/null +++ b/docker/storperf-master/storperf/carbon/emitter.py @@ -0,0 +1,38 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import calendar +import logging +import socket +import time + + +class CarbonMetricTransmitter(): + + carbon_host = '127.0.0.1' + carbon_port = 2003 + + def __init__(self): + self.logger = logging.getLogger(__name__) + + def transmit_metrics(self, metrics): + if 'timestamp' in metrics: + metrics.pop('timestamp') + timestamp = str(calendar.timegm(time.gmtime())) + + carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + carbon_socket.connect((self.carbon_host, self.carbon_port)) + + for key, metric in metrics.items(): + message = key + " " + metric + " " + timestamp + self.logger.debug("Metric: " + message) + carbon_socket.send(message + '\n') + + carbon_socket.close() + self.logger.info("Sent metrics to carbon with timestamp %s" + % timestamp) diff --git a/docker/storperf-master/storperf/db/__init__.py b/docker/storperf-master/storperf/db/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/storperf/db/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/db/configuration_db.py b/docker/storperf-master/storperf/db/configuration_db.py new file mode 100644 index 0000000..5b996c7 --- /dev/null +++ b/docker/storperf-master/storperf/db/configuration_db.py @@ -0,0 +1,120 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from sqlite3 import OperationalError +from threading import Lock +import logging +import sqlite3 + +db_mutex = Lock() + + +class ConfigurationDB(object): + + db_name = "StorPerfConfig.db" + + def __init__(self): + """ + Creates the StorPerfConfig.db and configuration tables on demand + """ + + self.logger = logging.getLogger(__name__) + self.logger.debug("Connecting to " + ConfigurationDB.db_name) + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE configuration + (configuration_name text, + key text, + value text)''') + self.logger.debug("Created configuration table") + except OperationalError: + self.logger.debug("Configuration table exists") + + cursor.execute('SELECT * FROM configuration') + db.commit() + db.close() + + def delete_configuration_value(self, configuration_name, key): + """Deletes the value associated with the given key + """ + + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute("delete from configuration where " + "configuration_name=? and key=?", + (configuration_name, key)) + + self.logger.debug("Deleted " + configuration_name + ":" + key) + + db.commit() + db.close() + + def get_configuration_value(self, configuration_name, key): + """Returns a string representation of the value stored + with this key under the given configuration name. + """ + + with db_mutex: + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + """select value from configuration + where configuration_name = ? + and key = ?""", + (configuration_name, key,)) + + row = cursor.fetchone() + + return_value = None + + if (row is None): + self.logger.debug( + configuration_name + ":" + key + " does not exist") + else: + self.logger.debug( + configuration_name + ":" + key + " is " + str(row[0])) + return_value = str(row[0]) + + db.close() + + return return_value + + def set_configuration_value(self, configuration_name, key, value): + """Updates or creates the key under the given configuration + name so that it holds the value specified. + """ + + if (value is None): + return self.delete_configuration_value(configuration_name, key) + + with db_mutex: + value = str(value) + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute("delete from configuration where " + "configuration_name=? and key=?", + (configuration_name, key)) + + cursor.execute( + """insert into configuration(configuration_name, key, value) + values (?,?,?)""", (configuration_name, key, value)) + + self.logger.debug( + configuration_name + ":" + key + " set to " + value) + + db.commit() + db.close() diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py new file mode 100644 index 0000000..c8a2d35 --- /dev/null +++ b/docker/storperf-master/storperf/db/graphite_db.py @@ -0,0 +1,63 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import logging + +import requests + +from storperf.db.job_db import JobDB + + +class GraphiteDB(object): + + def __init__(self): + """ + """ + self._job_db = JobDB() + self.logger = logging.getLogger(__name__) + + def fetch_series(self, workload, metric, io_type, time, duration): + + series = [] + end = time + start = end - duration + + request = ("http://127.0.0.1:8000/render/?target=" + "averageSeries(%s.*.jobs.1.%s.%s)" + "&format=json" + "&from=%s" + "&until=%s" % + (workload, io_type, metric, + start, end)) + self.logger.debug("Calling %s" % (request)) + + response = requests.get(request) + if (response.status_code == 200): + series = self._series_results(json.loads(response.content)) + + return series + + def _series_results(self, results): + + series = [] + + for item in results: + datapoints = item['datapoints'] + for datapoint in datapoints: + if datapoint[0] is not None: + series.append([datapoint[1], datapoint[0]]) + + return series + + def make_fullname_pattern(self, workload): + parts = workload.split('.') + wildcards_needed = 7 - len(parts) + fullname = workload + (".*" * wildcards_needed) + return fullname diff --git a/docker/storperf-master/storperf/db/job_db.py b/docker/storperf-master/storperf/db/job_db.py new file mode 100644 index 0000000..3308fa8 --- /dev/null +++ b/docker/storperf-master/storperf/db/job_db.py @@ -0,0 +1,259 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import calendar +import json +import logging +from sqlite3 import OperationalError +import sqlite3 +from threading import Lock +import time +import uuid + + +db_mutex = Lock() + + +class JobDB(object): + + db_name = "StorPerfJob.db" + + def __init__(self): + """ + Creates the StorPerfJob.db and jobs tables on demand + """ + + self.logger = logging.getLogger(__name__) + self.logger.debug("Connecting to " + JobDB.db_name) + self.job_id = None + + with db_mutex: + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE jobs + (job_id text, + workload text, + start text, + end text)''') + self.logger.debug("Created job table") + except OperationalError: + self.logger.debug("Job table exists") + + try: + cursor.execute('''CREATE TABLE job_params + (job_id text, + param text, + value text)''') + self.logger.debug("Created job_params table") + except OperationalError: + self.logger.debug("Job params table exists") + + try: + cursor.execute('''CREATE TABLE job_summary + (job_id text, + summary text)''') + self.logger.debug("Created job summary table") + except OperationalError: + self.logger.debug("Job summary table exists") + + cursor.execute('SELECT * FROM jobs') + cursor.execute('SELECT * FROM job_params') + db.commit() + db.close() + + def create_job_id(self): + """ + Returns a job id that is guaranteed to be unique in this + StorPerf instance. + """ + with db_mutex: + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + self.job_id = str(uuid.uuid4()) + row = cursor.execute( + "select * from jobs where job_id = ?", (self.job_id,)) + + while (row.fetchone() is not None): + self.logger.info("Duplicate job id found, regenerating") + self.job_id = str(uuid.uuid4()) + row = cursor.execute( + "select * from jobs where job_id = ?", (self.job_id,)) + + cursor.execute( + "insert into jobs(job_id) values (?)", (self.job_id,)) + self.logger.debug("Reserved job id " + self.job_id) + db.commit() + db.close() + + def start_workload(self, workload): + """ + Records the start time for the given workload + """ + + workload_name = workload.fullname + + if (self.job_id is None): + self.create_job_id() + + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + now = str(calendar.timegm(time.gmtime())) + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + cursor.execute( + """insert into jobs + (job_id, + workload, + start) + values (?, ?, ?)""", + (self.job_id, + workload_name, + now,)) + else: + self.logger.warn("Duplicate start time for workload %s" + % workload_name) + cursor.execute( + """update jobs set + job_id = ?, + start = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + db.commit() + db.close() + + def end_workload(self, workload): + """ + Records the end time for the given workload + """ + if (self.job_id is None): + self.create_job_id() + + workload_name = workload.fullname + + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + now = str(calendar.timegm(time.gmtime())) + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (self.job_id, workload_name,)) + + if (row.fetchone() is None): + self.logger.warn("No start time recorded for workload %s" + % workload_name) + cursor.execute( + """insert into jobs + (job_id, + workload, + start, + end) + values (?, ?, ?, ?)""", + (self.job_id, + workload_name, + now, + now)) + else: + cursor.execute( + """update jobs set + job_id = ?, + end = ? + where workload = ?""", + (self.job_id, + now, + workload_name,)) + + db.commit() + db.close() + + def fetch_workloads(self, workload): + workload_prefix = workload + "%" + workload_executions = [] + + with db_mutex: + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + cursor.execute("""select workload, start, end + from jobs where workload like ?""", + (workload_prefix,)) + + while (True): + row = cursor.fetchone() + if (row is None): + break + workload_execution = [row[0], row[1], row[2]] + workload_executions.append(workload_execution) + db.close() + + return workload_executions + + def record_workload_params(self, params): + """ + """ + if (self.job_id is None): + self.create_job_id() + + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + for param, value in params.iteritems(): + cursor.execute( + """insert into job_params + (job_id, + param, + value) + values (?, ?, ?)""", + (self.job_id, + param, + value,)) + db.commit() + db.close() + + def fetch_workload_params(self, job_id): + """ + """ + params = {} + with db_mutex: + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + cursor.execute( + "select param, value from job_params where job_id = ?", + (job_id,)) + + while (True): + row = cursor.fetchone() + if (row is None): + break + try: + data = json.loads(row[1]) + except: + data = row[1] + params[row[0]] = data + db.close() + return params diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py new file mode 100644 index 0000000..a2f7038 --- /dev/null +++ b/docker/storperf-master/storperf/db/test_results_db.py @@ -0,0 +1,61 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import os +import requests + + +def get_installer_type(logger=None): + """ + Get installer type (fuel, apex, joid, compass) + """ + try: + installer = os.environ['INSTALLER_TYPE'] + except KeyError: + if logger: + logger.error("Impossible to retrieve the installer type") + installer = "Unknown_installer" + + return installer + + +def push_results_to_db(db_url, project, case_name, + test_start, test_stop, logger, pod_name, + version, scenario, criteria, build_tag, details): + """ + POST results to the Result target DB + """ + url = db_url + "/results" + installer = get_installer_type(logger) + + params = {"project_name": project, "case_name": case_name, + "pod_name": pod_name, "installer": installer, + "version": version, "scenario": scenario, "criteria": criteria, + "build_tag": build_tag, "start_date": test_start, + "stop_date": test_stop, "details": details} + + headers = {'Content-Type': 'application/json'} + try: + if logger: + jsonified_params = json.dumps(params) + logger.info("Pushing results to %s" % (url)) + logger.debug("Parameters: %s" % jsonified_params[:1024]) + r = requests.post(url, data=jsonified_params, headers=headers) + if logger: + logger.debug(r) + logger.debug(r.status_code) + logger.debug(r.content) + return json.loads(r.content) + except Exception, e: + logger.error("Error [push_results_to_db('%s', '%s', '%s', " + + "'%s', '%s', '%s', '%s', '%s', '%s')]:" % + (db_url, project, case_name, pod_name, version, + scenario, criteria, build_tag, details), e) + return None diff --git a/docker/storperf-master/storperf/fio/__init__.py b/docker/storperf-master/storperf/fio/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/storperf/fio/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py new file mode 100644 index 0000000..106696d --- /dev/null +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -0,0 +1,153 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import logging +from threading import Thread +import paramiko + + +class FIOInvoker(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.event_listeners = set() + self.event_callback_ids = set() + self._remote_host = None + self.callback_id = None + self.terminated = False + + @property + def remote_host(self): + return self._remote_host + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + self.logger = logging.getLogger(__name__ + ":" + value) + + def register(self, event_listener): + self.event_listeners.add(event_listener) + + def unregister(self, event_listener): + self.event_listeners.discard(event_listener) + + def stdout_handler(self, stdout): + self.logger.debug("Started") + self.json_body = "" + try: + for line in iter(stdout.readline, b''): + if line.startswith("fio"): + line = "" + continue + self.json_body += line + try: + if line == "}\n": + json_metric = json.loads(self.json_body) + self.json_body = "" + + if not self.terminated: + for event_listener in self.event_listeners: + try: + self.logger.debug( + "Event listener callback") + event_listener( + self.callback_id, json_metric) + except Exception, e: + self.logger.exception( + "Notifying listener %s: %s", + self.callback_id, e) + self.logger.debug( + "Event listener callback complete") + except Exception, e: + self.logger.error("Error parsing JSON: %s", e) + except IOError: + pass # We might have read from the closed socket, ignore it + + stdout.close() + self.logger.debug("Finished") + + def stderr_handler(self, stderr): + self.logger.debug("Started") + for line in iter(stderr.readline, b''): + self.logger.error("FIO Error: %s", line.rstrip()) + + # Sometime, FIO gets stuck and will give us this message: + # fio: job 'sequential_read' hasn't exited in 60 seconds, + # it appears to be stuck. Doing forceful exit of this job. + # A second killall of fio will release it stuck process. + + if 'it appears to be stuck' in line: + self.terminate() + + stderr.close() + self.logger.debug("Finished") + + def execute(self, args=[]): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo ./fio " + ' '.join(args) + self.logger.debug("Remote command: %s" % command) + + chan = ssh.get_transport().open_session(timeout=None) + chan.settimeout(None) + chan.exec_command(command) + stdout = chan.makefile('r', -1) + stderr = chan.makefile_stderr('r', -1) + + tout = Thread(target=self.stdout_handler, args=(stdout,), + name="%s stdout" % self._remote_host) + tout.daemon = True + tout.start() + + terr = Thread(target=self.stderr_handler, args=(stderr,), + name="%s stderr" % self._remote_host) + terr.daemon = True + terr.start() + + self.logger.info("Started fio on " + self.remote_host) + exit_status = chan.recv_exit_status() + self.logger.info("Finished fio on %s with exit code %s" % + (self.remote_host, exit_status)) + + stdout.close() + stderr.close() + + self.logger.debug("Joining stderr handler") + terr.join() + self.logger.debug("Joining stdout handler") + tout.join() + self.logger.debug("Ended") + + def terminate(self): + self.logger.debug("Terminating fio on " + self.remote_host) + self.terminated = True + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + command = "sudo killall fio" + + self.logger.debug("Executing on %s: %s" % (self.remote_host, command)) + (_, stdout, stderr) = ssh.exec_command(command) + + for line in stdout.readlines(): + self.logger.debug(line.strip()) + for line in stderr.readlines(): + self.logger.error(line.strip()) + + stdout.close() + stderr.close() diff --git a/docker/storperf-master/storperf/logging.json b/docker/storperf-master/storperf/logging.json new file mode 100644 index 0000000..2a0bbce --- /dev/null +++ b/docker/storperf-master/storperf/logging.json @@ -0,0 +1,51 @@ +{ + "version": 1, + "disable_existing_loggers": false, + "formatters": { + "simple": { + "format": "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s" + } + }, + + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "simple", + "stream": "ext://sys.stdout" + }, + + "file_handler": { + "class": "logging.handlers.RotatingFileHandler", + "level": "DEBUG", + "formatter": "simple", + "filename": "storperf.log", + "maxBytes": 10485760, + "backupCount": 20, + "encoding": "utf8" + }, + + "error_file_handler": { + "class": "logging.handlers.RotatingFileHandler", + "level": "ERROR", + "formatter": "simple", + "filename": "errors.log", + "maxBytes": 10485760, + "backupCount": 20, + "encoding": "utf8" + } + }, + + "loggers": { + "": { + "level": "INFO", + "handlers": ["console", "file_handler", "error_file_handler"] + }, + "storperf": { + "level": "DEBUG" + }, + "storperf.carbon.emitter": { + "level": "INFO" + } + } +} diff --git a/docker/storperf-master/storperf/resources/hot/agent-group.yaml b/docker/storperf-master/storperf/resources/hot/agent-group.yaml new file mode 100644 index 0000000..c758ecd --- /dev/null +++ b/docker/storperf-master/storperf/resources/hot/agent-group.yaml @@ -0,0 +1,106 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + public_network: + type: string + constraints: + - custom_constraint: neutron.network + agent_flavor: + type: string + default: "storperf" + agent_image: + type: string + default: 'StorPerf Ubuntu 14.04' + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_count: + type: number + default: 1 + constraints: + - range: { min: 1, max: 512 } + description: must be between 1 and 512 agents. + + +resources: + slaves: + type: OS::Heat::ResourceGroup + depends_on: [storperf_subnet, storperf_network_router_interface, + storperf_open_security_group, storperf_key_pair] + properties: + count: {get_param: agent_count} + resource_def: { + type: "storperf-agent.yaml", + properties: { + public_network: {get_param: public_network}, + agent_network: {get_resource: storperf_network}, + flavor: {get_param: agent_flavor}, + image: {get_param: agent_image}, + storperf_open_security_group: {get_resource: storperf_open_security_group}, + key_name: {get_resource: storperf_key_pair}, + volume_size: {get_param: volume_size} + } + } + + storperf_network: + type: OS::Neutron::Net + properties: + name: storperf-network + + storperf_subnet: + type: OS::Neutron::Subnet + properties: + network_id: { get_resource: storperf_network } + cidr: 172.16.0.0/16 + gateway_ip: 172.16.0.1 + + storperf_network_router: + type: OS::Neutron::Router + properties: + external_gateway_info: + network: { get_param: public_network } + + storperf_network_router_interface: + type: OS::Neutron::RouterInterface + properties: + router_id: { get_resource: storperf_network_router } + subnet_id: { get_resource: storperf_subnet } + + storperf_key_pair: + type: OS::Nova::KeyPair + properties: + save_private_key: true + name: storperf_agent_keypair + + storperf_open_security_group: + type: OS::Neutron::SecurityGroup + properties: + description: An open security group to allow all access to the StorPerf slaves + rules: + - remote_ip_prefix: 0.0.0.0/0 + protocol: tcp + port_range_min: 22 + port_range_max: 22 + - remote_ip_prefix: 0.0.0.0/0 + protocol: icmp + + + +outputs: + slave_ips: { + description: "Slave addresses", + value: { get_attr: [ slaves, storperf_agent_ip] } + } diff --git a/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml new file mode 100644 index 0000000..7bf8b4d --- /dev/null +++ b/docker/storperf-master/storperf/resources/hot/storperf-agent.yaml @@ -0,0 +1,99 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + flavor: + type: string + default: storperf + image: + type: string + default: 'Ubuntu 16.04' + key_name: + type: string + default: StorPerf + username: + type: string + default: storperf + storperf_open_security_group: + type: string + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_network: + type: string + constraints: + - custom_constraint: neutron.network + public_network: + type: string + constraints: + - custom_constraint: neutron.network +resources: + + storperf_agent: + type: "OS::Nova::Server" + properties: + name: storperf-agent + image: { get_param: image } + flavor: { get_param: flavor } + key_name: { get_param: key_name } + networks: + - port: { get_resource: storperf_agent_port } + user_data: { get_resource: storperf_agent_config } + user_data_format: RAW + + storperf_agent_config: + type: "OS::Heat::CloudConfig" + properties: + cloud_config: + users: + - name: { get_param: username } + groups: users + shell: /bin/bash + sudo: "ALL=(ALL) NOPASSWD:ALL" + ssh_authorized_keys: + - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 + - default + package_update: false + package_upgrade: false + manage_etc_hosts: localhost + + storperf_agent_port: + type: "OS::Neutron::Port" + properties: + network_id: { get_param: agent_network } + security_groups: + - { get_param: storperf_open_security_group } + + storperf_floating_ip: + type: OS::Neutron::FloatingIP + properties: + floating_network_id: { get_param: public_network } + port_id: { get_resource: storperf_agent_port } + + agent_volume: + type: OS::Cinder::Volume + properties: + size: { get_param: volume_size } + + agent_volume_att: + type: OS::Cinder::VolumeAttachment + properties: + instance_uuid: { get_resource: storperf_agent } + volume_id: { get_resource: agent_volume} + +outputs: + storperf_agent_ip: + description: The floating IP address of the agent on the public network + value: { get_attr: [ storperf_floating_ip, floating_ip_address ] } diff --git a/docker/storperf-master/storperf/resources/ssh/storperf_rsa b/docker/storperf-master/storperf/resources/ssh/storperf_rsa new file mode 100644 index 0000000..fb8c714 --- /dev/null +++ b/docker/storperf-master/storperf/resources/ssh/storperf_rsa @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEAxG5w4qmfEY0CScyT39O+CeNV5WBA/s2Ac2lGeVJJFKrl88ps +lJqFmhQVyT/lz7uDbOOFP0vOtePicRpPerY1HSZSIy9AIPBz+ii7UzQmm7nCnBAE +qYCSFVqibxNxPbqWSyBIkxrBfCPEuu09qqtSdbr7ccnmZso5uzTGpo8hVxOd3YUd +hB6krrciitQiu+DZEWPcK96llCLd+ZC3SklUn66+IDjonAwC2yEbu+7JByL647fs +CtVVqYQXTIOsrbquZiBOqY1gFNxsJeGk8TlNM+NwEcwi6vB1X/LP3So2DI/KZTam +iH3HjG2hFKqTR7/8W77RjRuzdGgf2uKZFEC9OwIDAQABAoIBAQC7J6byrz5Z0Io/ +mmXCOtK0RSAQHfePTml2jPWSnm32/SV/dHyj0d49gamISBNEK5r64oSQXEAlWWzk +6naTqotMrLhRwbFOMQuva6OfkO7ALOtZGoH2pgOJyQM+5b1dXSvZhHbhgfqbttC4 +cVXyCK3WckMklYOqqD79OTmUdIcFmILoYuMbIuMBtPukLgOel0wwTboegqDGg3aq +Kmnq+Y79Z2FG2Xn1NGllSCLPMAKJpODNW1E1H8JqvV7gmC/SUItRsvcLSg6pr2vB +eRhjLlczpnOK9pzHoul/qe6GR9RG/FoWl9c5uVeq3OFxqUQplwmxNVTQ+vJplJM5 +WFbsDZRhAoGBAPw5cpa2d4AuPxFJPnsrlhAAxtK/+Jl1Hi/1ioA5lggSe55xHdvP +dxeh4XJ8u4jnjCDdrmqHF0K578DoaCCRz9UT7P2JV2QCECPQgCvpkgEG344ovHwA +VI2j6+m1zKA6yJ2BEr/t3pvDrjoOtBHZaPMdJBHi53YleZvecxDlODT5AoGBAMdf +MDP6np1Jr/3NTJSYcFQGeqNA3S++aNVJAM+rsQk8rZLttvyiinIK+NUgMbGVMtsH ++t4HbJrCpX4cLzmDKDuRXGK8z4n3/jyjID7WDRPm5te+yqD2Zrv6oAal4DoxlGYj +EHftbWKwGwgktWKQbdV+7R0ZHwNgPK3cn3twnrTTAoGAIrDsF9qk+RZjSkdetqY6 +D51ru1T4JnM7YbFOCXDiN94C7rn0N2WDpdZ4Ib0SNjRSy7px4OkPw/e7CDdvCvvD +MDV7ZSvcvz6hZaup1WBc2pNNcEoeEpghCRJAwnZk3Kz5JuC36XoYIih58DZRghr7 +GmUpruQcnd1tqoigHvTIFFECgYEAk67dJAPHrrdA2H5U9dWdj4BlI70Omykuup8j +LLH/p9n8sVPJMoasiyG5hRYd+W+NhlX47LMPLex1bl8uVCnliYp5puI50feTqhMj +9afVdCKcaL/5lRYwr5pNI9+Ho1PKm4Xp0wxa9LmCrJuUiPh3g6hLuDw9juCg0iEV +OfkIduUCgYEA+924bfPhC7mNURBsQHaGSqlRbCX1KmU8EnmMJuVsPW8N9mN3/KU9 +0QFulljh1QNiByK5Iq4J0Q62OFbb1k8POB0ensZwfdIX/QoyhWGXz+DoUOLSDY6N +tlIG1mTYJCltc1rajPAtKdLv/PZeJS6NWp1y6T23IVQ08+HzCfxYwRA= +-----END RSA PRIVATE KEY----- diff --git a/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub b/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub new file mode 100644 index 0000000..cdc8cb4 --- /dev/null +++ b/docker/storperf-master/storperf/resources/ssh/storperf_rsa.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 jenkins@storperf-openstack diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py new file mode 100644 index 0000000..fb3e269 --- /dev/null +++ b/docker/storperf-master/storperf/storperf_master.py @@ -0,0 +1,448 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from datetime import datetime +import logging +import os +import socket +from storperf.db.configuration_db import ConfigurationDB +from storperf.db.job_db import JobDB +from storperf.test_executor import TestExecutor +from threading import Thread +from time import sleep + +from cinderclient import client as cinderclient +import heatclient.client as heatclient +from keystoneauth1 import loading +from keystoneauth1 import session +import paramiko +from scp import SCPClient + + +class ParameterError(Exception): + """ """ + + +class StorPerfMaster(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + self.configuration_db = ConfigurationDB() + self.job_db = JobDB() + + template_file = open("storperf/resources/hot/agent-group.yaml") + self._agent_group_hot = template_file.read() + template_file = open("storperf/resources/hot/storperf-agent.yaml") + self._agent_resource_hot = template_file.read() + self._hot_files = { + 'storperf-agent.yaml': self._agent_resource_hot + } + self.logger.debug( + "Loaded agent-group template as: " + self._agent_group_hot) + self.logger.debug( + "Loaded agent-resource template as: " + self._agent_resource_hot) + + self._cinder_client = None + self._heat_client = None + self._test_executor = TestExecutor() + self._last_openstack_auth = datetime.now() + + @property + def volume_size(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'volume_size') + if (value is None): + self.volume_size = 1 + value = 1 + return int(value) + + @volume_size.setter + def volume_size(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change volume size after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'volume_size', + value) + + @property + def agent_count(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'agent_count') + + if (value is None): + self.agent_count = 1 + value = 1 + return int(value) + + @agent_count.setter + def agent_count(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent count after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_count', + value) + + @property + def agent_image(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'agent_image') + + if (value is None): + value = 'Ubuntu 14.04' + self.agent_image = value + + return value + + @agent_image.setter + def agent_image(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent image after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_image', + value) + + @property + def public_network(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'public_network') + + @public_network.setter + def public_network(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change public network after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'public_network', + value) + + @property + def agent_flavor(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'agent_flavor') + + @agent_flavor.setter + def agent_flavor(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change flavor after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_flavor', + value) + + @property + def stack_id(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'stack_id') + + @stack_id.setter + def stack_id(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'stack_id', + value) + + @property + def volume_quota(self): + self._attach_to_openstack() + quotas = self._cinder_client.quotas.get( + os.environ.get('OS_TENANT_ID')) + return int(quotas.volumes) + + @property + def filename(self): + return self._test_executor.filename + + @filename.setter + def filename(self, value): + self._test_executor.filename = value + + @property + def deadline(self): + return self._test_executor.deadline + + @deadline.setter + def deadline(self, value): + self._test_executor.deadline = value + + @property + def steady_state_samples(self): + return self._test_executor.steady_state_samples + + @steady_state_samples.setter + def steady_state_samples(self, value): + self._test_executor.steady_state_samples = value + + @property + def queue_depths(self): + return self._test_executor.queue_depths + + @queue_depths.setter + def queue_depths(self, value): + self._test_executor.queue_depths = value + + @property + def block_sizes(self): + return self._test_executor.block_sizes + + @block_sizes.setter + def block_sizes(self, value): + self._test_executor.block_sizes = value + + @property + def is_stack_created(self): + if (self.stack_id is not None): + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + status = getattr(stack, 'stack_status') + + self.logger.info("Status=" + status) + if (status == u'CREATE_COMPLETE'): + return True + + return False + + @property + def workloads(self): + return self.configuration_db.get_configuration_value( + 'workload', + 'workloads') + + @workloads.setter + def workloads(self, value): + self._test_executor.register_workloads(value) + + self.configuration_db.set_configuration_value( + 'workload', + 'workloads', + str(self._test_executor.workload_modules)) + + def create_stack(self): + if (self.stack_id is not None): + raise ParameterError("ERROR: Stack has already been created") + + self._attach_to_openstack() + volume_quota = self.volume_quota + if (volume_quota > 0 and self.agent_count > volume_quota): + message = "ERROR: Volume quota too low: " + \ + str(self.agent_count) + " > " + str(self.volume_quota) + raise ParameterError(message) + + self.logger.debug("Creating stack") + stack = self._heat_client.stacks.create( + stack_name="StorPerfAgentGroup", + template=self._agent_group_hot, + files=self._hot_files, + parameters=self._make_parameters()) + + self.stack_id = stack['stack']['id'] + + while True: + stack = self._heat_client.stacks.get(self.stack_id) + status = getattr(stack, 'stack_status') + self.logger.debug("Stack status=%s" % (status,)) + if (status == u'CREATE_COMPLETE'): + return True + if (status == u'DELETE_COMPLETE'): + self.stack_id = None + return True + if (status == u'CREATE_FAILED'): + sleep(5) + self._heat_client.stacks.delete(stack_id=self.stack_id) + sleep(2) + + def delete_stack(self): + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + while True: + stack = self._heat_client.stacks.get(self.stack_id) + status = getattr(stack, 'stack_status') + self.logger.debug("Stack status=%s" % (status,)) + if (status == u'CREATE_COMPLETE'): + self._heat_client.stacks.delete(stack_id=self.stack_id) + if (status == u'DELETE_COMPLETE'): + self.stack_id = None + return True + if (status == u'DELETE_FAILED'): + sleep(5) + self._heat_client.stacks.delete(stack_id=self.stack_id) + sleep(2) + + def execute_workloads(self, metadata={}): + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + outputs = getattr(stack, 'outputs') + slaves = outputs[0]['output_value'] + + setup_threads = [] + + for slave in slaves: + t = Thread(target=self._setup_slave, args=(slave,)) + setup_threads.append(t) + t.start() + + for thread in setup_threads: + thread.join() + + self._test_executor.slaves = slaves + + params = metadata + params['agent_count'] = self.agent_count + params['public_network'] = self.public_network + params['volume_size'] = self.volume_size + job_id = self._test_executor.execute(params) + + return job_id + + def terminate_workloads(self): + return self._test_executor.terminate() + + def fetch_results(self, job_id): + if self._test_executor.job_db.job_id == job_id: + return self._test_executor.metadata['metrics'] + + workload_params = self.job_db.fetch_workload_params(job_id) + if 'report' in workload_params: + report = workload_params['report'] + return report['metrics'] + return {} + + def fetch_metadata(self, job_id): + return self.job_db.fetch_workload_params(job_id) + + def fetch_job_status(self, job_id): + return self._test_executor.execution_status(job_id) + + def _setup_slave(self, slave): + logger = logging.getLogger(__name__ + ":" + slave) + + logger.info("Initializing slave at " + slave) + + logger.debug("Checking if slave " + slave + " is alive") + + alive = False + timer = 10 + while not alive: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = s.connect_ex((slave, 22)) + s.close() + + if result: + alive = False + sleep(1) + timer -= 1 + if timer == 0: + logger.debug("Still waiting for slave " + slave) + timer = 10 + else: + alive = True + logger.debug("Slave " + slave + " is alive and ready") + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(slave, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + scp = SCPClient(ssh.get_transport()) + logger.debug("Transferring libaio.so.1 to %s" % slave) + scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/') + logger.debug("Transferring fio to %s" % slave) + scp.put('/usr/local/bin/fio', '~/') + + cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1' + logger.debug("Executing on %s: %s" % (slave, cmd)) + (_, stdout, stderr) = ssh.exec_command(cmd) + + for line in stdout.readlines(): + logger.debug(line.strip()) + for line in stderr.readlines(): + logger.error(line.strip()) + + def _make_parameters(self): + heat_parameters = {} + heat_parameters['public_network'] = self.public_network + heat_parameters['agent_count'] = self.agent_count + heat_parameters['volume_size'] = self.volume_size + heat_parameters['agent_image'] = self.agent_image + heat_parameters['agent_flavor'] = self.agent_flavor + return heat_parameters + + def _attach_to_openstack(self): + + time_since_last_auth = datetime.now() - self._last_openstack_auth + + if (self._heat_client is None or + time_since_last_auth.total_seconds() > 600): + self._last_openstack_auth = datetime.now() + + creds = { + "username": os.environ.get('OS_USERNAME'), + "password": os.environ.get('OS_PASSWORD'), + "auth_url": os.environ.get('OS_AUTH_URL'), + "project_domain_id": + os.environ.get('OS_PROJECT_DOMAIN_ID'), + "project_domain_name": + os.environ.get('OS_PROJECT_DOMAIN_NAME'), + "project_id": os.environ.get('OS_PROJECT_ID'), + "project_name": os.environ.get('OS_PROJECT_NAME'), + "tenant_name": os.environ.get('OS_TENANT_NAME'), + "tenant_id": os.environ.get("OS_TENANT_ID"), + "user_domain_id": os.environ.get('OS_USER_DOMAIN_ID'), + "user_domain_name": os.environ.get('OS_USER_DOMAIN_NAME') + } + + self.logger.debug("Creds: %s" % creds) + + loader = loading.get_plugin_loader('password') + auth = loader.load_from_options(**creds) + sess = session.Session(auth=auth) + + self.logger.debug("Looking up orchestration endpoint") + heat_endpoint = sess.get_endpoint(auth=auth, + service_type="orchestration", + endpoint_type='publicURL') + + self.logger.debug("Orchestration endpoint is %s" % heat_endpoint) + token = sess.get_token(auth=auth) + + self._heat_client = heatclient.Client( + "1", + endpoint=heat_endpoint, + token=token) + + self.logger.debug("Creating cinder client") + self._cinder_client = cinderclient.Client("2", session=sess) + self.logger.debug("OpenStack authentication complete") diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py new file mode 100644 index 0000000..b2d5914 --- /dev/null +++ b/docker/storperf-master/storperf/test_executor.py @@ -0,0 +1,326 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import copy +import imp +import json +import logging +from os import listdir +import os +from os.path import isfile, join +import sched +from storperf.carbon.converter import Converter +from storperf.carbon.emitter import CarbonMetricTransmitter +from storperf.db.job_db import JobDB +from storperf.fio.fio_invoker import FIOInvoker +from storperf.utilities.data_handler import DataHandler +from storperf.utilities.thread_gate import ThreadGate +from threading import Thread +import time + + +class UnknownWorkload(Exception): + pass + + +class TestExecutor(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.workload_modules = [] + self.filename = None + self.deadline = None + self.steady_state_samples = 10 + self.metadata = {} + self.start_time = None + self.end_time = None + self.current_workload = None + self.workload_status = {} + self.result_url = None + self._queue_depths = [1, 4, 8] + self._block_sizes = [512, 4096, 16384] + self.event_listeners = set() + self.metrics_converter = Converter() + self.metrics_emitter = CarbonMetricTransmitter() + self.prefix = None + self.job_db = JobDB() + self._slaves = [] + self._terminated = False + self._workload_executors = [] + self._workload_thread = None + self._thread_gate = None + + @property + def slaves(self): + return self._slaves + + @slaves.setter + def slaves(self, slaves): + self.logger.debug("Set slaves to: " + str(slaves)) + self._slaves = slaves + + @property + def queue_depths(self): + return ','.join(self._queue_depths) + + @queue_depths.setter + def queue_depths(self, queue_depths): + self.logger.debug("Set queue_depths to: " + str(queue_depths)) + self._queue_depths = queue_depths.split(',') + + @property + def block_sizes(self): + return ','.join(self._block_sizes) + + @property + def terminated(self): + return self._terminated + + @block_sizes.setter + def block_sizes(self, block_sizes): + self.logger.debug("Set block_sizes to: " + str(block_sizes)) + self._block_sizes = block_sizes.split(',') + + def register(self, event_listener): + self.event_listeners.add(event_listener) + + def unregister(self, event_listener): + self.event_listeners.discard(event_listener) + + def event(self, callback_id, metric): + carbon_metrics = self.metrics_converter.convert_json_to_flat( + metric, + callback_id) + + self.metrics_emitter.transmit_metrics(carbon_metrics) + + if self._thread_gate.report(callback_id): + self.broadcast_event() + + def broadcast_event(self): + for event_listener in self.event_listeners: + try: + self.logger.debug("Notifying event listener %s", + event_listener) + event_listener(self) + except Exception, e: + self.logger.exception("While notifying listener %s", e) + + def register_workloads(self, workloads): + self.workload_modules = [] + + if (workloads is None or len(workloads) == 0): + workload_dir = os.path.normpath( + os.path.join(os.path.dirname(__file__), "workloads")) + + workload_files = [ + f for f in listdir(workload_dir) + if isfile(join(workload_dir, f))] + + workloads = [] + + for filename in workload_files: + mname, _ = os.path.splitext(filename) + if (not mname.startswith('_')): + workloads.append(mname) + else: + workloads = workloads.split(',') + + for workload in workloads: + try: + workload_module = self.load_from_file("workloads/" + + workload + ".py") + self.logger.debug("Found: " + str(workload_module)) + if(workload_module is None): + raise UnknownWorkload( + "ERROR: Unknown workload: " + workload) + if workload_module not in self.workload_modules: + self.workload_modules.append(workload_module) + except ImportError, err: + raise UnknownWorkload("ERROR: " + str(err)) + + def load_from_file(self, uri): + uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri)) + path, fname = os.path.split(uri) + mname, _ = os.path.splitext(fname) + no_ext = os.path.join(path, mname) + self.logger.debug("Looking for: " + no_ext) + if os.path.exists(no_ext + '.pyc'): + self.logger.debug("Loading compiled: " + mname + " from " + no_ext) + return imp.load_compiled(mname, no_ext + '.pyc') + if os.path.exists(no_ext + '.py'): + self.logger.debug("Compiling: " + mname + " from " + no_ext) + return imp.load_source(mname, no_ext + '.py') + return None + + def execute(self, metadata): + self.job_db.create_job_id() + self.job_db.record_workload_params(metadata) + self.metadata = metadata + self.metadata['metrics'] = {} + self._workload_thread = Thread(target=self.execute_workloads, + args=(), + name="Workload thread") + self._workload_thread.start() + return self.job_db.job_id + + def terminate(self): + self._terminated = True + self.end_time = time.time() + return self.terminate_current_run() + + def terminate_current_run(self): + self.logger.info("Terminating current run") + terminated_hosts = [] + for workload in self._workload_executors: + workload.terminate() + terminated_hosts.append(workload.remote_host) + return terminated_hosts + + def execution_status(self, job_id): + + result = {} + status = "Completed" + + if self.job_db.job_id == job_id and self._terminated is False: + status = "Running" + + result['Status'] = status + result['Workloads'] = self.workload_status + result['TestResultURL'] = self.result_url + + return result + + def execute_workloads(self): + self._terminated = False + self.logger.info("Starting job %s" % (self.job_db.job_id)) + self.event_listeners.clear() + data_handler = DataHandler() + self.register(data_handler.data_event) + + self.start_time = time.time() + + self.workload_status = {} + # Prepare stats list + for workload_module in self.workload_modules: + workload_name = getattr(workload_module, "__name__") + blocksizes = self._block_sizes + iodepths = self._queue_depths + for blocksize in blocksizes: + for iodepth in iodepths: + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, blocksize) + self.workload_status[name] = "Pending" + + for workload_module in self.workload_modules: + workload_name = getattr(workload_module, "__name__") + self.logger.info("Starting workload %s" % (workload_name)) + + constructorMethod = getattr(workload_module, workload_name) + workload = constructorMethod() + if (self.filename is not None): + workload.filename = self.filename + + if (workload_name.startswith("_")): + iodepths = [8, ] + blocksizes = [16384, ] + else: + iodepths = self._queue_depths + blocksizes = self._block_sizes + + workload.id = self.job_db.job_id + self._thread_gate = ThreadGate(len(self.slaves), + workload.options['status-interval']) + + for blocksize in blocksizes: + for iodepth in iodepths: + + if self._terminated: + return + self.current_workload = ( + "%s.%s.queue-depth.%s.block-size.%s" % + (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) + + self.logger.info("Starting run %s" % self.current_workload) + self.workload_status[self.current_workload] = "Running" + + scheduler = sched.scheduler(time.time, time.sleep) + if self.deadline is not None \ + and not workload_name.startswith("_"): + event = scheduler.enter(self.deadline * 60, 1, + self.terminate_current_run, + ()) + t = Thread(target=scheduler.run, args=()) + t.start() + + workload.options['iodepth'] = str(iodepth) + workload.options['bs'] = str(blocksize) + + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(workload) + slave_workload.remote_host = slave + + self._workload_executors.append(slave_workload) + + t = Thread(target=self.execute_on_node, + args=(slave_workload,), + name="%s worker" % slave) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) + slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) + + if not scheduler.empty(): + try: + scheduler.cancel(event) + except: + pass + + self.logger.info("Completed run %s" % + self.current_workload) + self.workload_status[self.current_workload] = "Completed" + self._workload_executors = [] + self.current_workload = None + + self.logger.info("Completed workload %s" % (workload_name)) + self.logger.info("Completed job %s" % (self.job_db.job_id)) + + if self.result_url is not None: + self.logger.info("Results can be found at %s" % self.result_url) + + self.end_time = time.time() + self._terminated = True + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) + self.broadcast_event() + self.unregister(data_handler.data_event) + self.job_db.job_id = None + + def execute_on_node(self, workload): + + invoker = FIOInvoker() + invoker.register(self.event) + workload.invoker = invoker + + self.logger.info("Starting " + workload.fullname) + + self.job_db.start_workload(workload) + workload.execute() + self.job_db.end_workload(workload) + invoker.unregister(self.event) + + self.logger.info("Ended " + workload.fullname) diff --git a/docker/storperf-master/storperf/utilities/__init__.py b/docker/storperf-master/storperf/utilities/__init__.py new file mode 100644 index 0000000..73444b6 --- /dev/null +++ b/docker/storperf-master/storperf/utilities/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py new file mode 100644 index 0000000..1da869c --- /dev/null +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -0,0 +1,205 @@ +############################################################################## +# Copyright (c) 2016 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import logging +import os +from storperf.db import test_results_db +from storperf.db.graphite_db import GraphiteDB +from storperf.db.job_db import JobDB +from storperf.utilities import data_treatment as DataTreatment +from storperf.utilities import dictionary +from storperf.utilities import math as math +from storperf.utilities import steady_state as SteadyState +from time import sleep +import time + + +class DataHandler(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.job_db = JobDB() + + """ + """ + + def data_event(self, executor): + self.logger.debug("Event received") + + if executor.terminated: + self._push_to_db(executor) + else: + workload = '.'.join(executor.current_workload.split('.')[1:6]) + if 'metrics' not in executor.metadata: + executor.metadata['metrics'] = {} + + steady_state = True + metrics = {} + for metric in ('lat.mean', 'iops', 'bw'): + metrics[metric] = {} + for io_type in ('read', 'write'): + metrics[metric][io_type] = {} + + series = self._lookup_prior_data(executor, metric, io_type) + series = self._convert_timestamps_to_samples( + executor, series) + steady = self._evaluate_prior_data( + series, executor.steady_state_samples) + + self.logger.debug("Steady state for %s %s: %s" + % (io_type, metric, steady)) + + metrics[metric][io_type]['series'] = series + metrics[metric][io_type]['steady_state'] = steady + treated_data = DataTreatment.data_treatment(series) + + metrics[metric][io_type]['slope'] = \ + math.slope(treated_data['slope_data']) + metrics[metric][io_type]['range'] = \ + math.range_value(treated_data['range_data']) + average = math.average(treated_data['average_data']) + metrics[metric][io_type]['average'] = average + + metrics_key = '%s.%s.%s' % (workload, io_type, metric) + executor.metadata['metrics'][metrics_key] = average + + if not steady: + steady_state = False + + if 'report_data' not in executor.metadata: + executor.metadata['report_data'] = {} + + if 'steady_state' not in executor.metadata: + executor.metadata['steady_state'] = {} + + executor.metadata['report_data'][workload] = metrics + executor.metadata['steady_state'][workload] = steady_state + + workload_name = executor.current_workload.split('.')[1] + + if steady_state and not workload_name.startswith('_'): + executor.terminate_current_run() + + def _lookup_prior_data(self, executor, metric, io_type): + workload = executor.current_workload + graphite_db = GraphiteDB() + + # A bit of a hack here as Carbon might not be finished storing the + # data we just sent to it + now = int(time.time()) + backtime = 60 * (executor.steady_state_samples + 2) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + most_recent_time = now + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + + delta = now - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + while (delta < 5 or (delta > 60 and delta < 120)): + sleep(5) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + delta = time.time() - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + return data_series + + def _convert_timestamps_to_samples(self, executor, series): + workload_record = self.job_db.fetch_workloads( + executor.current_workload) + start_time = int(workload_record[0][1]) + + normalized_series = [] + + for item in series: + elapsed = (item[0] - start_time) + sample_number = int(round(float(elapsed) / 60)) + normalized_series.append([sample_number, item[1]]) + + return normalized_series + + def _evaluate_prior_data(self, data_series, samples): + self.logger.debug("Data series: %s" % data_series) + number_of_samples = len(data_series) + + if number_of_samples == 0: + return False + if (number_of_samples < samples): + self.logger.debug("Only %s samples, ignoring" % number_of_samples) + return False + + return SteadyState.steady_state(data_series) + + def _push_to_db(self, executor): + pod_name = dictionary.get_key_from_dict(executor.metadata, + 'pod_name', + 'Unknown') + version = dictionary.get_key_from_dict(executor.metadata, + 'version', + 'Unknown') + scenario = dictionary.get_key_from_dict(executor.metadata, + 'scenario_name', + 'Unknown') + build_tag = dictionary.get_key_from_dict(executor.metadata, + 'build_tag', + 'Unknown') + test_case = dictionary.get_key_from_dict(executor.metadata, + 'test_case', + 'Unknown') + duration = executor.end_time - executor.start_time + + payload = executor.metadata + + steady_state = True + for _, value in executor.metadata['steady_state'].items(): + steady_state = steady_state and value + + payload['timestart'] = executor.start_time + payload['duration'] = duration + + if steady_state: + criteria = 'PASS' + else: + criteria = 'FAIL' + + start_time = time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.start_time)) + + end_time = time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.end_time)) + + test_db = os.environ.get('TEST_DB_URL') + if test_db is not None: + self.logger.info("Pushing results to %s" % (test_db)) + try: + response = test_results_db.push_results_to_db(test_db, + "storperf", + test_case, + start_time, + end_time, + self.logger, + pod_name, + version, + scenario, + criteria, + build_tag, + payload) + executor.result_url = response['href'] + except: + self.logger.exception("Error pushing results into Database") diff --git a/docker/storperf-master/storperf/utilities/data_treatment.py b/docker/storperf-master/storperf/utilities/data_treatment.py new file mode 100644 index 0000000..2368fd9 --- /dev/null +++ b/docker/storperf-master/storperf/utilities/data_treatment.py @@ -0,0 +1,39 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + + +def data_treatment(data_series): + """ + This function aims at performing any necessary pre treatment on the + data_series passed to the steady_state function before being passed + under to the different math utilities (slope, range and average) + so the data can match the requirements of each algorithm. + The function returns a dictionary composed of three values that can be + accessed with the following keys : 'slope_data', 'range_data' and + 'average_data'. + The data_series is currently assumed to follow the pattern : + [[x1,y1], [x2,y2], ..., [xm,ym]]. If this pattern were to change, or + the input data pattern of one of the math module, this data_treatment + function should be the only part of the Steady State detection module + that would need to be modified too. + """ + + x_values = [] + y_values = [] + for l in data_series: + x_values.append(l[0]) + y_values.append(l[1]) + + treated_data = { + 'slope_data': data_series, # No treatment necessary so far + 'range_data': y_values, # The y_values only + 'average_data': y_values + } + + return treated_data diff --git a/docker/storperf-master/storperf/utilities/dictionary.py b/docker/storperf-master/storperf/utilities/dictionary.py new file mode 100644 index 0000000..95f625c --- /dev/null +++ b/docker/storperf-master/storperf/utilities/dictionary.py @@ -0,0 +1,15 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + + +def get_key_from_dict(dictionary, key, default_value=None): + if key in dictionary: + return dictionary[key] + else: + return default_value diff --git a/docker/storperf-master/storperf/utilities/math.py b/docker/storperf-master/storperf/utilities/math.py new file mode 100644 index 0000000..8e04134 --- /dev/null +++ b/docker/storperf-master/storperf/utilities/math.py @@ -0,0 +1,116 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy + + +def slope(data_series): + """ + This function implements the linear least squares algorithm described in + the following wikipedia article : + https://en.wikipedia.org/wiki/Linear_least_squares_(mathematics) + in the case of m equations (provided by m data points) and 2 unknown + variables (x and y, which represent the time and the Volume performance + variable being tested e.g. IOPS, latency...). + The data_series is currently assumed to follow the pattern : + [[x1,y1], [x2,y2], ..., [xm,ym]]. + If this data pattern were to change, the data_treatement function + should be adjusted to ensure compatibility with the rest of the + Steady State Detection module. + """ + + # In the particular case of an empty data series + if len(data_series) == 0: + beta2 = None + + else: # The general case + data_series = copy.deepcopy(data_series) + m = len(data_series) + # To make sure at least one element is a float number so the result + # of the algorithm be a float number + data_series[0][0] = float(data_series[0][0]) + + """ + It consists in solving the normal equations system (2 equations, + 2 unknowns) by calculating the value of beta2 (slope). + The formula of beta1 (the y-intercept) is given as a comment in + case it is needed later. + """ + sum_xi = 0 + sum_xi_sq = 0 + sum_yi_xi = 0 + sum_yi = 0 + for i in range(0, m): + xi = data_series[i][0] + yi = data_series[i][1] + + sum_xi += xi + sum_xi_sq += xi**2 + sum_yi_xi += xi * yi + sum_yi += yi + + over = (sum_xi**2 - m * sum_xi_sq) + if over == 0: + beta2 = None # Infinite slope + else: + beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope + # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if + # needed + + return beta2 + + +def range_value(data_series): + """ + This function implements a range algorithm that returns a float number + representing the range of the data_series that is passed to it. + The data_series being passed is assumed to follow the following data + pattern : [y1, y2, y3, ..., ym] where yi represents the ith + measuring point of the y variable. The y variable represents the + Volume performance being tested (e.g. IOPS, latency...). + If this data pattern were to change, the data_treatment function + should be adjusted to ensure compatibility with the rest of the + Steady State Dectection module. + The conversion of the data series from the original pattern to the + [y1, y2, y3, ..., ym] pattern is done outside this function + so the original pattern can be changed without breaking this function. + """ + + # In the particular case of an empty data series + if len(data_series) == 0: + range_value = None + + else: # The general case + max_value = max(data_series) + min_value = min(data_series) + range_value = max_value - min_value + + return range_value + + +def average(data_series): + """ + This function seeks to calculate the average value of the data series + given a series following the pattern : [y1, y2, y3, ..., ym]. + If this data pattern were to change, the data_treatment function + should be adjusted to ensure compatibility with the rest of the + Steady State Dectection module. + The function returns a float number corresponding to the average of the yi. + """ + m = len(data_series) + + if m == 0: # In the particular case of an empty data series + average = None + + else: + data_sum = 0 + for value in data_series: + data_sum += value + average = data_sum / float(m) + + return average diff --git a/docker/storperf-master/storperf/utilities/steady_state.py b/docker/storperf-master/storperf/utilities/steady_state.py new file mode 100644 index 0000000..13f609d --- /dev/null +++ b/docker/storperf-master/storperf/utilities/steady_state.py @@ -0,0 +1,54 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import logging + +from storperf.utilities import data_treatment as DataTreatment +from storperf.utilities import math as math + + +def steady_state(data_series): + """ + This function seeks to detect steady state given on a measurement + window given the data series of that measurement window following + the pattern : [[x1,y1], [x2,y2], ..., [xm,ym]]. m represents the number + of points recorded in the measurement window, x which represents the + time, and y which represents the Volume performance variable being + tested e.g. IOPS, latency... + The function returns a boolean describing wether or not steady state + has been reached with the data that is passed to it. + """ + + logger = logging.getLogger('storperf.utilities.steady_state') + + # Pre conditioning the data to match the algorithms + treated_data = DataTreatment.data_treatment(data_series) + + # Calculating useful values invoking dedicated functions + slope_value = math.slope(treated_data['slope_data']) + range_value = math.range_value(treated_data['range_data']) + average_value = math.average(treated_data['average_data']) + + if (slope_value is not None and range_value is not None and + average_value is not None): + # Verification of the Steady State conditions following the SNIA + # definition + range_condition = abs(range_value) <= 0.20 * abs(average_value) + slope_condition = abs(slope_value) <= 0.10 * abs(average_value) + + steady_state = range_condition and slope_condition + + logger.debug("Range %s <= %s?" % (abs(range_value), + (0.20 * abs(average_value)))) + logger.debug("Slope %s <= %s?" % (abs(slope_value), + (0.10 * abs(average_value)))) + logger.debug("Steady State? %s" % steady_state) + else: + steady_state = False + + return steady_state diff --git a/docker/storperf-master/storperf/utilities/thread_gate.py b/docker/storperf-master/storperf/utilities/thread_gate.py new file mode 100644 index 0000000..38acbb1 --- /dev/null +++ b/docker/storperf-master/storperf/utilities/thread_gate.py @@ -0,0 +1,67 @@ +############################################################################## +# Copyright (c) 2016 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +""" +Creates a gate object that allows synchronization between an arbitrary +number of callers. +""" +import logging +import time +from threading import Lock + + +class FailureToReportException(Exception): + pass + + +class ThreadGate(object): + + def __init__(self, size, timeout=60): + self.logger = logging.getLogger(__name__) + self._gate_size = size + self._timeout = timeout + self._registrants = {} + self._creation_time = time.time() + self._lock = Lock() + + """ + Calling this method returns a true or false, indicating that enough + of the other registrants have reported in + """ + + def report(self, gate_id): + with self._lock: + now = time.time() + self._registrants[gate_id] = now + ready = True + self.logger.debug("Gate report for %s", gate_id) + + total_missing = self._gate_size - len(self._registrants) + if total_missing > 0: + self.logger.debug("Not all registrants have reported in") + time_since_creation = now - self._creation_time + if (time_since_creation > (self._timeout * 2)): + self.logger.error( + "%s registrant(s) have never reported in", + total_missing) + raise FailureToReportException + return False + + for k, v in self._registrants.items(): + time_since_last_report = now - v + if time_since_last_report > self._timeout: + self.logger.debug("Registrant %s last reported %s ago", + k, time_since_last_report) + ready = False + + self.logger.debug("Gate pass? %s", ready) + + if ready: + self._registrants.clear() + + return ready diff --git a/docker/storperf-master/storperf/workloads/__init__.py b/docker/storperf-master/storperf/workloads/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/storperf/workloads/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py new file mode 100644 index 0000000..936c839 --- /dev/null +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -0,0 +1,82 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import logging + + +class _base_workload(object): + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + self.default_filesize = "1G" + self.filename = '/dev/vdb' + self.options = { + 'ioengine': 'libaio', + 'direct': '1', + 'rw': 'read', + 'bs': '64k', + 'iodepth': '1', + 'numjobs': '1', + 'loops': '20', + 'output-format': 'json', + 'status-interval': '60' + } + self.invoker = None + self.remote_host = None + self.id = None + + def execute(self): + if self.invoker is None: + raise ValueError("No invoker has been set") + + args = [] + self.invoker.remote_host = self.remote_host + self.invoker.callback_id = self.fullname + + if self.filename.startswith("/dev"): + self.options['size'] = "100%" + self.logger.debug( + "Profiling a device, using 100% of " + self.filename) + else: + self.options['size'] = self.default_filesize + self.logger.debug("Profiling a filesystem, using " + + self.default_filesize + " file") + + self.options['filename'] = self.filename + + self.setup() + + for key, value in self.options.iteritems(): + args.append('--' + key + "=" + value) + + self.invoker.execute(args) + + def terminate(self): + if self.invoker is not None: + self.invoker.terminate() + + def setup(self): + pass + + @property + def remote_host(self): + return str(self._remote_host) + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + + @property + def fullname(self): + return ("%s.%s.queue-depth.%s.block-size.%s.%s" % + (str(self.id), + self.__class__.__name__, + str(self.options['iodepth']), + str(self.options['bs']), + str(self.remote_host).replace(".", "-"))) diff --git a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py new file mode 100644 index 0000000..cce3c31 --- /dev/null +++ b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py @@ -0,0 +1,17 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class _ssd_preconditioning(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'ssd_preconditioning' + self.options['rw'] = 'randwrite' + self.options['loops'] = '1' diff --git a/docker/storperf-master/storperf/workloads/_warm_up.py b/docker/storperf-master/storperf/workloads/_warm_up.py new file mode 100644 index 0000000..9cd268e --- /dev/null +++ b/docker/storperf-master/storperf/workloads/_warm_up.py @@ -0,0 +1,17 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class _warm_up(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'warm_up' + self.options['rw'] = 'write' + self.options['loops'] = '1' diff --git a/docker/storperf-master/storperf/workloads/rr.py b/docker/storperf-master/storperf/workloads/rr.py new file mode 100644 index 0000000..3823a4c --- /dev/null +++ b/docker/storperf-master/storperf/workloads/rr.py @@ -0,0 +1,16 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class rr(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'random_read' + self.options['rw'] = 'randread' diff --git a/docker/storperf-master/storperf/workloads/rs.py b/docker/storperf-master/storperf/workloads/rs.py new file mode 100644 index 0000000..511888e --- /dev/null +++ b/docker/storperf-master/storperf/workloads/rs.py @@ -0,0 +1,16 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class rs(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'sequential_read' + self.options['rw'] = 'read' diff --git a/docker/storperf-master/storperf/workloads/rw.py b/docker/storperf-master/storperf/workloads/rw.py new file mode 100644 index 0000000..f4b6979 --- /dev/null +++ b/docker/storperf-master/storperf/workloads/rw.py @@ -0,0 +1,18 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class rw(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'random_readwrite' + self.options['rwmixread'] = '70' + self.options['rw'] = 'rw' + self.logger.debug(self.options) diff --git a/docker/storperf-master/storperf/workloads/wr.py b/docker/storperf-master/storperf/workloads/wr.py new file mode 100644 index 0000000..457a29a --- /dev/null +++ b/docker/storperf-master/storperf/workloads/wr.py @@ -0,0 +1,16 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class wr(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'random_write' + self.options['rw'] = 'randwrite' diff --git a/docker/storperf-master/storperf/workloads/ws.py b/docker/storperf-master/storperf/workloads/ws.py new file mode 100644 index 0000000..f37079e --- /dev/null +++ b/docker/storperf-master/storperf/workloads/ws.py @@ -0,0 +1,16 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads import _base_workload + + +class ws(_base_workload._base_workload): + + def setup(self): + self.options['name'] = 'sequential_write' + self.options['rw'] = 'write' diff --git a/docker/storperf-master/supervisord.conf b/docker/storperf-master/supervisord.conf new file mode 100644 index 0000000..9832c49 --- /dev/null +++ b/docker/storperf-master/supervisord.conf @@ -0,0 +1,33 @@ +[supervisord] +nodaemon = true +environment = GRAPHITE_STORAGE_DIR='/var/lib/graphite/storage',GRAPHITE_CONF_DIR='/var/lib/graphite/conf' + +[program:nginx] +command = /usr/sbin/nginx +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true + +[program:carbon-cache] +user = www-data +command = /var/lib/graphite/bin/carbon-cache.py --debug start +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true + +[program:graphite-webapp] +user = www-data +directory = /var/lib/graphite/webapp +environment = PYTHONPATH='/var/lib/graphite/webapp' +command = /usr/bin/gunicorn_django -b127.0.0.1:8080 -w2 graphite/settings.py +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true + +[program:storperf-webapp] +user = root +directory = /home/opnfv/repos/storperf/ +command = /usr/bin/python rest_server.py +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true diff --git a/docker/storperf-master/tests/__init__.py b/docker/storperf-master/tests/__init__.py new file mode 100644 index 0000000..230494c --- /dev/null +++ b/docker/storperf-master/tests/__init__.py @@ -0,0 +1,11 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import logging + +logging.basicConfig(level=logging.DEBUG) diff --git a/docker/storperf-master/tests/carbon_tests/__init__.py b/docker/storperf-master/tests/carbon_tests/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/tests/carbon_tests/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/tests/carbon_tests/emitter_test.py b/docker/storperf-master/tests/carbon_tests/emitter_test.py new file mode 100644 index 0000000..7f61049 --- /dev/null +++ b/docker/storperf-master/tests/carbon_tests/emitter_test.py @@ -0,0 +1,72 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import SocketServer +import json +from storperf.carbon import converter +from storperf.carbon.emitter import CarbonMetricTransmitter +import threading +from time import sleep, strptime +import unittest + +import mock + + +class MetricsHandler(SocketServer.BaseRequestHandler): + + def handle(self): + # Echo the back to the client + CarbonMetricTransmitterTest.response = self.request.recv(1024) + return + + +class MetricsServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + pass + + +class CarbonMetricTransmitterTest(unittest.TestCase): + listen_port = 0 + response = None + + def setUp(self): + + address = ('localhost', 0) + server = MetricsServer(address, MetricsHandler) + ip, self.listen_port = server.server_address + + t = threading.Thread(target=server.serve_forever) + t.setDaemon(True) + t.start() + + @mock.patch("time.gmtime") + def test_transmit_metrics(self, mock_time): + + mock_time.return_value = strptime("30 Nov 00", "%d %b %y") + + testconv = converter.Converter() + json_object = json.loads( + """{"timestamp" : "975542400", "key":"value" }""") + result = testconv.convert_json_to_flat(json_object, "host.run-name") + + emitter = CarbonMetricTransmitter() + emitter.carbon_port = self.listen_port + emitter.transmit_metrics(result) + + count = 0 + + while (CarbonMetricTransmitterTest.response is None and count < 10): + count += 1 + sleep(0.1) + + self.assertEqual("host.run-name.key value 975542400\n", + CarbonMetricTransmitterTest.response, + CarbonMetricTransmitterTest.response) + +if __name__ == '__main__': + unittest.main() diff --git a/docker/storperf-master/tests/carbon_tests/json_to_carbon_test.py b/docker/storperf-master/tests/carbon_tests/json_to_carbon_test.py new file mode 100644 index 0000000..523ff77 --- /dev/null +++ b/docker/storperf-master/tests/carbon_tests/json_to_carbon_test.py @@ -0,0 +1,116 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.carbon.converter import Converter +import json +import unittest + + +class JSONToCarbonTest(unittest.TestCase): + + single_json_text_element = """{ "key" : "value" }""" + single_json_numeric_element = """{ "key" : 123 }""" + single_json_key_with_spaces = """{ "key with spaces" : "value" }""" + single_json_value_with_spaces = """{ "key" : "value with spaces" }""" + json_map_name_with_spaces = \ + """{ "map with spaces" : { "key" : "value" } }""" + json_list_name_with_spaces = \ + """{ "list with spaces" : [{ "key" : "value" }] }""" + + simple_fio_json = """ +{ + "fio version" : "fio-2.2.10", + "timestamp" : 1444144664, + "time" : "Tue Oct 6 11:17:44 2015", + "jobs" : [ + { + "jobname" : "random-read", + "groupid" : 0, + "error" : 0, + "eta" : 0, + "elapsed" : 26, + "read" : { + "io_bytes" : 7116, + "bw" : 275, + "iops" : 68.99, + "runtime" : 25788, + "total_ios" : 1779, + "short_ios" : 0, + "drop_ios" : 0, + "slat" : { + "min" : 0, + "max" : 0, + "mean" : 0.00, + "stddev" : 0.00 + } + } + }] +} +""" + + def setUp(self): + pass + + def test_to_string(self): + testconv = Converter() + json_object = json.loads(self.simple_fio_json) + result = testconv.convert_json_to_flat(json_object, "host.run-name") + self.assertEqual("7116", result[ + "host.run-name.jobs.1.read.io_bytes"], + result["host.run-name.jobs.1.read.io_bytes"]) + + def test_single_text_element_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.single_json_text_element)) + + self.assertEqual("value", result["key"], result["key"]) + + def test_single_numeric_element_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.single_json_numeric_element)) + + self.assertEqual("123", result["key"], result["key"]) + + def test_single_text_key_space_element_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.single_json_key_with_spaces)) + + self.assertEqual( + "value", result["key_with_spaces"], result["key_with_spaces"]) + + def test_single_text_value_space_element_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.single_json_value_with_spaces)) + + self.assertEqual("value_with_spaces", result["key"], result["key"]) + + def test_map_name_with_space_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.json_map_name_with_spaces)) + + self.assertEqual( + "value", result["map_with_spaces.key"], + result["map_with_spaces.key"]) + + def test_list_name_with_space_no_prefix(self): + testconv = Converter() + result = testconv.convert_json_to_flat( + json.loads(self.json_list_name_with_spaces)) + + self.assertEqual( + "value", result["list_with_spaces.1.key"], + result["list_with_spaces.1.key"]) + +if __name__ == '__main__': + unittest.main() diff --git a/docker/storperf-master/tests/db_tests/__init__.py b/docker/storperf-master/tests/db_tests/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/tests/db_tests/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/tests/db_tests/configuration_db_test.py b/docker/storperf-master/tests/db_tests/configuration_db_test.py new file mode 100644 index 0000000..d8b021a --- /dev/null +++ b/docker/storperf-master/tests/db_tests/configuration_db_test.py @@ -0,0 +1,71 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.db.configuration_db import ConfigurationDB +import os +import unittest + + +class ConfigurationDBTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + '.db' + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + self.config_db = ConfigurationDB() + + def tearDown(self): + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + + def test_create_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_update_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_deleted_key(self): + expected = None + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.delete_configuration_value( + "test", "key") + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/docker/storperf-master/tests/db_tests/graphite_db_test.py b/docker/storperf-master/tests/db_tests/graphite_db_test.py new file mode 100644 index 0000000..d4c6fb6 --- /dev/null +++ b/docker/storperf-master/tests/db_tests/graphite_db_test.py @@ -0,0 +1,112 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import unittest + +import mock + +from storperf.db.graphite_db import GraphiteDB + + +class MockResponse(): + + def __init__(self): + self.content = "" + self.status_code = 200 + + +class GraphiteDBTest(unittest.TestCase): + + def setUp(self): + self.graphdb = GraphiteDB() + self.graphdb._job_db = self + + def test_wildcard_pattern(self): + workload = "job_id" + expected = "job_id.*.*.*.*.*.*" + actual = self.graphdb.make_fullname_pattern(workload) + self.assertEqual(expected, actual, actual) + + def test_no_wildcard_pattern(self): + workload = "job_id.workload.host.queue-depth.1.block-size.16" + actual = self.graphdb.make_fullname_pattern(workload) + self.assertEqual(workload, actual, actual) + + def test_fetch_averages(self): + # self.graphdb.fetch_averages(u'32d31724-fac1-44f3-9033-ca8e00066a36') + pass + + @mock.patch("requests.get") + def test_fetch_series(self, mock_requests): + + response = MockResponse() + response.content = """ +[ + { + "datapoints": [ + [null,1480455880], + [null,1480455890], + [null,1480455900], + [205.345,1480455910], + [201.59,1480455920], + [205.76,1480455930], + [null,1480455940], + [null,1480455950], + [null,1480455960], + [215.655,1480455970], + [214.16,1480455980], + [213.57,1480455990], + [null,1480456000], + [null,1480456010], + [null,1480456020], + [219.37,1480456030], + [219.28,1480456040], + [217.75,1480456050], + [null,1480456060] + ], + "target":"averageSeries(.8192.*.jobs.1.write.iops)" + } +]""" + expected = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + + mock_requests.side_effect = (response, ) + + actual = self.graphdb.fetch_series("workload", "iops", + "write", 0, 600) + self.assertEqual(expected, actual) + + def fetch_workloads(self, workload): + workloads = [[u'32d31724-fac1-44f3-9033-ca8e00066a36.' + u'_warm_up.queue-depth.32.block-size.8192.10-9-15-151', + u'1462379653', u'1462379893'], + [u'32d31724-fac1-44f3-9033-ca8e00066a36.' + u'_warm_up.queue-depth.32.block-size.8192.10-9-15-150', + u'1462379653', u'1462379898'], + [u'32d31724-fac1-44f3-9033-ca8e00066a36' + u'.rw.queue-depth.128.block-size.8192.10-9-15-151', + u'1462379898', u'1462380028'], + [u'32d31724-fac1-44f3-9033-ca8e00066a36' + u'.rw.queue-depth.128.block-size.8192.10-9-15-150', + u'1462379898', u'1462380032'], + [u'32d31724-fac1-44f3-9033-ca8e00066a36' + u'.rw.queue-depth.16.block-size.8192.10-9-15-151', + u'1462380032', u'1462380312'], + [u'32d31724-fac1-44f3-9033-ca8e00066a36' + u'.rw.queue-depth.16.block-size.8192.10-9-15-150', + u'1462380032', u'1462380329'], + ] + return workloads diff --git a/docker/storperf-master/tests/db_tests/job_db_test.py b/docker/storperf-master/tests/db_tests/job_db_test.py new file mode 100644 index 0000000..25fda1f --- /dev/null +++ b/docker/storperf-master/tests/db_tests/job_db_test.py @@ -0,0 +1,198 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import os +import sqlite3 +import unittest + +import mock + +from storperf.db.job_db import JobDB +from storperf.workloads.rr import rr + + +class JobDBTest(unittest.TestCase): + + def setUp(self): + JobDB.db_name = __name__ + '.db' + try: + os.remove(JobDB.db_name) + except OSError: + pass + self.job = JobDB() + + def tearDown(self): + try: + os.remove(JobDB.db_name) + except OSError: + pass + + @mock.patch("uuid.uuid4") + def test_create_job(self, mock_uuid): + expected = "ABCDE-12345" + mock_uuid.side_effect = (expected,) + + self.job.create_job_id() + + actual = self.job.job_id + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + @mock.patch("uuid.uuid4") + def test_duplicate_job_generated(self, mock_uuid): + duplicate = "EDCBA-12345" + expected = "EDCBA-54321" + + mock_uuid.side_effect = (duplicate, duplicate, expected,) + + self.job.create_job_id() + self.job.create_job_id() + + actual = self.job.job_id + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_start_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + mock_calendar.side_effect = (start_time,) + mock_uuid.side_effect = (job_id,) + workload = rr() + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + row = cursor.execute( + """select * from jobs + where job_id = ? + and workload = ?""", + (job_id, workload.fullname,)) + + self.assertEqual(None, + row.fetchone(), + "Should not have been a row in the db") + + self.job.start_workload(workload) + + cursor.execute( + """select job_id, workload, start from jobs + where job_id = ? + and workload = ?""", + (job_id, workload.fullname,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload.fullname, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_end_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + end_time = "54321" + mock_calendar.side_effect = (start_time, end_time,) + mock_uuid.side_effect = (job_id,) + workload = rr() + + self.job.start_workload(workload) + self.job.end_workload(workload) + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + cursor.execute( + """select job_id, workload, start, end from jobs + where job_id = ? + and workload = ?""", + (job_id, workload.fullname,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload.fullname, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + self.assertEqual(end_time, row[3], "Did not expect " + str(row[3])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_duplicate_start_job(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time_1 = "12345" + start_time_2 = "12346" + + mock_calendar.side_effect = (start_time_1, start_time_2) + mock_uuid.side_effect = (job_id,) + workload = rr() + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + + self.job.start_workload(workload) + self.job.start_workload(workload) + + cursor.execute( + """select job_id, workload, start from jobs + where job_id = ? + and workload = ?""", + (job_id, workload.fullname,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload.fullname, row[1], "Did not expect " + str(row[1])) + self.assertEqual(start_time_2, row[2], "Did not expect " + str(row[2])) + + @mock.patch("uuid.uuid4") + @mock.patch("calendar.timegm") + def test_end_job_without_start(self, mock_calendar, mock_uuid): + job_id = "ABCDE-12345" + start_time = "12345" + end_time = "54321" + mock_calendar.side_effect = (start_time, end_time,) + mock_uuid.side_effect = (job_id,) + workload = rr() + + self.job.end_workload(workload) + + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + cursor.execute( + """select job_id, workload, start, end from jobs + where job_id = ? + and workload = ?""", + (job_id, workload.fullname,)) + + row = cursor.fetchone() + + self.assertNotEqual(None, row, "Should be a row in the db") + self.assertEqual(job_id, row[0], "Did not expect " + str(row[0])) + self.assertEqual( + workload.fullname, row[1], "Did not expect " + str(row[1])) + # The start time is set to the same time as end if it was never set + # before + self.assertEqual(start_time, row[2], "Did not expect " + str(row[2])) + self.assertEqual(start_time, row[3], "Did not expect " + str(row[3])) + + def test_job_params(self): + expected = {u"a": 1, u"b": 2} + self.job.job_id = "ABCD" + self.job.record_workload_params(expected) + actual = self.job.fetch_workload_params(self.job.job_id) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/fio_tests/__init__.py b/docker/storperf-master/tests/fio_tests/__init__.py new file mode 100644 index 0000000..df29e18 --- /dev/null +++ b/docker/storperf-master/tests/fio_tests/__init__.py @@ -0,0 +1,11 @@ +############################################################################## +# Copyright (c) 2017 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import logging + +logging.basicConfig(level=logging.DEBUG) diff --git a/docker/storperf-master/tests/fio_tests/fio_invoker_test.py b/docker/storperf-master/tests/fio_tests/fio_invoker_test.py new file mode 100644 index 0000000..4672651 --- /dev/null +++ b/docker/storperf-master/tests/fio_tests/fio_invoker_test.py @@ -0,0 +1,88 @@ +############################################################################## +# Copyright (c) 2017 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from StringIO import StringIO +import json +import unittest + +from storperf.fio.fio_invoker import FIOInvoker + + +class Test(unittest.TestCase): + + simple_dictionary = {'Key': 'Value'} + + def exceptional_event(self, callback_id, metric): + self.exception_called = True + raise Exception + + def event(self, callback_id, metric): + self.metric = metric + + def setUp(self): + self.exception_called = False + self.metric = None + self.fio_invoker = FIOInvoker() + + def testStdoutValidJSON(self): + self.fio_invoker.register(self.event) + string = json.dumps(self.simple_dictionary, indent=4, sort_keys=True) + + output = StringIO(string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(self.simple_dictionary, self.metric) + + def testStdoutValidJSONWithFIOOutput(self): + self.fio_invoker.register(self.event) + string = json.dumps(self.simple_dictionary, indent=4, sort_keys=True) + terminating = "fio: terminating on signal 2\n" + output = StringIO(terminating + string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(self.simple_dictionary, self.metric) + + def testStdoutNoJSON(self): + self.fio_invoker.register(self.event) + string = "{'key': 'value'}" + + output = StringIO(string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(None, self.metric) + + def testStdoutInvalidJSON(self): + self.fio_invoker.register(self.event) + string = "{'key':\n}" + + output = StringIO(string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(None, self.metric) + + def testStdoutAfterTerminated(self): + self.fio_invoker.register(self.event) + string = json.dumps(self.simple_dictionary, indent=4, sort_keys=True) + + self.fio_invoker.terminated = True + output = StringIO(string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(None, self.metric) + + def testStdoutCallbackException(self): + self.fio_invoker.register(self.exceptional_event) + self.fio_invoker.register(self.event) + string = json.dumps(self.simple_dictionary, indent=4, sort_keys=True) + + output = StringIO(string + "\n") + self.fio_invoker.stdout_handler(output) + + self.assertEqual(self.simple_dictionary, self.metric) + self.assertEqual(self.exception_called, True) diff --git a/docker/storperf-master/tests/storperf_master_test.py b/docker/storperf-master/tests/storperf_master_test.py new file mode 100644 index 0000000..e824a5f --- /dev/null +++ b/docker/storperf-master/tests/storperf_master_test.py @@ -0,0 +1,85 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import os +import unittest + +from storperf.db.configuration_db import ConfigurationDB +from storperf.storperf_master import StorPerfMaster + + +class StorPerfMasterTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + '.db' + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + self.storperf = StorPerfMaster() + + def tearDown(self): + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + + def test_agent_count(self): + expected = 10 + + self.storperf.agent_count = expected + actual = self.storperf.agent_count + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_queue_depths(self): + expected = "1,2,3" + + self.storperf.queue_depths = expected + actual = self.storperf.queue_depths + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_block_sizes(self): + expected = "8,2,1,0" + + self.storperf.block_sizes = expected + actual = self.storperf.block_sizes + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_volume_size(self): + expected = 20 + + self.storperf.volume_size = expected + actual = self.storperf.volume_size + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_agent_network(self): + expected = "ABCDEF" + + self.storperf.public_network = expected + actual = self.storperf.public_network + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_agent_flavor(self): + expected = "m1.small" + + self.storperf.agent_flavor = expected + actual = self.storperf.agent_flavor + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/docker/storperf-master/tests/utilities_tests/__init__.py b/docker/storperf-master/tests/utilities_tests/__init__.py new file mode 100644 index 0000000..6218fe3 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/__init__.py @@ -0,0 +1,11 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import logging + +logging.basicConfig(level=logging.DEBUG) diff --git a/docker/storperf-master/tests/utilities_tests/data_handler_test.py b/docker/storperf-master/tests/utilities_tests/data_handler_test.py new file mode 100644 index 0000000..6d57b0d --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/data_handler_test.py @@ -0,0 +1,297 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import os +from storperf.utilities.data_handler import DataHandler +import unittest + +import mock + + +class MockGraphiteDB(object): + + def __init__(self): + self.series = [] + + def fetch_series(self, job_id, timeframe): + return self.series + + +class DataHandlerTest(unittest.TestCase): + + def setUp(self): + self.event_listeners = set() + self.data_handler = DataHandler() + self._terminated = False + self.args = None + self.start_time = 0 + self.steady_state_samples = 10 + self.end_time = 1 + self.metadata = {} + self.block_sizes = "1" + self.queue_depths = "1" + mock.job_id = "1" + self.job_db = mock + self.pushed = False + self.current_workload = None + self.db_results = None + pass + + @property + def terminated(self): + return self._terminated + + def push_results_to_db(self, *args): + self.pushed = True + self.db_results = args + results = {"href": "http://localhost/api/result/uuid-that-is-long"} + return results + + def terminate(self): + self._terminated = True + + def terminate_current_run(self): + self._terminated = True + + @mock.patch("time.time") + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + def test_lookup_prior_data(self, mock_graphite_db, mock_time): + self._terminated = False + expected = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + mock_graphite_db.return_value = expected + mock_time.return_value = expected[-1][0] + 10 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) + + actual = self.data_handler._lookup_prior_data(self, 'read', 'iops') + self.assertEqual(expected, actual) + + def test_short_sample(self): + series = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + + actual = self.data_handler._evaluate_prior_data( + series, self.steady_state_samples) + self.assertEqual(False, actual) + + def test_long_not_steady_sample(self): + series = [[4804559100, 205345], + [4804559200, 20159], + [4804559300, 20576], + [4804560300, 21937], + [4804560400, 21928], + [4804560500, 21775]] + actual = self.data_handler._evaluate_prior_data( + series, self.steady_state_samples) + self.assertEqual(False, actual) + + def test_long_steady_sample(self): + series = [[4804559100, 205.345], + [4804559200, 201.59], + [4804559300, 205.76], + [4804559400, 205.76], + [4804559500, 205.76], + [4804559600, 205.76], + [4804559700, 205.76], + [4804560300, 219.37], + [4804560400, 219.28], + [4804560500, 217.75]] + actual = self.data_handler._evaluate_prior_data( + series, self.steady_state_samples) + self.assertEqual(True, actual) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.utilities.data_handler.GraphiteDB") + def test_terminated_report(self, mock_graphite_db, mock_results_db): + self._terminated = True + mock_results_db.side_effect = self.push_results_to_db + mock_graphite_db.side_effect = MockGraphiteDB + self.metadata = { + "steady_state": { + "rr.queue-depth.8.block-size.16384": True, + "rr.queue-depth.8.block-size.2048": False, + "rr.queue-depth.8.block-size.8192": True, + }, + } + + self.data_handler.data_event(self) + self.assertEqual(True, self.pushed) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("time.time") + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads") + def test_non_terminated_report(self, mock_job_db, mock_graphite_db, + mock_results_db, mock_time): + self._terminated = False + mock_results_db.side_effect = self.push_results_to_db + series = \ + [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + mock_graphite_db.return_value = series + mock_time.return_value = series[-1][0] + 10 + expected_slope = 12.292030334472656 + expected_range = 17.78 + expected_average = 212.49777777777774 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) + + mock_job_db.return_value = [[self.current_workload, 4804559000, None]] + + self.data_handler.data_event(self) + self.assertEqual(False, self.pushed) + self.assertEqual(False, self._terminated) + + self.assertEqual(expected_slope, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['slope']) + self.assertEqual(expected_range, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['range']) + self.assertEqual(expected_average, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['average']) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("time.time") + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads") + def test_report_that_causes_termination(self, + mock_job_db, + mock_graphite_db, + mock_results_db, + mock_time): + self._terminated = False + mock_results_db.side_effect = self.push_results_to_db + series = [[4804559100, 205.345], + [4804559200, 201.59], + [4804559300, 205.76], + [4804559400, 205.76], + [4804559500, 205.76], + [4804559600, 205.76], + [4804559700, 205.76], + [4804560300, 219.37], + [4804560400, 219.28], + [4804560500, 217.75]] + report_data = [[2, 205.345], + [3, 201.59], + [5, 205.76], + [7, 205.76], + [8, 205.76], + [10, 205.76], + [12, 205.76], + [22, 219.37], + [23, 219.28], + [25, 217.75]] + mock_graphite_db.return_value = series + mock_time.return_value = 4804560500 + 10 + + expected_slope = 0.7419522662249607 + expected_range = 17.78 + expected_average = 209.2135 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) + + mock_job_db.return_value = [[self.current_workload, 4804559000, None]] + + self.data_handler.data_event(self) + + self.assertEqual(expected_slope, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['slope']) + self.assertEqual(expected_range, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['range']) + self.assertEqual(expected_average, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['average']) + self.assertEqual(report_data, self.metadata['report_data'] + ['rw.queue-depth.8.block-size.8192'] + ['lat.mean'] + ['read'] + ['series']) + self.assertEqual(True, self._terminated) + + self.assertEqual(False, self.pushed) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.test_results_db.push_results_to_db") + def test_playload_report(self, + mock_results_db): + mock_results_db.side_effect = self.push_results_to_db + self.start_time = 1504559100 + self.end_time = 1504560000 + self.metadata = { + "scenario_name": "ceph_ws,wr,rs,rr,rw", + "status": "OK", + "steady_state": { + "rr.queue-depth.8.block-size.16384": True, + "rr.queue-depth.8.block-size.2048": False, + "rr.queue-depth.8.block-size.8192": True, + }, + "storage_node_count": 5, + "volume_size": 10 + } + self.data_handler._push_to_db(self) + self.assertEqual('FAIL', self.db_results[9], + 'Expected FAIL in criteria') + self.assertEqual('2017-09-04 21:05:00', self.db_results[3], + 'Start time') + self.assertEqual('2017-09-04 21:20:00', self.db_results[4], + 'End time') diff --git a/docker/storperf-master/tests/utilities_tests/data_treatment_test.py b/docker/storperf-master/tests/utilities_tests/data_treatment_test.py new file mode 100644 index 0000000..4450f92 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/data_treatment_test.py @@ -0,0 +1,81 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.utilities import data_treatment as DataTreatment + + +class DataTreatmentTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + + def test_empty_series(self): + expected = { + 'slope_data': [], + 'range_data': [], + 'average_data': [] + } + data_series = [] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) + + def test_integer_series(self): + expected = { + 'slope_data': [[1, 5], [66, 2], [12, 98], [74, 669], [33, 66]], + 'range_data': [5, 2, 98, 669, 66], + 'average_data': [5, 2, 98, 669, 66] + } + data_series = [[1, 5], [66, 2], [12, 98], [74, 669], [33, 66]] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) + + def test_float_series(self): + expected = { + 'slope_data': [[5.6, 12.7], [96.66, 78.212], + [639.568, 5.3], [4.65, 6.667]], + 'range_data': [12.7, 78.212, 5.3, 6.667], + 'average_data': [12.7, 78.212, 5.3, 6.667] + } + data_series = [ + [5.6, 12.7], [96.66, 78.212], [639.568, 5.3], [4.65, 6.667]] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) + + def test_float_int_mix(self): + expected = { + 'slope_data': [[5, 12.7], [96.66, 7], [639.568, 5.3], [4, 6]], + 'range_data': [12.7, 7, 5.3, 6], + 'average_data': [12.7, 7, 5.3, 6] + } + data_series = [[5, 12.7], [96.66, 7], [639.568, 5.3], [4, 6]] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) + + def test_negative_values(self): + expected = { + 'slope_data': [[-15, 5.56], [41.3, -278], [41.3, -98], + [78.336, -0.12], [33.667, 66]], + 'range_data': [5.56, -278, -98, -0.12, 66], + 'average_data': [5.56, -278, -98, -0.12, 66] + } + data_series = [ + [-15, 5.56], [41.3, -278], [41.3, -98], + [78.336, -0.12], [33.667, 66]] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) + + def test_single_value(self): + expected = { + 'slope_data': [[86.8, 65.36]], + 'range_data': [65.36], + 'average_data': [65.36] + } + data_series = [[86.8, 65.36]] + actual = DataTreatment.data_treatment(data_series) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/dictionary_test.py b/docker/storperf-master/tests/utilities_tests/dictionary_test.py new file mode 100644 index 0000000..0819cef --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/dictionary_test.py @@ -0,0 +1,42 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import unittest +from storperf.utilities import dictionary + + +class DictionaryTest(unittest.TestCase): + + def setUp(self): + self.dictionary = {} + self.dictionary['key'] = 'value' + pass + + def test_get_no_default(self): + expected = None + actual = dictionary.get_key_from_dict(self.dictionary, 'no-key') + self.assertEqual(expected, actual) + + def test_get_with_default(self): + expected = 'value 2' + actual = dictionary.get_key_from_dict( + self.dictionary, 'no-key', expected) + self.assertEqual(expected, actual) + + def test_get_with_value(self): + expected = 'value' + actual = dictionary.get_key_from_dict( + self.dictionary, 'key') + self.assertEqual(expected, actual) + + def test_get_with_value_and_default(self): + expected = 'value' + actual = dictionary.get_key_from_dict( + self.dictionary, 'key', 'value 2') + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/math_average_test.py b/docker/storperf-master/tests/utilities_tests/math_average_test.py new file mode 100644 index 0000000..3095f56 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/math_average_test.py @@ -0,0 +1,52 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.utilities import math as math + + +class MathAverageTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + + def test_empty_series(self): + expected = None + data_series = [] + actual = math.average(data_series) + self.assertEqual(expected, actual) + + def test_integer_series(self): + expected = 19.75 + data_series = [5, 12, 7, 55] + actual = math.average(data_series) + self.assertEqual(expected, actual) + + def test_float_series(self): + expected = 63.475899999999996 + data_series = [78.6, 45.187, 33.334, 96.7826] + actual = math.average(data_series) + self.assertEqual(expected, actual) + + def test_float_int_mix(self): + expected = 472.104 + data_series = [10, 557.33, 862, 56.99, 874.2] + actual = math.average(data_series) + self.assertEqual(expected, actual) + + def test_negative_values(self): + expected = -17.314 + data_series = [-15.654, 59.5, 16.25, -150, 3.334] + actual = math.average(data_series) + self.assertEqual(expected, actual) + + def test_single_value(self): + expected = -66.6667 + data_series = [-66.6667] + actual = math.average(data_series) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/math_range_test.py b/docker/storperf-master/tests/utilities_tests/math_range_test.py new file mode 100644 index 0000000..90519e7 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/math_range_test.py @@ -0,0 +1,120 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from random import uniform, randrange +import unittest + +from storperf.utilities import math as Range + + +class MathRangeTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + + def test_empty_series(self): + expected = None + data_series = [] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_integer_series(self): + expected = 11946 + data_series = [5, 351, 847, 2, 1985, 18, + 96, 389, 687, 1, 11947, 758, 155] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_1_decimal(self): + expected = 778595.5 + data_series = [736.4, 9856.4, 684.2, 0.3, 0.9, 778595.8] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_2_decimals(self): + expected = 5693.47 + data_series = [51.36, 78.40, 1158.24, 5.50, 0.98, 5694.45] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_3_decimals(self): + expected = 992.181 + data_series = [4.562, 12.582, 689.452, + 135.162, 996.743, 65.549, 36.785] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_4_decimals(self): + expected = 122985.3241 + data_series = [39.4785, 896.7845, 11956.3654, + 44.2398, 6589.7134, 0.3671, 122985.6912] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_5_decimals(self): + expected = 8956208.84494 + data_series = [12.78496, 55.91275, 668.94378, + 550396.5671, 512374.9999, 8956221.6299] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_series_10_decimals(self): + expected = 5984.507397972699 + data_series = [1.1253914785, 5985.6327894512, + 256.1875693287, 995.8497623415] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_mix(self): + expected = 60781.6245372199 + data_series = [60785.9962, 899.4, 78.66, 69.58, 4.93795, + 587.195486, 96.7694536, 5.13755964, + 33.333333334, 60786.5624872199] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_float_integer_mix(self): + expected = 460781.05825 + data_series = [460785.9962, 845.634, 24.1, 69.58, 89, 4.93795] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_negative_values(self): + expected = 596.78163 + data_series = [-4.655, -33.3334, -596.78422, -0.00259, -66.785] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_negative_positive_mix(self): + expected = 58.859500000000004 + data_series = [6.85698, -2.8945, 0, -0.15, 55.965] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_single_element(self): + expected = 0 + data_series = [2.265] + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_10000_values_processing(self): + expected = 28001.068 + data_series = [uniform(-10000, 10000) for _ in range(10000)] + data_series.insert(randrange(len(data_series)), 15000.569) + data_series.insert(randrange(len(data_series)), -13000.499) + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) + + def test_processing_100_values_100_times(self): + expected = 35911.3134 + for _ in range(1, 100): + data_series = [uniform(-10000, 10000) for _ in range(100)] + data_series.insert(randrange(len(data_series)), 16956.3334) + data_series.insert(randrange(len(data_series)), -18954.98) + actual = Range.range_value(data_series) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/math_slope_test.py b/docker/storperf-master/tests/utilities_tests/math_slope_test.py new file mode 100644 index 0000000..24d5cd7 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/math_slope_test.py @@ -0,0 +1,72 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.utilities import math as Slope + + +class MathSlopeTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + pass + + def test_slope_empty_series(self): + expected = None + actual = Slope.slope([]) + self.assertEqual(expected, actual) + + def test_slope_integer_series(self): + expected = 1.4 + actual = Slope.slope([[1, 6], [2, 5], [3, 7], [4, 10]]) + self.assertEqual(expected, actual) + + def test_slope_decimal_series(self): + expected = 1.4 + actual = Slope.slope([[1.0, 6.0], [2.0, 5.0], [3.0, 7.0], [4.0, 10.0]]) + self.assertEqual(expected, actual) + + def test_slope_decimal_integer_mix(self): + expected = 1.4 + actual = Slope.slope([[1.0, 6], [2, 5.0], [3, 7], [4.0, 10]]) + self.assertEqual(expected, actual) + + def test_slope_negative_y_series(self): + expected = 2 + actual = Slope.slope([[1.0, -2], [2, 2], [3, 2]]) + self.assertEqual(expected, actual) + + def test_slope_negative_x_series(self): + expected = 1.4 + actual = Slope.slope([[-24, 6.0], [-23, 5], [-22, 7.0], [-21, 10]]) + self.assertEqual(expected, actual) + + def test_slope_out_of_order_series(self): + expected = 1.4 + actual = Slope.slope([[2, 5.0], [4, 10], [3.0, 7], [1, 6]]) + self.assertEqual(expected, actual) + + def test_slope_0_in_y(self): + expected = -0.5 + actual = Slope.slope([[15.5, 1], [16.5, 0], [17.5, 0]]) + self.assertEqual(expected, actual) + + def test_slope_0_in_x(self): + expected = 1.4 + actual = Slope.slope([[0, 6.0], [1, 5], [2, 7], [3, 10]]) + self.assertEqual(expected, actual) + + def test_slope_0_in_x_and_y(self): + expected = 1.5 + actual = Slope.slope([[0.0, 0], [1, 1], [2, 3]]) + self.assertEqual(expected, actual) + + def test_infinte_slope(self): + expected = None + actual = Slope.slope([[1480623510, 1295.87], [1480623520, 1380.79]]) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/steady_state_test.py b/docker/storperf-master/tests/utilities_tests/steady_state_test.py new file mode 100644 index 0000000..564c090 --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/steady_state_test.py @@ -0,0 +1,65 @@ +############################################################################## +# Copyright (c) 2016 CENGN and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +from storperf.utilities import steady_state as SteadyState + + +class SteadyStateTest(unittest.TestCase): + + def setUp(self): + unittest.TestCase.setUp(self) + + def test_integer_values(self): + expected = True + data_series = [[305, 20], [306, 21], [307, 21], [308, 19]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_float_values(self): + expected = True + data_series = [ + [55.5, 40.5], [150.2, 42.3], [150.8, 41.8], [151.2, 41.5]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_float_integer_mix_false(self): + expected = False + data_series = [[1, 2], [2, 2.2], [3, 1.8], [4, 1.8]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_float_integer_mix_true(self): + expected = True + data_series = [[12, 18], [12.5, 18.2], [13, 16.8], [15, 16.8]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_empty_series(self): + expected = False + data_series = [] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_negative_values(self): + expected = True + data_series = [[-1, -24.2], [0.5, -23.8], [1.1, -24.0], [3.2, -24.0]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_out_of_order_series(self): + expected = True + data_series = [[-15, 0.43], [-16, 0.41], [-3, 0.45], [4, 0.42]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) + + def test_negative_slope(self): + expected = False + data_series = [[1.3, 1], [1.2, 1], [1.1, 1.1], [1.0, 1.1]] + actual = SteadyState.steady_state(data_series) + self.assertEqual(expected, actual) diff --git a/docker/storperf-master/tests/utilities_tests/thread_gate_test.py b/docker/storperf-master/tests/utilities_tests/thread_gate_test.py new file mode 100644 index 0000000..de8b15a --- /dev/null +++ b/docker/storperf-master/tests/utilities_tests/thread_gate_test.py @@ -0,0 +1,57 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import time +import unittest + +from storperf.utilities.thread_gate import FailureToReportException +from storperf.utilities.thread_gate import ThreadGate + + +class ThreadGateTest(unittest.TestCase): + + def setUp(self): + pass + + def test_one_one_report(self): + gate = ThreadGate(1) + self.assertEqual(True, gate.report(1)) + + def test_two_one_report(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + + def test_two_two_reports(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + self.assertEqual(True, gate.report(2)) + + def test_two_one_duplicate_reports(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + self.assertEqual(False, gate.report(1)) + self.assertEqual(True, gate.report(2)) + + def test_two_old_old_report(self): + timeout = 5 + gate = ThreadGate(2, timeout) + report_time = time.time() - (timeout * 2) + gate._registrants[2] = report_time + self.assertEqual(False, gate.report(1)) + + def test_two_never_report(self): + timeout = 5 + gate = ThreadGate(2, timeout) + report_time = time.time() - (timeout * 3) + gate._creation_time = report_time + try: + gate.report(1) + self.fail() + except FailureToReportException: + pass diff --git a/docker/storperf-master/tests/workload_tests/__init__.py b/docker/storperf-master/tests/workload_tests/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/docker/storperf-master/tests/workload_tests/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/tests/workload_tests/workload_subclass_test.py b/docker/storperf-master/tests/workload_tests/workload_subclass_test.py new file mode 100644 index 0000000..e9e47f3 --- /dev/null +++ b/docker/storperf-master/tests/workload_tests/workload_subclass_test.py @@ -0,0 +1,54 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from storperf.workloads.rr import rr +from storperf.workloads.rs import rs +from storperf.workloads.rw import rw +from storperf.workloads.wr import wr +from storperf.workloads.ws import ws +import unittest + + +class WorkloadSubclassTest(unittest.TestCase): + + def setUp(self): + pass + + def test_local_name(self): + workload = rr() + self.assertEqual(workload.fullname, + "None.rr.queue-depth.1.block-size.64k.None", + workload.fullname) + + def test_remote_name(self): + workload = rw() + workload.remote_host = "192.168.0.1" + self.assertEqual(workload.fullname, + "None.rw.queue-depth.1.block-size.64k.192-168-0-1", + workload.fullname) + + def test_blocksize(self): + workload = rs() + workload.options["bs"] = "4k" + self.assertEqual(workload.fullname, + "None.rs.queue-depth.1.block-size.4k.None", + workload.fullname) + + def test_queue_depth(self): + workload = wr() + workload.options["iodepth"] = "8" + self.assertEqual(workload.fullname, + "None.wr.queue-depth.8.block-size.64k.None", + workload.fullname) + + def test_id(self): + workload = ws() + workload.id = "workloadid" + self.assertEqual(workload.fullname, + "workloadid.ws.queue-depth.1.block-size.64k.None", + workload.fullname) |