From 488a47d945d3ef3dfa9ee37ca0aac3b480ffc800 Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Tue, 19 Jan 2016 20:58:35 -0500 Subject: Remote slave agent workload Add storperf master object to manage stack lifecycle. Add configuration db. Creation of CLI vs. main so that ReST API and CLI API can be kept clear. Fixed License in files. Changed DB objects to be thread safe. Added ssh server to container if desired for CLI. Change-Id: Idfe84bfb7920e6ce19d27462593c21ea86e7b406 JIRA: STORPERF-29 Signed-off-by: Mark Beierl --- .gitignore | 1 + build-dev-docker.sh | 10 +- ci/build.sh | 1 - ci/setup.py | 10 + ci/storperf-master.yaml | 117 ------- ci/test.sh | 1 - cli.py | 172 +++++++++++ docker/Dockerfile | 40 ++- docker/requirements.pip | 10 + docker/storperf.pp | 9 + docker/supervisord.conf | 16 + rest_server.py | 134 ++++++++ storperf/__init__.py | 8 + storperf/carbon/__init__.py | 8 + storperf/db/__init__.py | 8 + storperf/db/configuration_db.py | 103 +++++++ storperf/db/job_db.py | 25 +- storperf/fio/__init__.py | 8 + storperf/fio/fio_invoker.py | 33 +- storperf/logging.json | 15 +- storperf/main.py | 120 -------- storperf/resources/hot/agent-group.yaml | 57 ++++ storperf/resources/hot/storperf-agent.yaml | 101 ++++++ storperf/storperf_master.py | 342 +++++++++++++++++++++ storperf/test_executor.py | 83 +++-- storperf/tests/carbon_tests/__init__.py | 8 + storperf/tests/carbon_tests/emitter_test.py | 12 +- storperf/tests/carbon_tests/json_to_carbon_test.py | 5 +- storperf/tests/db_tests/configuration_db_test.py | 66 ++++ storperf/tests/db_tests/job_db_test.py | 24 +- storperf/tests/storperf_master_test.py | 51 +++ storperf/workloads/__init__.py | 8 + storperf/workloads/_ssd_preconditioning.py | 2 +- storperf/workloads/_warm_up.py | 2 +- storperf/workloads/rr.py | 2 +- storperf/workloads/rs.py | 2 +- storperf/workloads/rw.py | 2 +- storperf/workloads/wr.py | 2 +- storperf/workloads/ws.py | 2 +- 39 files changed, 1300 insertions(+), 320 deletions(-) delete mode 100755 ci/build.sh delete mode 100644 ci/storperf-master.yaml delete mode 100755 ci/test.sh create mode 100644 cli.py create mode 100644 docker/requirements.pip create mode 100644 rest_server.py create mode 100644 storperf/__init__.py create mode 100644 storperf/db/configuration_db.py delete mode 100644 storperf/main.py create mode 100644 storperf/resources/hot/agent-group.yaml create mode 100644 storperf/resources/hot/storperf-agent.yaml create mode 100644 storperf/storperf_master.py create mode 100644 storperf/tests/db_tests/configuration_db_test.py create mode 100644 storperf/tests/storperf_master_test.py diff --git a/.gitignore b/.gitignore index 10e8437..c06ce0a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ coverage.xml .coverage .settings storperf.egg-info +*.db diff --git a/build-dev-docker.sh b/build-dev-docker.sh index 1e6a861..131d8ef 100755 --- a/build-dev-docker.sh +++ b/build-dev-docker.sh @@ -1,4 +1,12 @@ #!/bin/bash +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## echo "Creating a docker image from the current working directory..." @@ -8,4 +16,4 @@ sed -i "s|COPY supervisord.conf|COPY docker/supervisord.conf|" Dockerfile docker build -t opnfv/storperf:dev . -rm Dockerfile +rm -f Dockerfile diff --git a/ci/build.sh b/ci/build.sh deleted file mode 100755 index 8b13789..0000000 --- a/ci/build.sh +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ci/setup.py b/ci/setup.py index c8d05e8..daacc61 100755 --- a/ci/setup.py +++ b/ci/setup.py @@ -1,3 +1,12 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + from setuptools import setup, find_packages @@ -16,6 +25,7 @@ setup( "flake8>=2.5.1", "flask>=0.10.1", "flask-restful>=0.3.5", + "html2text>=2016.1.8", "mock>=1.3", "pyyaml>=3.11", "python-cinderclient>=1.5.0", diff --git a/ci/storperf-master.yaml b/ci/storperf-master.yaml deleted file mode 100644 index 1bc84f5..0000000 --- a/ci/storperf-master.yaml +++ /dev/null @@ -1,117 +0,0 @@ -heat_template_version: 2013-05-23 - -parameters: - flavor: - type: string - default: m1.small - image: - type: string - default: ubuntu-server - key_name: - type: string - public_net_id: - type: string - default: public - username: - type: string - default: storperf - -resources: - storperf_manager: - type: "OS::Nova::Server" - properties: - name: storperf-manager - image: { get_param: image } - flavor: { get_param: flavor } - key_name: { get_param: key_name } - networks: - - port: { get_resource: storperf_manager_port } - user_data: { get_resource: storperf_manager_config } - user_data_format: RAW - - storperf_manager_config: - type: "OS::Heat::CloudConfig" - properties: - cloud_config: - users: - - name: { get_param: username } - groups: users - shell: /bin/bash - sudo: "ALL=(ALL) NOPASSWD:ALL" - ssh_authorized_keys: - - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 - - default - package_update: true - package_upgrade: true - packages: - - fio - - python - - rsync - - graphite-carbon - - graphite-web - - apache2 - - libapache2-mod-wsgi - - curl - - storperf_manager_port: - type: "OS::Neutron::Port" - properties: - network_id: { get_resource: storperf_agent_net } - security_groups: - - { get_resource: storperf_security_group } - - storperf_manager_ip: - type: "OS::Neutron::FloatingIP" - properties: - floating_network_id: { get_param: public_net_id } - port_id: { get_resource: storperf_manager_port } - - storperf_agent_net: - type: "OS::Neutron::Net" - properties: - name: storperf-agent-network - - storperf_agent_subnet: - type: "OS::Neutron::Subnet" - properties: - name: StorPerf-Agent-Subnet - network_id: { get_resource: storperf_agent_net } - cidr: 192.168.101.0/24 - gateway_ip: 192.168.101.1 - enable_dhcp: true - allocation_pools: - - start: "192.168.101.2" - end: "192.168.101.250" - - storperf_security_group: - type: OS::Neutron::SecurityGroup - properties: - description: Neutron security group rules - name: storperf_security_group - rules: - - remote_ip_prefix: 0.0.0.0/0 - protocol: tcp - direction: ingress - - remote_ip_prefix: 0.0.0.0/0 - protocol: icmp - direction: ingress - - router: - type: OS::Neutron::Router - - router_gateway: - type: OS::Neutron::RouterGateway - properties: - router_id: { get_resource: router } - network_id: { get_param: public_net_id } - - router_interface: - type: OS::Neutron::RouterInterface - properties: - router_id: { get_resource: router } - subnet_id: { get_resource: storperf_agent_subnet } - -outputs: - public_ip: - description: Floating IP address in public network - value: { get_attr: [ storperf_manager_ip, floating_ip_address ] } diff --git a/ci/test.sh b/ci/test.sh deleted file mode 100755 index ff23c1e..0000000 --- a/ci/test.sh +++ /dev/null @@ -1 +0,0 @@ -PYTHONPATH="`pwd`/storperf":"`pwd`/tests" nosetests --with-xunit . diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..560d77d --- /dev/null +++ b/cli.py @@ -0,0 +1,172 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +""" +""" + +from storperf.storperf_master import StorPerfMaster +from storperf.test_executor import UnknownWorkload +from threading import Thread +import cPickle +import getopt +import json +import logging +import logging.config +import logging.handlers +import os +import socket +import struct +import sys +import time + +import html2text +import requests + + +class Usage(Exception): + pass + + +def event(event_string): + logging.getLogger(__name__).info(event_string) + + +class LogRecordStreamHandler(object): + + def __init__(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.bind(( + 'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)) + self.level = logging.INFO + + def read_logs(self): + try: + while True: + datagram = self.socket.recv(8192) + chunk = datagram[0:4] + slen = struct.unpack(">L", chunk)[0] + chunk = datagram[4:] + obj = cPickle.loads(chunk) + record = logging.makeLogRecord(obj) + if (record.levelno >= self.level): + logger = logging.getLogger(record.name) + logger.handle(record) + + except Exception as e: + print "ERROR: " + str(e) + finally: + self.socket.close() + + +def main(argv=None): + verbose = False + debug = False + report = None + erase = False + options = {} + + storperf = StorPerfMaster() + + if argv is None: + argv = sys.argv + try: + try: + opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh", + ["target=", + "workload=", + "report=", + "configure=", + "erase", + "nossd", + "nowarm", + "verbose", + "debug", + "help", + ]) + except getopt.error, msg: + raise Usage(msg) + + configuration = None + options['workload'] = None + + for o, a in opts: + if o in ("-h", "--help"): + print __doc__ + return 0 + elif o in ("-t", "--target"): + options['filename'] = a + elif o in ("-v", "--verbose"): + verbose = True + elif o in ("-d", "--debug"): + debug = True + elif o in ("-s", "--nossd"): + options['nossd'] = a + elif o in ("-c", "--nowarm"): + options['nowarm'] = False + elif o in ("-w", "--workload"): + options['workload'] = a + elif o in ("-r", "--report"): + report = a + elif o in ("-e", "--erase"): + erase = True + elif o in ("-f", "--configure"): + configuration = dict(x.split('=') for x in a.split(',')) + + if (debug) or (verbose): + udpserver = LogRecordStreamHandler() + + if (debug): + udpserver.level = logging.DEBUG + + logging.basicConfig(format="%(asctime)s - %(name)s - " + + "%(levelname)s - %(message)s") + + t = Thread(target=udpserver.read_logs, args=()) + t.setDaemon(True) + t.start() + + if (erase): + response = requests.delete( + 'http://127.0.0.1:5000/api/v1.0/configure') + if (response.status_code == 400): + content = json.loads(response.content) + raise Usage(content['message']) + return 0 + + if (configuration is not None): + response = requests.post( + 'http://127.0.0.1:5000/api/v1.0/configure', json=configuration) + if (response.status_code == 400): + content = json.loads(response.content) + raise Usage(content['message']) + + if (report is not None): + print storperf.fetch_results(report, workloads) + else: + print "Calling start..." + response = requests.post( + 'http://127.0.0.1:5000/api/v1.0/start', json=options) + if (response.status_code == 400): + content = json.loads(response.content) + raise Usage(content['message']) + + content = json.loads(response.content) + print "Started job id: " + content['job_id'] + + except Usage as e: + print >> sys.stderr, str(e) + print >> sys.stderr, "For help use --help" + return 2 + + except Exception as e: + print >> sys.stderr, str(e) + return 2 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/docker/Dockerfile b/docker/Dockerfile index 5ad8624..4ca66d0 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,6 +1,13 @@ -######################################## +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## # Docker container for STORPERF -######################################## +# # Purpose: docker image for Storperf project # # Maintained by Jose Lausuch, Mark Beierl @@ -8,7 +15,7 @@ # $ docker build -t opnfv/storperf:tag . # # Execution: -# $ docker run -t -i opnfv/storperf /bin/bash +# $ docker run -t opnfv/storperf /bin/bash # @@ -36,6 +43,19 @@ python-dev \ python-pip \ --no-install-recommends + +# Allow root SSH access with 'storperf' as the password + +RUN echo 'root:storperf' | chpasswd +RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config + +# SSH login fix. Otherwise user is kicked off after login +RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd + +ENV NOTVISIBLE "in users profile" +RUN echo "export VISIBLE=now" >> /etc/profile +RUN mkdir -p /var/run/sshd + RUN mkdir -p ${repos_dir} RUN mkdir -p /root/.ssh RUN chmod 700 /root/.ssh @@ -45,14 +65,24 @@ RUN git clone https://gerrit.opnfv.org/gerrit/storperf ${repos_dir}/storperf RUN git clone https://gerrit.opnfv.org/gerrit/releng ${repos_dir}/releng RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio RUN cd ${repos_dir}/fio && git checkout tags/fio-2.2.10 -RUN cd ${repos_dir}/fio && make -j 4 install +RUN cd ${repos_dir}/fio && make -j 6 install RUN puppet module install gdsoperations-graphite +RUN chmod 600 ${repos_dir}/storperf/storperf/resources/ssh/storperf_rsa + +RUN pip install -r ${repos_dir}/storperf/docker/requirements.pip + COPY storperf.pp /etc/puppet/manifests/storperf.pp RUN puppet apply /etc/puppet/manifests/storperf.pp -#Let others connect to Graphite if they want our data +# Open access to SSH if desired +EXPOSE 22 + +# Graphite EXPOSE 8000 +# ReST API +EXPOSE 5000 + COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf CMD ["/usr/bin/supervisord"] diff --git a/docker/requirements.pip b/docker/requirements.pip new file mode 100644 index 0000000..be29c28 --- /dev/null +++ b/docker/requirements.pip @@ -0,0 +1,10 @@ +pyyaml==3.10 +python-neutronclient==2.6.0 +python-heatclient==0.8.0 +python-novaclient==2.28.1 +python-glanceclient==1.1.0 +python-cinderclient==1.4.0 +python-keystoneclient==1.6.0 +flask>=0.10 +flask-restful>=0.3.5 +html2text==2016.1.8 \ No newline at end of file diff --git a/docker/storperf.pp b/docker/storperf.pp index 00a6482..7de1024 100644 --- a/docker/storperf.pp +++ b/docker/storperf.pp @@ -1,3 +1,12 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + class { 'python': pip => true, dev => true, diff --git a/docker/supervisord.conf b/docker/supervisord.conf index 80dfe5e..566b4be 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord.conf @@ -17,3 +17,19 @@ command = /opt/graphite/bin/gunicorn_django -b0.0.0.0:8000 -w2 graphite/setting stdout_logfile = /var/log/supervisor/%(program_name)s.log stderr_logfile = /var/log/supervisor/%(program_name)s.log autorestart = true + +[program:sshd] +user = root +command = /usr/sbin/sshd =D +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true + +[program:storperf-webapp] +user = root +directory = /home/opnfv/repos/storperf +command = /usr/bin/python /home/opnfv/repos/storperf/rest_server.py +stdout_logfile = /var/log/supervisor/%(program_name)s.log +stderr_logfile = /var/log/supervisor/%(program_name)s.log +autorestart = true + diff --git a/rest_server.py b/rest_server.py new file mode 100644 index 0000000..a8459f3 --- /dev/null +++ b/rest_server.py @@ -0,0 +1,134 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.storperf_master import StorPerfMaster +import json +import logging +import logging.config +import os + +from flask import abort, Flask, request, jsonify +from flask_restful import Resource, Api + + +app = Flask(__name__) +api = Api(app) +storperf = StorPerfMaster() + + +class Configure(Resource): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + def get(self): + return jsonify({'agent_count': storperf.agent_count, + 'agent_network': storperf.agent_network, + 'volume_size': storperf.volume_size, + 'stack_created': storperf.is_stack_created, + 'stack_id': storperf.stack_id}) + + def post(self): + if not request.json: + abort(400, "ERROR: No data specified") + + try: + if ('agent_count' in request.json): + storperf.agent_count = request.json['agent_count'] + if ('agent_network' in request.json): + storperf.agent_network = request.json['agent_network'] + if ('volume_size' in request.json): + storperf.volume_size = request.json['volume_size'] + + storperf.validate_stack() + storperf.create_stack() + + return jsonify({'agent_count': storperf.agent_count, + 'agent_network': storperf.agent_network, + 'volume_size': storperf.volume_size, + 'stack_id': storperf.stack_id}) + + except Exception as e: + abort(400, str(e)) + + def delete(self): + try: + storperf.delete_stack() + except Exception as e: + abort(400, str(e)) + pass + + +class StartJob(Resource): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + def post(self): + if not request.json: + abort(400, "ERROR: Missing configuration data") + + self.logger.info(request.json) + + try: + if ('target' in request.json): + storperf.filename = request.json['filename'] + if ('nossd' in request.json): + storperf.precondition = False + if ('nowarm' in request.json): + storperf.warm_up = False + if ('workload' in request.json): + storperf.workloads = request.json['workload'] + + job_id = storperf.execute_workloads() + + return jsonify({'job_id': job_id}) + + except Exception as e: + abort(400, str(e)) + + +class Quota(Resource): + + def get(self): + quota = storperf.get_volume_quota() + return jsonify({'quota': quota}) + + +def setup_logging(default_path='storperf/logging.json', + default_level=logging.INFO, env_key='LOG_CFG'): + """Setup logging configuration + """ + + path = default_path + value = os.getenv(env_key, None) + if value: + path = value + if os.path.exists(path): + with open(path, 'rt') as f: + config = json.load(f) + logging.config.dictConfig(config) + else: + logging.basicConfig(level=default_level) + + socketHandler = logging.handlers.DatagramHandler( + 'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT) + rootLogger = logging.getLogger('') + rootLogger.addHandler(socketHandler) + + +api.add_resource(Configure, "/api/v1.0/configure") +api.add_resource(Quota, "/api/v1.0/quota") +api.add_resource(StartJob, "/api/v1.0/start") + +if __name__ == "__main__": + setup_logging() + logging.getLogger("storperf").setLevel(logging.DEBUG) + + app.run(host='0.0.0.0', debug=True) diff --git a/storperf/__init__.py b/storperf/__init__.py new file mode 100644 index 0000000..73334c7 --- /dev/null +++ b/storperf/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/carbon/__init__.py b/storperf/carbon/__init__.py index e69de29..73334c7 100644 --- a/storperf/carbon/__init__.py +++ b/storperf/carbon/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py index e69de29..73334c7 100644 --- a/storperf/db/__init__.py +++ b/storperf/db/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/db/configuration_db.py b/storperf/db/configuration_db.py new file mode 100644 index 0000000..649c186 --- /dev/null +++ b/storperf/db/configuration_db.py @@ -0,0 +1,103 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from _sqlite3 import OperationalError +import logging +import sqlite3 + + +class ConfigurationDB(object): + + db_name = "StorPerf.db" + + def __init__(self): + """ + Creates the StorPerf.db and configuration tables on demand + """ + + self.logger = logging.getLogger(__name__) + self.logger.debug("Connecting to " + ConfigurationDB.db_name) + db = sqlite3.connect(ConfigurationDB.db_name) + + cursor = db.cursor() + try: + cursor.execute('''CREATE TABLE configuration + (configuration_name text, + key text, + value text)''') + self.logger.debug("Created configuration table") + except OperationalError: + self.logger.debug("Configuration table exists") + + cursor.execute('SELECT * FROM configuration') + + def delete_configuration_value(self, configuration_name, key): + """Deletes the value associated with the given key + """ + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) + + self.logger.debug("Deleted " + configuration_name + ":" + key) + + db.commit() + + def get_configuration_value(self, configuration_name, key): + """Returns a string representation of the value stored + with this key under the given configuration name. + """ + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + """select value from configuration + where configuration_name = ? + and key = ?""", + (configuration_name, key,)) + + row = cursor.fetchone() + + if (row is None): + self.logger.debug( + configuration_name + ":" + key + " does not exist") + return None + else: + self.logger.debug( + configuration_name + ":" + key + " is " + str(row[0])) + return str(row[0]) + + def set_configuration_value(self, configuration_name, key, value): + """Updates or creates the key under the given configuration + name so that it holds the value specified. + """ + + if (value is None): + return self.delete_configuration_value(configuration_name, key) + + value = str(value) + + db = sqlite3.connect(ConfigurationDB.db_name) + cursor = db.cursor() + + cursor.execute( + "delete from configuration where configuration_name=? and key=?", + (configuration_name, key)) + + cursor.execute( + """insert into configuration(configuration_name, key, value) + values (?,?,?)""", (configuration_name, key, value)) + + self.logger.debug(configuration_name + ":" + key + " set to " + value) + + db.commit() diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index a65fa78..bec8d3f 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -28,10 +28,10 @@ class JobDB(object): self.logger = logging.getLogger(__name__) self.logger.debug("Connecting to " + JobDB.db_name) - self.db = sqlite3.connect(JobDB.db_name) self.job_id = None - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() try: cursor.execute('''CREATE TABLE jobs (job_id text, @@ -49,7 +49,8 @@ class JobDB(object): Returns a job id that is guaranteed to be unique in this StorPerf instance. """ - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() self.job_id = str(uuid.uuid4()) row = cursor.execute( @@ -64,7 +65,7 @@ class JobDB(object): cursor.execute( "insert into jobs(job_id) values (?)", (self.job_id,)) self.logger.debug("Reserved job id " + self.job_id) - self.db.commit() + db.commit() def start_workload(self, workload_name): """ @@ -73,7 +74,9 @@ class JobDB(object): if (self.job_id is None): self.create_job_id() - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() + now = str(calendar.timegm(time.gmtime())) row = cursor.execute( @@ -104,7 +107,7 @@ class JobDB(object): now, workload_name,)) - self.db.commit() + db.commit() def end_workload(self, workload_name): """ @@ -113,7 +116,8 @@ class JobDB(object): if (self.job_id is None): self.create_job_id() - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() now = str(calendar.timegm(time.gmtime())) row = cursor.execute( @@ -146,7 +150,7 @@ class JobDB(object): now, workload_name,)) - self.db.commit() + db.commit() def fetch_results(self, workload_prefix=""): if (workload_prefix is None): @@ -161,7 +165,8 @@ class JobDB(object): self.logger.debug("Workload like: " + workload_prefix) - cursor = self.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute("""select start, end, workload from jobs where workload like ?""", (workload_prefix,)) @@ -186,8 +191,6 @@ class JobDB(object): '.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \ start_time + "&until=" + end_time - print '\n\t' + request + '\n' - response = requests.get(request) if (response.status_code == 200): diff --git a/storperf/fio/__init__.py b/storperf/fio/__init__.py index e69de29..73334c7 100644 --- a/storperf/fio/__init__.py +++ b/storperf/fio/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 0b13349..e343dce 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -7,10 +7,11 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from threading import Thread +import cmd import json import logging import subprocess -from threading import Thread class FIOInvoker(object): @@ -18,6 +19,18 @@ class FIOInvoker(object): def __init__(self): self.logger = logging.getLogger(__name__) self.event_listeners = set() + self.event_callback_ids = set() + self._remote_host = None + self.callback_id = None + + @property + def remote_host(self): + return self._remote_host + + @remote_host.setter + def remote_host(self, value): + self._remote_host = value + self.logger = logging.getLogger(__name__ + ":" + value) def register(self, event_listener): self.event_listeners.add(event_listener) @@ -41,7 +54,7 @@ class FIOInvoker(object): self.json_body = "" for event_listener in self.event_listeners: - event_listener(json_metric) + event_listener(self.callback_id, json_metric) except Exception, e: self.logger.error("Error parsing JSON: %s", e) @@ -58,10 +71,18 @@ class FIOInvoker(object): self.fio_process.stderr.close() def execute(self, args=[]): - for arg in args: - self.logger.debug("FIO arg: " + arg) - - self.fio_process = subprocess.Popen(['fio'] + args, + self.logger.debug("FIO args " + str(args)) + + if (self.remote_host is None): + cmd = "fio" + else: + cmd = "ssh" + additional_args = ['-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'ubuntu@' + self.remote_host, "./fio"] + args = additional_args + args + + self.fio_process = subprocess.Popen([cmd] + args, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/storperf/logging.json b/storperf/logging.json index b2fb73b..6168717 100644 --- a/storperf/logging.json +++ b/storperf/logging.json @@ -15,11 +15,11 @@ "stream": "ext://sys.stdout" }, - "info_file_handler": { + "file_handler": { "class": "logging.handlers.RotatingFileHandler", - "level": "INFO", + "level": "DEBUG", "formatter": "simple", - "filename": "info.log", + "filename": "storperf.log", "maxBytes": 10485760, "backupCount": 20, "encoding": "utf8" @@ -45,7 +45,12 @@ }, "root": { - "level": "INFO", - "handlers": ["console", "info_file_handler", "error_file_handler"] + "level": "WARN", + "handlers": ["console", "file_handler", "error_file_handler"] + }, + + "storperf": { + "level": "DEBUG", + "handlers": ["console", "file_handler", "error_file_handler"] } } \ No newline at end of file diff --git a/storperf/main.py b/storperf/main.py deleted file mode 100644 index 11357f4..0000000 --- a/storperf/main.py +++ /dev/null @@ -1,120 +0,0 @@ -############################################################################## -# Copyright (c) 2015 EMC and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import getopt -import json -import logging.config -import os -import sys - -from test_executor import TestExecutor, UnknownWorkload - -""" -""" - - -class Usage(Exception): - - def __init__(self, msg): - self.msg = msg - - -def setup_logging( - default_path='storperf/logging.json', - default_level=logging.INFO, - env_key='LOG_CFG' -): - """Setup logging configuration - - """ - path = default_path - value = os.getenv(env_key, None) - if value: - path = value - if os.path.exists(path): - with open(path, 'rt') as f: - config = json.load(f) - logging.config.dictConfig(config) - else: - logging.basicConfig(level=default_level) - - -def event(event_string): - logging.getLogger(__name__).info(event_string) - - -def main(argv=None): - setup_logging() - test_executor = TestExecutor() - verbose = False - debug = False - workloads = None - report = None - - if argv is None: - argv = sys.argv - try: - try: - opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh", - ["target=", - "workload=", - "report=", - "nossd", - "nowarm", - "verbose", - "debug", - "help", - ]) - except getopt.error, msg: - raise Usage(msg) - - for o, a in opts: - if o in ("-h", "--help"): - print __doc__ - return 0 - elif o in ("-t", "--target"): - test_executor.filename = a - elif o in ("-t", "--target"): - report = a - elif o in ("-v", "--verbose"): - verbose = True - elif o in ("-d", "--debug"): - debug = True - elif o in ("-s", "--nossd"): - test_executor.precondition = False - elif o in ("-c", "--nowarm"): - test_executor.warm = False - elif o in ("-w", "--workload"): - workloads = a.split(",") - elif o in ("-r", "--report"): - report = a - - if (debug): - logging.getLogger().setLevel(logging.DEBUG) - - test_executor.register_workloads(workloads) - - except Usage, err: - print >> sys.stderr, err.msg - print >> sys.stderr, "for help use --help" - return 2 - except UnknownWorkload, err: - print >> sys.stderr, err.msg - print >> sys.stderr, "for help use --help" - return 2 - - if (verbose): - test_executor.register(event) - - if (report is not None): - print test_executor.fetch_results(report, workloads) - else: - test_executor.execute() - -if __name__ == "__main__": - sys.exit(main()) diff --git a/storperf/resources/hot/agent-group.yaml b/storperf/resources/hot/agent-group.yaml new file mode 100644 index 0000000..315ecf3 --- /dev/null +++ b/storperf/resources/hot/agent-group.yaml @@ -0,0 +1,57 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + agent_network: + type: string + constraints: + - custom_constraint: neutron.network + flavor: + type: string + default: "StorPerf Agent" + key_name: + type: string + default: StorPerf + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_count: + type: number + default: 1 + constraints: + - range: { min: 1, max: 512 } + description: must be between 1 and 512 agents. + + +resources: + slaves: + type: OS::Heat::ResourceGroup + properties: + count: {get_param: agent_count} + resource_def: { + type: "storperf-agent.yaml", + properties: { + agent_network: {get_param: agent_network}, + flavor: {get_param: flavor}, + key_name: {get_param: key_name}, + volume_size: {get_param: volume_size} + } + } + +outputs: + slave_ips: { + description: "Slave addresses", + value: { get_attr: [ slaves, storperf_agent_ip] } + } diff --git a/storperf/resources/hot/storperf-agent.yaml b/storperf/resources/hot/storperf-agent.yaml new file mode 100644 index 0000000..94238e5 --- /dev/null +++ b/storperf/resources/hot/storperf-agent.yaml @@ -0,0 +1,101 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +heat_template_version: 2013-05-23 + +parameters: + flavor: + type: string + default: m1.small + image: + type: string + default: 'StorPerf Agent' + key_name: + type: string + default: StorPerf + username: + type: string + default: storperf + volume_size: + type: number + description: Size of the volume to be created. + default: 1 + constraints: + - range: { min: 1, max: 1024 } + description: must be between 1 and 1024 Gb. + agent_network: + type: string + constraints: + - custom_constraint: neutron.network + +resources: + + storperf_agent: + type: "OS::Nova::Server" + properties: + name: storperf-agent + image: { get_param: image } + flavor: { get_param: flavor } + key_name: { get_param: key_name } + networks: + - port: { get_resource: storperf_agent_port } + user_data: { get_resource: storperf_agent_config } + user_data_format: RAW + + storperf_agent_config: + type: "OS::Heat::CloudConfig" + properties: + cloud_config: + users: + - name: { get_param: username } + groups: users + shell: /bin/bash + sudo: "ALL=(ALL) NOPASSWD:ALL" + ssh_authorized_keys: + - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07 + - default + package_update: false + package_upgrade: false + manage_etc_hosts: localhost + + storperf_agent_port: + type: "OS::Neutron::Port" + properties: + network_id: { get_param: agent_network } + security_groups: + - { get_resource: storperf_security_group } + + storperf_security_group: + type: OS::Neutron::SecurityGroup + properties: + description: Neutron security group rules + name: StorPerf-Security-Group + rules: + - remote_ip_prefix: 0.0.0.0/0 + protocol: tcp + direction: ingress + - remote_ip_prefix: 0.0.0.0/0 + protocol: icmp + direction: ingress + + agent_volume: + type: OS::Cinder::Volume + properties: + size: { get_param: volume_size } + + agent_volume_att: + type: OS::Cinder::VolumeAttachment + properties: + instance_uuid: { get_resource: storperf_agent } + volume_id: { get_resource: agent_volume} + +outputs: + storperf_agent_ip: + description: The IP address of the agent on the StorPerf network + value: { get_attr: [ storperf_agent, first_address ] } \ No newline at end of file diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py new file mode 100644 index 0000000..9e88a3c --- /dev/null +++ b/storperf/storperf_master.py @@ -0,0 +1,342 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from threading import Thread +import logging +import os +import subprocess + +from db.configuration_db import ConfigurationDB +from test_executor import TestExecutor +import cinderclient.v2 as cinderclient +import heatclient.client as heatclient +import keystoneclient.v2_0 as ksclient + + +class ParameterError(Exception): + """ """ + + +class StorPerfMaster(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + self.configuration_db = ConfigurationDB() + + template_file = open("storperf/resources/hot/agent-group.yaml") + self._agent_group_hot = template_file.read() + template_file = open("storperf/resources/hot/storperf-agent.yaml") + self._agent_resource_hot = template_file.read() + self._hot_files = { + 'storperf-agent.yaml': self._agent_resource_hot + } + self.logger.debug( + "Loaded agent-group template as: " + self._agent_group_hot) + self.logger.debug( + "Loaded agent-resource template as: " + self._agent_resource_hot) + + self._username = os.environ.get('OS_USERNAME') + self._password = os.environ.get('OS_PASSWORD') + self._tenant_name = os.environ.get('OS_TENANT_NAME') + self._project_name = os.environ.get('OS_PROJECT_NAME') + self._auth_url = os.environ.get('OS_AUTH_URL') + + self._cinder_client = None + self._heat_client = None + self._test_executor = TestExecutor() + + @property + def volume_size(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'volume_size') + if (value is None): + self.volume_size = 1 + value = 1 + return int(value) + + @volume_size.setter + def volume_size(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change volume size after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'volume_size', + value) + + @property + def agent_count(self): + value = self.configuration_db.get_configuration_value( + 'stack', + 'agent_count') + + if (value is None): + self.agent_count = 1 + value = 1 + return int(value) + + @agent_count.setter + def agent_count(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent count after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_count', + value) + + @property + def agent_network(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'agent_network') + + @agent_network.setter + def agent_network(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change agent network after stack is created") + + self.configuration_db.set_configuration_value( + 'stack', + 'agent_network', + value) + + @property + def stack_id(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'stack_id') + + @stack_id.setter + def stack_id(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'stack_id', + value) + + @property + def volume_quota(self): + self._attach_to_openstack() + quotas = self._cinder_client.quotas.get(self._tenant_name) + return int(quotas.volumes) + + @property + def filename(self): + return self._test_executor.filename + + @filename.setter + def filename(self, value): + self._test_executor.filename = value + + @property + def precondition(self): + return self._test_executor.precondition + + @precondition.setter + def precondition(self, value): + self._test_executor.precondition = value + + @property + def warm_up(self): + return self._test_executor.warm + + @warm_up.setter + def warm_up(self, value): + self._test_executor.warm = value + + @property + def is_stack_created(self): + if (self.stack_id is not None): + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + status = getattr(stack, 'stack_status') + + self.logger.info("Status=" + status) + if (status == u'CREATE_COMPLETE'): + return True + + return False + + @property + def workloads(self): + return self.configuration_db.get_configuration_value( + 'workload', + 'workloads') + + @workloads.setter + def workloads(self, value): + self._test_executor.register_workloads(value) + + self.configuration_db.set_configuration_value( + 'workload', + 'workloads', + str(self._test_executor.workload_modules)) + + def create_stack(self): + if (self.stack_id is not None): + raise ParameterError("ERROR: Stack has already been created") + + self._attach_to_openstack() + if (self.agent_count > self.volume_quota): + message = "ERROR: Volume quota too low: " + \ + str(self.agent_count) + " > " + str(self.volume_quota) + raise ParameterError(message) + + stack = self._heat_client.stacks.create( + stack_name="StorPerfAgentGroup", + template=self._agent_group_hot, + files=self._hot_files, + parameters=self._make_parameters()) + + self.stack_id = stack['stack']['id'] + pass + + def validate_stack(self): + self._attach_to_openstack() + if (self.agent_count > self.volume_quota): + message = "ERROR: Volume quota too low: " + \ + str(self.agent_count) + " > " + str(self.volume_quota) + self.logger.error(message) + raise ParameterError(message) + + self._heat_client.stacks.preview( + stack_name="StorPerfAgentGroup", + template=self._agent_group_hot, + files=self._hot_files, + parameters=self._make_parameters()) + return True + + def wait_for_stack_creation(self): + + pass + + def delete_stack(self): + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + + self._heat_client.stacks.delete(stack_id=self.stack_id) + self.stack_id = None + + pass + + def execute_workloads(self): + + if (self.stack_id is None): + raise ParameterError("ERROR: Stack does not exist") + + self._attach_to_openstack() + + stack = self._heat_client.stacks.get(self.stack_id) + outputs = getattr(stack, 'outputs') + slaves = outputs[0]['output_value'] + + setup_threads = [] + + for slave in slaves: + t = Thread(target=self._setup_slave, args=(slave,)) + setup_threads.append(t) + t.start() + + for thread in setup_threads: + thread.join() + + self._test_executor.slaves = slaves + return self._test_executor.execute() + + def _setup_slave(self, slave): + logger = logging.getLogger(__name__ + ":" + slave) + + logger.info("Initializing slave at " + slave) + + args = ['scp', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + '/lib/x86_64-linux-gnu/libaio.so.1', + 'ubuntu@' + slave + ":"] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + args = ['scp', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + '/usr/local/bin/fio', + 'ubuntu@' + slave + ":"] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + args = ['ssh', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'ubuntu@' + slave, + 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1' + ] + + logger.debug(args) + proc = subprocess.Popen(args, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + (stdout, stderr) = proc.communicate() + if (len(stdout) > 0): + logger.debug(stdout.decode('utf-8').strip()) + if (len(stderr) > 0): + logger.error(stderr.decode('utf-8').strip()) + + def _make_parameters(self): + heat_parameters = {} + heat_parameters['agent_network'] = self.agent_network + heat_parameters['agent_count'] = self.agent_count + heat_parameters['volume_size'] = self.volume_size + return heat_parameters + + def _attach_to_openstack(self): + + if (self._cinder_client is None): + self._cinder_client = cinderclient.Client( + self._username, self._password, self._project_name, + self._auth_url, service_type='volumev2') + self._cinder_client.authenticate() + + if (self._heat_client is None): + self._keystone_client = ksclient.Client( + auth_url=self._auth_url, + username=self._username, + password=self._password, + tenant_name=self._tenant_name) + heat_endpoint = self._keystone_client.service_catalog.url_for( + service_type='orchestration') + self._heat_client = heatclient.Client( + '1', endpoint=heat_endpoint, + token=self._keystone_client.auth_token) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 462f06b..a1a817e 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -7,23 +7,20 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from os import listdir +from os.path import isfile, join +from storperf.carbon.converter import JSONToCarbon +from storperf.carbon.emitter import CarbonMetricTransmitter +from storperf.db.job_db import JobDB +from storperf.fio.fio_invoker import FIOInvoker +from threading import Thread import imp import logging -from os import listdir import os -from os.path import isfile, join -import socket - -from carbon.converter import JSONToCarbon -from carbon.emitter import CarbonMetricTransmitter -from db.job_db import JobDB -from fio.fio_invoker import FIOInvoker class UnknownWorkload(Exception): - - def __init__(self, msg): - self.msg = msg + pass class TestExecutor(object): @@ -31,7 +28,7 @@ class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] - self.filename = "storperf.dat" + self.filename = None self.precondition = True self.warm = True self.event_listeners = set() @@ -39,6 +36,16 @@ class TestExecutor(object): self.metrics_emitter = CarbonMetricTransmitter() self.prefix = None self.job_db = JobDB() + self._slaves = [] + + @property + def slaves(self): + return self._slaves + + @slaves.setter + def slaves(self, slaves): + self.logger.debug("Set slaves to: " + str(slaves)) + self._slaves = slaves def register(self, event_listener): self.event_listeners.add(event_listener) @@ -46,15 +53,15 @@ class TestExecutor(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def event(self, metric): + def event(self, callback_id, metric): carbon_metrics = self.metrics_converter.convert_to_dictionary( metric, - self.prefix) + callback_id) - read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"] - write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"] + read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] + write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] + read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] + write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] message = "Average Latency us Read/Write: " + read_latency \ + "/" + write_latency + " IOPS r/w: " + \ @@ -78,9 +85,11 @@ class TestExecutor(object): workloads = [] for filename in workload_files: - mname, ext = os.path.splitext(filename) + mname, _ = os.path.splitext(filename) if (not mname.startswith('_')): workloads.append(mname) + else: + workloads = workloads.split(',') if (self.warm is True): workloads.insert(0, "_warm_up") @@ -94,15 +103,16 @@ class TestExecutor(object): workload + ".py") self.logger.debug("Found: " + str(workload_module)) if(workload_module is None): - raise UnknownWorkload("Unknown workload: " + workload) + raise UnknownWorkload( + "ERROR: Unknown workload: " + workload) self.workload_modules.append(workload_module) except ImportError, err: - raise UnknownWorkload(err) + raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri)) path, fname = os.path.split(uri) - mname, ext = os.path.splitext(fname) + mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): @@ -115,21 +125,34 @@ class TestExecutor(object): def execute(self): - shortname = socket.getfqdn().split('.')[0] + self.job_db.create_job_id() + for slave in self.slaves: + t = Thread(target=self.execute_on_node, args=(slave,)) + t.daemon = False + t.start() + + return self.job_db.job_id + + def execute_on_node(self, remote_host): + + logger = logging.getLogger(__name__ + ":" + remote_host) invoker = FIOInvoker() + invoker.remote_host = remote_host invoker.register(self.event) - self.job_db.create_job_id() - self.logger.info("Starting job " + self.job_db.job_id) + + logger.info( + "Starting job " + self.job_db.job_id + " on " + remote_host) for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - self.logger.debug( + logger.debug( "Found workload: " + str(constructorMethod)) workload = constructorMethod() - workload.filename = self.filename + if (self.filename is not None): + workload.filename = self.filename workload.invoker = invoker if (workload_name.startswith("_")): @@ -143,6 +166,7 @@ class TestExecutor(object): for iodepth in iodepths: full_workload_name = workload_name + \ + ".host." + remote_host + \ ".queue-depth." + str(iodepth) + \ ".block-size." + str(blocksize) @@ -151,14 +175,15 @@ class TestExecutor(object): self.logger.info( "Executing workload: " + full_workload_name) - self.prefix = shortname + "." + self.job_db.job_id + \ + invoker.callback_id = self.job_db.job_id + \ "." + full_workload_name self.job_db.start_workload(full_workload_name) workload.execute() self.job_db.end_workload(full_workload_name) - self.logger.info("Finished job " + self.job_db.job_id) + logger.info( + "Finished job " + self.job_db.job_id + " on " + remote_host) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job diff --git a/storperf/tests/carbon_tests/__init__.py b/storperf/tests/carbon_tests/__init__.py index e69de29..73334c7 100644 --- a/storperf/tests/carbon_tests/__init__.py +++ b/storperf/tests/carbon_tests/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/tests/carbon_tests/emitter_test.py b/storperf/tests/carbon_tests/emitter_test.py index c26e837..f3ff57e 100644 --- a/storperf/tests/carbon_tests/emitter_test.py +++ b/storperf/tests/carbon_tests/emitter_test.py @@ -7,15 +7,13 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import unittest - -import json +from storperf.carbon import converter +from storperf.carbon.emitter import CarbonMetricTransmitter +from time import sleep import SocketServer +import json import threading -from time import sleep - -from carbon import converter -from carbon.emitter import CarbonMetricTransmitter +import unittest class MetricsHandler(SocketServer.BaseRequestHandler): diff --git a/storperf/tests/carbon_tests/json_to_carbon_test.py b/storperf/tests/carbon_tests/json_to_carbon_test.py index 6d62418..d309b48 100644 --- a/storperf/tests/carbon_tests/json_to_carbon_test.py +++ b/storperf/tests/carbon_tests/json_to_carbon_test.py @@ -7,10 +7,9 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import unittest +from storperf.carbon.converter import JSONToCarbon import json - -from carbon.converter import JSONToCarbon +import unittest class JSONToCarbonTest(unittest.TestCase): diff --git a/storperf/tests/db_tests/configuration_db_test.py b/storperf/tests/db_tests/configuration_db_test.py new file mode 100644 index 0000000..e8b7188 --- /dev/null +++ b/storperf/tests/db_tests/configuration_db_test.py @@ -0,0 +1,66 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.db.configuration_db import ConfigurationDB +import os +import unittest + + +class ConfigurationDBTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + ".db" + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + + self.config_db = ConfigurationDB() + + def test_create_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_update_key(self): + expected = "ABCDE-12345" + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.set_configuration_value( + "test", "key", expected) + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_deleted_key(self): + expected = None + + self.config_db.set_configuration_value( + "test", "key", "initial_value") + + self.config_db.delete_configuration_value( + "test", "key") + + actual = self.config_db.get_configuration_value( + "test", "key") + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py index d9b10a2..4620412 100644 --- a/storperf/tests/db_tests/job_db_test.py +++ b/storperf/tests/db_tests/job_db_test.py @@ -7,17 +7,23 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from storperf.db.job_db import JobDB +import os +import sqlite3 import unittest import mock -from db.job_db import JobDB - class JobDBTest(unittest.TestCase): def setUp(self): - JobDB.db_name = ":memory:" + + JobDB.db_name = __name__ + '.db' + try: + os.remove(JobDB.db_name) + except OSError: + pass self.job = JobDB() @mock.patch("uuid.uuid4") @@ -56,7 +62,8 @@ class JobDBTest(unittest.TestCase): mock_uuid.side_effect = (job_id,) workload_name = "Workload" - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() row = cursor.execute( """select * from jobs @@ -97,7 +104,8 @@ class JobDBTest(unittest.TestCase): self.job.start_workload(workload_name) self.job.end_workload(workload_name) - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute( """select job_id, workload, start, end from jobs where job_id = ? @@ -124,7 +132,8 @@ class JobDBTest(unittest.TestCase): mock_uuid.side_effect = (job_id,) workload_name = "Workload" - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() self.job.start_workload(workload_name) self.job.start_workload(workload_name) @@ -155,7 +164,8 @@ class JobDBTest(unittest.TestCase): self.job.end_workload(workload_name) - cursor = self.job.db.cursor() + db = sqlite3.connect(JobDB.db_name) + cursor = db.cursor() cursor.execute( """select job_id, workload, start, end from jobs where job_id = ? diff --git a/storperf/tests/storperf_master_test.py b/storperf/tests/storperf_master_test.py new file mode 100644 index 0000000..ff85fb0 --- /dev/null +++ b/storperf/tests/storperf_master_test.py @@ -0,0 +1,51 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from storperf.db.configuration_db import ConfigurationDB +from storperf.storperf_master import StorPerfMaster +import os +import unittest + + +class StorPerfMasterTest(unittest.TestCase): + + def setUp(self): + ConfigurationDB.db_name = __name__ + ".db" + try: + os.remove(ConfigurationDB.db_name) + except OSError: + pass + self.storperf = StorPerfMaster() + + def test_agent_count(self): + expected = 10 + + self.storperf.agent_count = expected + actual = self.storperf.agent_count + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_volume_size(self): + expected = 20 + + self.storperf.volume_size = expected + actual = self.storperf.volume_size + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) + + def test_agent_network(self): + expected = "ABCDEF" + + self.storperf.agent_network = expected + actual = self.storperf.agent_network + + self.assertEqual( + expected, actual, "Did not expect: " + str(actual)) diff --git a/storperf/workloads/__init__.py b/storperf/workloads/__init__.py index e69de29..73334c7 100644 --- a/storperf/workloads/__init__.py +++ b/storperf/workloads/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py index e1e8bef..cce3c31 100644 --- a/storperf/workloads/_ssd_preconditioning.py +++ b/storperf/workloads/_ssd_preconditioning.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class _ssd_preconditioning(_base_workload._base_workload): diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py index 27667ca..9cd268e 100644 --- a/storperf/workloads/_warm_up.py +++ b/storperf/workloads/_warm_up.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class _warm_up(_base_workload._base_workload): diff --git a/storperf/workloads/rr.py b/storperf/workloads/rr.py index 824974d..3823a4c 100644 --- a/storperf/workloads/rr.py +++ b/storperf/workloads/rr.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rr(_base_workload._base_workload): diff --git a/storperf/workloads/rs.py b/storperf/workloads/rs.py index 92e3ce6..511888e 100644 --- a/storperf/workloads/rs.py +++ b/storperf/workloads/rs.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rs(_base_workload._base_workload): diff --git a/storperf/workloads/rw.py b/storperf/workloads/rw.py index 2132a81..f4b6979 100644 --- a/storperf/workloads/rw.py +++ b/storperf/workloads/rw.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class rw(_base_workload._base_workload): diff --git a/storperf/workloads/wr.py b/storperf/workloads/wr.py index 19b2c61..457a29a 100644 --- a/storperf/workloads/wr.py +++ b/storperf/workloads/wr.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class wr(_base_workload._base_workload): diff --git a/storperf/workloads/ws.py b/storperf/workloads/ws.py index 8ec2ebe..f37079e 100644 --- a/storperf/workloads/ws.py +++ b/storperf/workloads/ws.py @@ -6,7 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from workloads import _base_workload +from storperf.workloads import _base_workload class ws(_base_workload._base_workload): -- cgit 1.2.3-korg