diff options
Diffstat (limited to 'storperf')
27 files changed, 904 insertions, 195 deletions
diff --git a/storperf/__init__.py b/storperf/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/storperf/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/carbon/__init__.py b/storperf/carbon/__init__.py index e69de29..73334c7 100644 --- a/storperf/carbon/__init__.py +++ b/storperf/carbon/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py index e69de29..73334c7 100644 --- a/storperf/db/__init__.py +++ b/storperf/db/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/db/configuration_db.py b/storperf/db/configuration_db.py new file mode 100644 index 0000000..649c186 --- /dev/null +++ b/storperf/db/configuration_db.py @@ -0,0 +1,103 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from _sqlite3 import OperationalError +import logging +import sqlite3 + + +class ConfigurationDB(object): + + db_name = "StorPerf.db" + + def __init__(self): + """ + Creates the StorPerf.db and configuration tables on demand + """ + + self.logger = logging.getLogger(__name__) + self.logger.debug("Connecting to " + ConfigurationDB.db_name) + db = sqlite3.connect(ConfigurationDB.db_name) + + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE configuration + (configuration_name text, + key text, + value text)''') + self.logger.debug("Created configuration table") + except OperationalError: + self.logger.debug("Configuration table exists") + + cursor.execute('SELECT * FROM configuration') + + def delete_configuration_value(self, configuration_name, key): + """Deletes the value associated with the given key + """ + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) + + self.logger.debug("Deleted " + configuration_name + ":" + key) + + db.commit() + + def get_configuration_value(self, configuration_name, key): + """Returns a string representation of the value stored + with this key under the given configuration name. + """ + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + """select value from configuration + where configuration_name = ? + and key = ?""", + (configuration_name, key,)) + + row = cursor.fetchone() + + if (row is None): + self.logger.debug( + configuration_name + ":" + key + " does not exist") + return None + else: + self.logger.debug( + configuration_name + ":" + key + " is " + str(row[0])) + return str(row[0]) + + def set_configuration_value(self, configuration_name, key, value): + """Updates or creates the key under the given configuration + name so that it holds the value specified. + """ + + if (value is None): + return self.delete_configuration_value(configuration_name, key) + + value = str(value) + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) + + cursor.execute( + """insert into configuration(configuration_name, key, value) + values (?,?,?)""", (configuration_name, key, value)) + + self.logger.debug(configuration_name + ":" + key + " set to " + value) + + db.commit() diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index a65fa78..bec8d3f 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -28,10 +28,10 @@ class JobDB(object): self.logger = logging.getLogger(__name__) self.logger.debug("Connecting to " + JobDB.db_name) - self.db = sqlite3.connect(JobDB.db_name) self.job_id = None - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() try: cursor.execute('''CREATE TABLE jobs (job_id text, @@ -49,7 +49,8 @@ class JobDB(object): Returns a job id that is guaranteed to be unique in this StorPerf instance. """ - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() self.job_id = str(uuid.uuid4()) row = cursor.execute( @@ -64,7 +65,7 @@ class JobDB(object): cursor.execute( "insert into jobs(job_id) values (?)", (self.job_id,)) self.logger.debug("Reserved job id " + self.job_id) - self.db.commit() + db.commit() def start_workload(self, workload_name): """ @@ -73,7 +74,9 @@ class JobDB(object): if (self.job_id is None): self.create_job_id() - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + now = str(calendar.timegm(time.gmtime())) row = cursor.execute( @@ -104,7 +107,7 @@ class JobDB(object): now, workload_name,)) - self.db.commit() + db.commit() def end_workload(self, workload_name): """ @@ -113,7 +116,8 @@ class JobDB(object): if (self.job_id is None): self.create_job_id() - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() now = str(calendar.timegm(time.gmtime())) row = cursor.execute( @@ -146,7 +150,7 @@ class JobDB(object): now, workload_name,)) - self.db.commit() + db.commit() def fetch_results(self, workload_prefix=""): if (workload_prefix is None): @@ -161,7 +165,8 @@ class JobDB(object): self.logger.debug("Workload like: " + workload_prefix) - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute("""select start, end, workload from jobs where workload like ?""", (workload_prefix,)) @@ -186,8 +191,6 @@ class JobDB(object): '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \ start_time + "&until=" + end_time - print '\n\t' + request + '\n' - response = requests.get(request) if (response.status_code == 200): diff --git a/storperf/fio/__init__.py b/storperf/fio/__init__.py index e69de29..73334c7 100644 --- a/storperf/fio/__init__.py +++ b/storperf/fio/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 0b13349..e343dce 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -7,10 +7,11 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from threading import Thread +import cmd import json import logging import subprocess -from threading import Thread class FIOInvoker(object): @@ -18,6 +19,18 @@ class FIOInvoker(object): def __init__(self): self.logger = logging.getLogger(__name__) self.event_listeners = set() + self.event_callback_ids = set() + self._remote_host = None + self.callback_id = None + + @property + def remote_host(self): + return self._remote_host + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + self.logger = logging.getLogger(__name__ + ":" + value) def register(self, event_listener): self.event_listeners.add(event_listener) @@ -41,7 +54,7 @@ class FIOInvoker(object): self.json_body = "" for event_listener in self.event_listeners: - event_listener(json_metric) + event_listener(self.callback_id, json_metric) except Exception, e: self.logger.error("Error parsing JSON: %s", e) @@ -58,10 +71,18 @@ class FIOInvoker(object): self.fio_process.stderr.close() def execute(self, args=[]): - for arg in args: - self.logger.debug("FIO arg: " + arg) - - self.fio_process = subprocess.Popen(['fio'] + args, + self.logger.debug("FIO args " + str(args)) + + if (self.remote_host is None): + cmd = "fio" + else: + cmd = "ssh" + additional_args = ['-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'ubuntu@' + self.remote_host, "./fio"] + args = additional_args + args + + self.fio_process = subprocess.Popen([cmd] + args, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/storperf/logging.json b/storperf/logging.json index b2fb73b..6168717 100644 --- a/storperf/logging.json +++ b/storperf/logging.json @@ -15,11 +15,11 @@ "stream": "ext://sys.stdout" }, - "info_file_handler": { + "file_handler": { "class": "logging.handlers.RotatingFileHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "simple", - "filename": "info.log", + "filename": "storperf.log", "maxBytes": 10485760, "backupCount": 20, "encoding": "utf8" @@ -45,7 +45,12 @@ }, "root": { - "level": "INFO", - "handlers": ["console", "info_file_handler", "error_file_handler"] + "level": "WARN", + "handlers": ["console", "file_handler", "error_file_handler"] + }, + + "storperf": { + "level": "DEBUG", + "handlers": ["console", "file_handler", "error_file_handler"] } }
\ No newline at end of file diff --git a/storperf/main.py b/storperf/main.py deleted file mode 100644 index 11357f4..0000000 --- a/storperf/main.py +++ /dev/null @@ -1,120 +0,0 @@ -############################################################################## -# Copyright (c) 2015 EMC and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import getopt -import json -import logging.config -import os -import sys - -from test_executor import TestExecutor, UnknownWorkload - -""" -""" - - -class Usage(Exception): - - def __init__(self, msg): - self.msg = msg - - -def setup_logging( - default_path='storperf/logging.json', - default_level=logging.INFO, - env_key='LOG_CFG' -): - """Setup logging configuration - - """ - path = default_path - value = os.getenv(env_key, None) - if value: - path = value - if os.path.exists(path): - with open(path, 'rt') as f: - config = json.load(f) - logging.config.dictConfig(config) - else: - logging.basicConfig(level=default_level) - - -def event(event_string): - logging.getLogger(__name__).info(event_string) - - -def main(argv=None): - setup_logging() - test_executor = TestExecutor() - verbose = False - debug = False - workloads = None - report = None - - if argv is None: - argv = sys.argv - try: - try: - opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh", - ["target=", - "workload=", - "report=", - "nossd", - "nowarm", - "verbose", - "debug", - "help", - ]) - except getopt.error, msg: - raise Usage(msg) - - for o, a in opts: - if o in ("-h", "--help"): - print __doc__ - return 0 - elif o in ("-t", "--target"): - test_executor.filename = a - elif o in ("-t", "--target"): - report = a - elif o in ("-v", "--verbose"): - verbose = True - elif o in ("-d", "--debug"): - debug = True - elif o in ("-s", "--nossd"): - test_executor.precondition = False - elif o in ("-c", "--nowarm"): - test_executor.warm = False - elif o in ("-w", "--workload"): - workloads = a.split(",") - elif o in ("-r", "--report"): - report = a - - if (debug): - logging.getLogger().setLevel(logging.DEBUG) - - test_executor.register_workloads(workloads) - - except Usage, err: - print >> sys.stderr, err.msg - print >> sys.stderr, "for help use --help" - return 2 - except UnknownWorkload, err: - print >> sys.stderr, err.msg - print >> sys.stderr, "for help use --help" - return 2 - - if (verbose): - test_executor.register(event) - - if (report is not None): - print test_executor.fetch_results(report, workloads) - else: - test_executor.execute() - -if __name__ == "__main__": - sys.exit(main()) diff --git a/storperf/resources/hot/agent-group.yaml b/storperf/resources/hot/agent-group.yaml new file mode 100644 index 0000000..315ecf3 --- /dev/null +++ b/storperf/resources/hot/agent-group.yaml @@ -0,0 +1,57 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + agent_network: + type: string + constraints: + - custom_constraint: neutron.network + flavor: + type: string + default: "StorPerf Agent" + key_name: + type: string + default: StorPerf + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_count: + type: number + default: 1 + constraints: + - range: { min: 1, max: 512 } + description: must be between 1 and 512 agents. + + +resources: + slaves: + type: OS::Heat::ResourceGroup + properties: + count: {get_param: agent_count} + resource_def: { + type: "storperf-agent.yaml", + properties: { + agent_network: {get_param: agent_network}, + flavor: {get_param: flavor}, + key_name: {get_param: key_name}, + volume_size: {get_param: volume_size} + } + } + +outputs: + slave_ips: { + description: "Slave addresses", + value: { get_attr: [ slaves, storperf_agent_ip] } + } diff --git a/storperf/resources/hot/storperf-agent.yaml b/storperf/resources/hot/storperf-agent.yaml new file mode 100644 index 0000000..94238e5 --- /dev/null +++ b/storperf/resources/hot/storperf-agent.yaml @@ -0,0 +1,101 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + flavor: + type: string + default: m1.small + image: + type: string + default: 'StorPerf Agent' + key_name: + type: string + default: StorPerf + username: + type: string + default: storperf + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_network: + type: string + constraints: + - custom_constraint: neutron.network + +resources: + + storperf_agent: + type: "OS::Nova::Server" + properties: + name: storperf-agent + image: { get_param: image } + flavor: { get_param: flavor } + key_name: { get_param: key_name } + networks: + - port: { get_resource: storperf_agent_port } + user_data: { get_resource: storperf_agent_config } + user_data_format: RAW + + storperf_agent_config: + type: "OS::Heat::CloudConfig" + properties: + cloud_config: + users: + - name: { get_param: username } + groups: users + shell: /bin/bash + sudo: "ALL=(ALL) NOPASSWD:ALL" + ssh_authorized_keys: + - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 + - default + package_update: false + package_upgrade: false + manage_etc_hosts: localhost + + storperf_agent_port: + type: "OS::Neutron::Port" + properties: + network_id: { get_param: agent_network } + security_groups: + - { get_resource: storperf_security_group } + + storperf_security_group: + type: OS::Neutron::SecurityGroup + properties: + description: Neutron security group rules + name: StorPerf-Security-Group + rules: + - remote_ip_prefix: 0.0.0.0/0 + protocol: tcp + direction: ingress + - remote_ip_prefix: 0.0.0.0/0 + protocol: icmp + direction: ingress + + agent_volume: + type: OS::Cinder::Volume + properties: + size: { get_param: volume_size } + + agent_volume_att: + type: OS::Cinder::VolumeAttachment + properties: + instance_uuid: { get_resource: storperf_agent } + volume_id: { get_resource: agent_volume} + +outputs: + storperf_agent_ip: + description: The IP address of the agent on the StorPerf network + value: { get_attr: [ storperf_agent, first_address ] }
\ No newline at end of file diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py new file mode 100644 index 0000000..9e88a3c --- /dev/null +++ b/storperf/storperf_master.py @@ -0,0 +1,342 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from threading import Thread +import logging +import os +import subprocess + +from db.configuration_db import ConfigurationDB +from test_executor import TestExecutor +import cinderclient.v2 as cinderclient +import heatclient.client as heatclient +import keystoneclient.v2_0 as ksclient + + +class ParameterError(Exception): + """ """ + + +class StorPerfMaster(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + self.configuration_db = ConfigurationDB() + + template_file = open("storperf/resources/hot/agent-group.yaml") + self._agent_group_hot = template_file.read() + template_file = open("storperf/resources/hot/storperf-agent.yaml") + self._agent_resource_hot = template_file.read() + self._hot_files = { + 'storperf-agent.yaml': self._agent_resource_hot + } + self.logger.debug( + "Loaded agent-group template as: " + self._agent_group_hot) + self.logger.debug( + "Loaded agent-resource template as: " + self._agent_resource_hot) + + self._username = os.environ.get('OS_USERNAME') + self._password = os.environ.get('OS_PASSWORD') + self._tenant_name = os.environ.get('OS_TENANT_NAME') + self._project_name = os.environ.get('OS_PROJECT_NAME') + self._auth_url = os.environ.get('OS_AUTH_URL') + + self._cinder_client = None + self._heat_client = None + self._test_executor = TestExecutor() + + @property + def volume_size(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'volume_size') + if (value is None): + self.volume_size = 1 + value = 1 + return int(value) + + @volume_size.setter + def volume_size(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change volume size after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'volume_size', + value) + + @property + def agent_count(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'agent_count') + + if (value is None): + self.agent_count = 1 + value = 1 + return int(value) + + @agent_count.setter + def agent_count(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent count after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_count', + value) + + @property + def agent_network(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'agent_network') + + @agent_network.setter + def agent_network(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent network after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_network', + value) + + @property + def stack_id(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'stack_id') + + @stack_id.setter + def stack_id(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'stack_id', + value) + + @property + def volume_quota(self): + self._attach_to_openstack() + quotas = self._cinder_client.quotas.get(self._tenant_name) + return int(quotas.volumes) + + @property + def filename(self): + return self._test_executor.filename + + @filename.setter + def filename(self, value): + self._test_executor.filename = value + + @property + def precondition(self): + return self._test_executor.precondition + + @precondition.setter + def precondition(self, value): + self._test_executor.precondition = value + + @property + def warm_up(self): + return self._test_executor.warm + + @warm_up.setter + def warm_up(self, value): + self._test_executor.warm = value + + @property + def is_stack_created(self): + if (self.stack_id is not None): + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + status = getattr(stack, 'stack_status') + + self.logger.info("Status=" + status) + if (status == u'CREATE_COMPLETE'): + return True + + return False + + @property + def workloads(self): + return self.configuration_db.get_configuration_value( + 'workload', + 'workloads') + + @workloads.setter + def workloads(self, value): + self._test_executor.register_workloads(value) + + self.configuration_db.set_configuration_value( + 'workload', + 'workloads', + str(self._test_executor.workload_modules)) + + def create_stack(self): + if (self.stack_id is not None): + raise ParameterError("ERROR: Stack has already been created") + + self._attach_to_openstack() + if (self.agent_count > self.volume_quota): + message = "ERROR: Volume quota too low: " + \ + str(self.agent_count) + " > " + str(self.volume_quota) + raise ParameterError(message) + + stack = self._heat_client.stacks.create( + stack_name="StorPerfAgentGroup", + template=self._agent_group_hot, + files=self._hot_files, + parameters=self._make_parameters()) + + self.stack_id = stack['stack']['id'] + pass + + def validate_stack(self): + self._attach_to_openstack() + if (self.agent_count > self.volume_quota): + message = "ERROR: Volume quota too low: " + \ + str(self.agent_count) + " > " + str(self.volume_quota) + self.logger.error(message) + raise ParameterError(message) + + self._heat_client.stacks.preview( + stack_name="StorPerfAgentGroup", + template=self._agent_group_hot, + files=self._hot_files, + parameters=self._make_parameters()) + return True + + def wait_for_stack_creation(self): + + pass + + def delete_stack(self): + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + + self._heat_client.stacks.delete(stack_id=self.stack_id) + self.stack_id = None + + pass + + def execute_workloads(self): + + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + outputs = getattr(stack, 'outputs') + slaves = outputs[0]['output_value'] + + setup_threads = [] + + for slave in slaves: + t = Thread(target=self._setup_slave, args=(slave,)) + setup_threads.append(t) + t.start() + + for thread in setup_threads: + thread.join() + + self._test_executor.slaves = slaves + return self._test_executor.execute() + + def _setup_slave(self, slave): + logger = logging.getLogger(__name__ + ":" + slave) + + logger.info("Initializing slave at " + slave) + + args = ['scp', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + '/lib/x86_64-linux-gnu/libaio.so.1', + 'ubuntu@' + slave + ":"] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + args = ['scp', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + '/usr/local/bin/fio', + 'ubuntu@' + slave + ":"] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + args = ['ssh', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'ubuntu@' + slave, + 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1' + ] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + def _make_parameters(self): + heat_parameters = {} + heat_parameters['agent_network'] = self.agent_network + heat_parameters['agent_count'] = self.agent_count + heat_parameters['volume_size'] = self.volume_size + return heat_parameters + + def _attach_to_openstack(self): + + if (self._cinder_client is None): + self._cinder_client = cinderclient.Client( + self._username, self._password, self._project_name, + self._auth_url, service_type='volumev2') + self._cinder_client.authenticate() + + if (self._heat_client is None): + self._keystone_client = ksclient.Client( + auth_url=self._auth_url, + username=self._username, + password=self._password, + tenant_name=self._tenant_name) + heat_endpoint = self._keystone_client.service_catalog.url_for( + service_type='orchestration') + self._heat_client = heatclient.Client( + '1', endpoint=heat_endpoint, + token=self._keystone_client.auth_token) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 462f06b..a1a817e 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -7,23 +7,20 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from os import listdir +from os.path import isfile, join +from storperf.carbon.converter import JSONToCarbon +from storperf.carbon.emitter import CarbonMetricTransmitter +from storperf.db.job_db import JobDB +from storperf.fio.fio_invoker import FIOInvoker +from threading import Thread import imp import logging -from os import listdir import os -from os.path import isfile, join -import socket - -from carbon.converter import JSONToCarbon -from carbon.emitter import CarbonMetricTransmitter -from db.job_db import JobDB -from fio.fio_invoker import FIOInvoker class UnknownWorkload(Exception): - - def __init__(self, msg): - self.msg = msg + pass class TestExecutor(object): @@ -31,7 +28,7 @@ class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] - self.filename = "storperf.dat" + self.filename = None self.precondition = True self.warm = True self.event_listeners = set() @@ -39,6 +36,16 @@ class TestExecutor(object): self.metrics_emitter = CarbonMetricTransmitter() self.prefix = None self.job_db = JobDB() + self._slaves = [] + + @property + def slaves(self): + return self._slaves + + @slaves.setter + def slaves(self, slaves): + self.logger.debug("Set slaves to: " + str(slaves)) + self._slaves = slaves def register(self, event_listener): self.event_listeners.add(event_listener) @@ -46,15 +53,15 @@ class TestExecutor(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def event(self, metric): + def event(self, callback_id, metric): carbon_metrics = self.metrics_converter.convert_to_dictionary( metric, - self.prefix) + callback_id) - read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"] - write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"] + read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] + write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] + read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] + write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] message = "Average Latency us Read/Write: " + read_latency \ + "/" + write_latency + " IOPS r/w: " + \ @@ -78,9 +85,11 @@ class TestExecutor(object): workloads = [] for filename in workload_files: - mname, ext = os.path.splitext(filename) + mname, _ = os.path.splitext(filename) if (not mname.startswith('_')): workloads.append(mname) + else: + workloads = workloads.split(',') if (self.warm is True): workloads.insert(0, "_warm_up") @@ -94,15 +103,16 @@ class TestExecutor(object): workload + ".py") self.logger.debug("Found: " + str(workload_module)) if(workload_module is None): - raise UnknownWorkload("Unknown workload: " + workload) + raise UnknownWorkload( + "ERROR: Unknown workload: " + workload) self.workload_modules.append(workload_module) except ImportError, err: - raise UnknownWorkload(err) + raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri)) path, fname = os.path.split(uri) - mname, ext = os.path.splitext(fname) + mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): @@ -115,21 +125,34 @@ class TestExecutor(object): def execute(self): - shortname = socket.getfqdn().split('.')[0] + self.job_db.create_job_id() + for slave in self.slaves: + t = Thread(target=self.execute_on_node, args=(slave,)) + t.daemon = False + t.start() + + return self.job_db.job_id + + def execute_on_node(self, remote_host): + + logger = logging.getLogger(__name__ + ":" + remote_host) invoker = FIOInvoker() + invoker.remote_host = remote_host invoker.register(self.event) - self.job_db.create_job_id() - self.logger.info("Starting job " + self.job_db.job_id) + + logger.info( + "Starting job " + self.job_db.job_id + " on " + remote_host) for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - self.logger.debug( + logger.debug( "Found workload: " + str(constructorMethod)) workload = constructorMethod() - workload.filename = self.filename + if (self.filename is not None): + workload.filename = self.filename workload.invoker = invoker if (workload_name.startswith("_")): @@ -143,6 +166,7 @@ class TestExecutor(object): for iodepth in iodepths: full_workload_name = workload_name + \ + ".host." + remote_host + \ ".queue-depth." + str(iodepth) + \ ".block-size." + str(blocksize) @@ -151,14 +175,15 @@ class TestExecutor(object): self.logger.info( "Executing workload: " + full_workload_name) - self.prefix = shortname + "." + self.job_db.job_id + \ + invoker.callback_id = self.job_db.job_id + \ "." + full_workload_name self.job_db.start_workload(full_workload_name) workload.execute() self.job_db.end_workload(full_workload_name) - self.logger.info("Finished job " + self.job_db.job_id) + logger.info( + "Finished job " + self.job_db.job_id + " on " + remote_host) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job diff --git a/storperf/tests/carbon_tests/__init__.py b/storperf/tests/carbon_tests/__init__.py index e69de29..73334c7 100644 --- a/storperf/tests/carbon_tests/__init__.py +++ b/storperf/tests/carbon_tests/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/tests/carbon_tests/emitter_test.py b/storperf/tests/carbon_tests/emitter_test.py index c26e837..f3ff57e 100644 --- a/storperf/tests/carbon_tests/emitter_test.py +++ b/storperf/tests/carbon_tests/emitter_test.py @@ -7,15 +7,13 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import unittest - -import json +from storperf.carbon import converter +from storperf.carbon.emitter import CarbonMetricTransmitter +from time import sleep import SocketServer +import json import threading -from time import sleep - -from carbon import converter -from carbon.emitter import CarbonMetricTransmitter +import unittest class MetricsHandler(SocketServer.BaseRequestHandler): diff --git a/storperf/tests/carbon_tests/json_to_carbon_test.py b/storperf/tests/carbon_tests/json_to_carbon_test.py index 6d62418..d309b48 100644 --- a/storperf/tests/carbon_tests/json_to_carbon_test.py +++ b/storperf/tests/carbon_tests/json_to_carbon_test.py @@ -7,10 +7,9 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import unittest +from storperf.carbon.converter import JSONToCarbon import json - -from carbon.converter import JSONToCarbon +import unittest class JSONToCarbonTest(unittest.TestCase): diff --git a/storperf/tests/db_tests/configuration_db_test.py b/storperf/tests/db_tests/configuration_db_test.py new file mode 100644 index 0000000..e8b7188 --- /dev/null +++ b/storperf/tests/db_tests/configuration_db_test.py @@ -0,0 +1,66 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.db.configuration_db import ConfigurationDB +import os +import unittest + + +class ConfigurationDBTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + ".db" + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + + self.config_db = ConfigurationDB() + + def test_create_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_update_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_deleted_key(self): + expected = None + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.delete_configuration_value( + "test", "key") + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py index d9b10a2..4620412 100644 --- a/storperf/tests/db_tests/job_db_test.py +++ b/storperf/tests/db_tests/job_db_test.py @@ -7,17 +7,23 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from storperf.db.job_db import JobDB +import os +import sqlite3 import unittest import mock -from db.job_db import JobDB - class JobDBTest(unittest.TestCase): def setUp(self): - JobDB.db_name = ":memory:" + + JobDB.db_name = __name__ + '.db' + try: + os.remove(JobDB.db_name) + except OSError: + pass self.job = JobDB() @mock.patch("uuid.uuid4") @@ -56,7 +62,8 @@ class JobDBTest(unittest.TestCase): mock_uuid.side_effect = (job_id,) workload_name = "Workload" - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() row = cursor.execute( """select * from jobs @@ -97,7 +104,8 @@ class JobDBTest(unittest.TestCase): self.job.start_workload(workload_name) self.job.end_workload(workload_name) - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute( """select job_id, workload, start, end from jobs where job_id = ? @@ -124,7 +132,8 @@ class JobDBTest(unittest.TestCase): mock_uuid.side_effect = (job_id,) workload_name = "Workload" - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() self.job.start_workload(workload_name) self.job.start_workload(workload_name) @@ -155,7 +164,8 @@ class JobDBTest(unittest.TestCase): self.job.end_workload(workload_name) - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute( """select job_id, workload, start, end from jobs where job_id = ? diff --git a/storperf/tests/storperf_master_test.py b/storperf/tests/storperf_master_test.py new file mode 100644 index 0000000..ff85fb0 --- /dev/null +++ b/storperf/tests/storperf_master_test.py @@ -0,0 +1,51 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.db.configuration_db import ConfigurationDB +from storperf.storperf_master import StorPerfMaster +import os +import unittest + + +class StorPerfMasterTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + ".db" + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + self.storperf = StorPerfMaster() + + def test_agent_count(self): + expected = 10 + + self.storperf.agent_count = expected + actual = self.storperf.agent_count + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_volume_size(self): + expected = 20 + + self.storperf.volume_size = expected + actual = self.storperf.volume_size + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_agent_network(self): + expected = "ABCDEF" + + self.storperf.agent_network = expected + actual = self.storperf.agent_network + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/storperf/workloads/__init__.py b/storperf/workloads/__init__.py index e69de29..73334c7 100644 --- a/storperf/workloads/__init__.py +++ b/storperf/workloads/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py index e1e8bef..cce3c31 100644 --- a/storperf/workloads/_ssd_preconditioning.py +++ b/storperf/workloads/_ssd_preconditioning.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class _ssd_preconditioning(_base_workload._base_workload): diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py index 27667ca..9cd268e 100644 --- a/storperf/workloads/_warm_up.py +++ b/storperf/workloads/_warm_up.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class _warm_up(_base_workload._base_workload): diff --git a/storperf/workloads/rr.py b/storperf/workloads/rr.py index 824974d..3823a4c 100644 --- a/storperf/workloads/rr.py +++ b/storperf/workloads/rr.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rr(_base_workload._base_workload): diff --git a/storperf/workloads/rs.py b/storperf/workloads/rs.py index 92e3ce6..511888e 100644 --- a/storperf/workloads/rs.py +++ b/storperf/workloads/rs.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rs(_base_workload._base_workload): diff --git a/storperf/workloads/rw.py b/storperf/workloads/rw.py index 2132a81..f4b6979 100644 --- a/storperf/workloads/rw.py +++ b/storperf/workloads/rw.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rw(_base_workload._base_workload): diff --git a/storperf/workloads/wr.py b/storperf/workloads/wr.py index 19b2c61..457a29a 100644 --- a/storperf/workloads/wr.py +++ b/storperf/workloads/wr.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class wr(_base_workload._base_workload): diff --git a/storperf/workloads/ws.py b/storperf/workloads/ws.py index 8ec2ebe..f37079e 100644 --- a/storperf/workloads/ws.py +++ b/storperf/workloads/ws.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class ws(_base_workload._base_workload): |