summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@emc.com>2016-01-19 20:58:35 -0500
committerMark Beierl <mark.beierl@emc.com>2016-01-29 13:43:04 -0500
commit488a47d945d3ef3dfa9ee37ca0aac3b480ffc800 (patch)
tree295ea3f6df99884675ba8f21c207bf892f0170bd
parent9960601b321f10a11257832a2ecacb91acf03c53 (diff)
Remote slave agent workload
Add storperf master object to manage stack lifecycle. Add configuration db. Creation of CLI vs. main so that ReST API and CLI API can be kept clear. Fixed License in files. Changed DB objects to be thread safe. Added ssh server to container if desired for CLI. Change-Id: Idfe84bfb7920e6ce19d27462593c21ea86e7b406 JIRA: STORPERF-29 Signed-off-by: Mark Beierl <mark.beierl@emc.com>
-rw-r--r--.gitignore1
-rwxr-xr-xbuild-dev-docker.sh10
-rwxr-xr-xci/build.sh1
-rwxr-xr-xci/setup.py10
-rw-r--r--ci/storperf-master.yaml117
-rwxr-xr-xci/test.sh1
-rw-r--r--cli.py172
-rw-r--r--docker/Dockerfile40
-rw-r--r--docker/requirements.pip10
-rw-r--r--docker/storperf.pp9
-rw-r--r--docker/supervisord.conf16
-rw-r--r--rest_server.py134
-rw-r--r--storperf/__init__.py8
-rw-r--r--storperf/carbon/__init__.py8
-rw-r--r--storperf/db/__init__.py8
-rw-r--r--storperf/db/configuration_db.py103
-rw-r--r--storperf/db/job_db.py25
-rw-r--r--storperf/fio/__init__.py8
-rw-r--r--storperf/fio/fio_invoker.py33
-rw-r--r--storperf/logging.json15
-rw-r--r--storperf/main.py120
-rw-r--r--storperf/resources/hot/agent-group.yaml57
-rw-r--r--storperf/resources/hot/storperf-agent.yaml101
-rw-r--r--storperf/storperf_master.py342
-rw-r--r--storperf/test_executor.py83
-rw-r--r--storperf/tests/carbon_tests/__init__.py8
-rw-r--r--storperf/tests/carbon_tests/emitter_test.py12
-rw-r--r--storperf/tests/carbon_tests/json_to_carbon_test.py5
-rw-r--r--storperf/tests/db_tests/configuration_db_test.py66
-rw-r--r--storperf/tests/db_tests/job_db_test.py24
-rw-r--r--storperf/tests/storperf_master_test.py51
-rw-r--r--storperf/workloads/__init__.py8
-rw-r--r--storperf/workloads/_ssd_preconditioning.py2
-rw-r--r--storperf/workloads/_warm_up.py2
-rw-r--r--storperf/workloads/rr.py2
-rw-r--r--storperf/workloads/rs.py2
-rw-r--r--storperf/workloads/rw.py2
-rw-r--r--storperf/workloads/wr.py2
-rw-r--r--storperf/workloads/ws.py2
39 files changed, 1300 insertions, 320 deletions
diff --git a/.gitignore b/.gitignore
index 10e8437..c06ce0a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@ coverage.xml
.coverage
.settings
storperf.egg-info
+*.db
diff --git a/build-dev-docker.sh b/build-dev-docker.sh
index 1e6a861..131d8ef 100755
--- a/build-dev-docker.sh
+++ b/build-dev-docker.sh
@@ -1,4 +1,12 @@
#!/bin/bash
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
echo "Creating a docker image from the current working directory..."
@@ -8,4 +16,4 @@ sed -i "s|COPY supervisord.conf|COPY docker/supervisord.conf|" Dockerfile
docker build -t opnfv/storperf:dev .
-rm Dockerfile
+rm -f Dockerfile
diff --git a/ci/build.sh b/ci/build.sh
deleted file mode 100755
index 8b13789..0000000
--- a/ci/build.sh
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/ci/setup.py b/ci/setup.py
index c8d05e8..daacc61 100755
--- a/ci/setup.py
+++ b/ci/setup.py
@@ -1,3 +1,12 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
from setuptools import setup, find_packages
@@ -16,6 +25,7 @@ setup(
"flake8>=2.5.1",
"flask>=0.10.1",
"flask-restful>=0.3.5",
+ "html2text>=2016.1.8",
"mock>=1.3",
"pyyaml>=3.11",
"python-cinderclient>=1.5.0",
diff --git a/ci/storperf-master.yaml b/ci/storperf-master.yaml
deleted file mode 100644
index 1bc84f5..0000000
--- a/ci/storperf-master.yaml
+++ /dev/null
@@ -1,117 +0,0 @@
-heat_template_version: 2013-05-23
-
-parameters:
- flavor:
- type: string
- default: m1.small
- image:
- type: string
- default: ubuntu-server
- key_name:
- type: string
- public_net_id:
- type: string
- default: public
- username:
- type: string
- default: storperf
-
-resources:
- storperf_manager:
- type: "OS::Nova::Server"
- properties:
- name: storperf-manager
- image: { get_param: image }
- flavor: { get_param: flavor }
- key_name: { get_param: key_name }
- networks:
- - port: { get_resource: storperf_manager_port }
- user_data: { get_resource: storperf_manager_config }
- user_data_format: RAW
-
- storperf_manager_config:
- type: "OS::Heat::CloudConfig"
- properties:
- cloud_config:
- users:
- - name: { get_param: username }
- groups: users
- shell: /bin/bash
- sudo: "ALL=(ALL) NOPASSWD:ALL"
- ssh_authorized_keys:
- - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07
- - default
- package_update: true
- package_upgrade: true
- packages:
- - fio
- - python
- - rsync
- - graphite-carbon
- - graphite-web
- - apache2
- - libapache2-mod-wsgi
- - curl
-
- storperf_manager_port:
- type: "OS::Neutron::Port"
- properties:
- network_id: { get_resource: storperf_agent_net }
- security_groups:
- - { get_resource: storperf_security_group }
-
- storperf_manager_ip:
- type: "OS::Neutron::FloatingIP"
- properties:
- floating_network_id: { get_param: public_net_id }
- port_id: { get_resource: storperf_manager_port }
-
- storperf_agent_net:
- type: "OS::Neutron::Net"
- properties:
- name: storperf-agent-network
-
- storperf_agent_subnet:
- type: "OS::Neutron::Subnet"
- properties:
- name: StorPerf-Agent-Subnet
- network_id: { get_resource: storperf_agent_net }
- cidr: 192.168.101.0/24
- gateway_ip: 192.168.101.1
- enable_dhcp: true
- allocation_pools:
- - start: "192.168.101.2"
- end: "192.168.101.250"
-
- storperf_security_group:
- type: OS::Neutron::SecurityGroup
- properties:
- description: Neutron security group rules
- name: storperf_security_group
- rules:
- - remote_ip_prefix: 0.0.0.0/0
- protocol: tcp
- direction: ingress
- - remote_ip_prefix: 0.0.0.0/0
- protocol: icmp
- direction: ingress
-
- router:
- type: OS::Neutron::Router
-
- router_gateway:
- type: OS::Neutron::RouterGateway
- properties:
- router_id: { get_resource: router }
- network_id: { get_param: public_net_id }
-
- router_interface:
- type: OS::Neutron::RouterInterface
- properties:
- router_id: { get_resource: router }
- subnet_id: { get_resource: storperf_agent_subnet }
-
-outputs:
- public_ip:
- description: Floating IP address in public network
- value: { get_attr: [ storperf_manager_ip, floating_ip_address ] }
diff --git a/ci/test.sh b/ci/test.sh
deleted file mode 100755
index ff23c1e..0000000
--- a/ci/test.sh
+++ /dev/null
@@ -1 +0,0 @@
-PYTHONPATH="`pwd`/storperf":"`pwd`/tests" nosetests --with-xunit .
diff --git a/cli.py b/cli.py
new file mode 100644
index 0000000..560d77d
--- /dev/null
+++ b/cli.py
@@ -0,0 +1,172 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+"""
+
+from storperf.storperf_master import StorPerfMaster
+from storperf.test_executor import UnknownWorkload
+from threading import Thread
+import cPickle
+import getopt
+import json
+import logging
+import logging.config
+import logging.handlers
+import os
+import socket
+import struct
+import sys
+import time
+
+import html2text
+import requests
+
+
+class Usage(Exception):
+ pass
+
+
+def event(event_string):
+ logging.getLogger(__name__).info(event_string)
+
+
+class LogRecordStreamHandler(object):
+
+ def __init__(self):
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.socket.bind((
+ 'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT))
+ self.level = logging.INFO
+
+ def read_logs(self):
+ try:
+ while True:
+ datagram = self.socket.recv(8192)
+ chunk = datagram[0:4]
+ slen = struct.unpack(">L", chunk)[0]
+ chunk = datagram[4:]
+ obj = cPickle.loads(chunk)
+ record = logging.makeLogRecord(obj)
+ if (record.levelno >= self.level):
+ logger = logging.getLogger(record.name)
+ logger.handle(record)
+
+ except Exception as e:
+ print "ERROR: " + str(e)
+ finally:
+ self.socket.close()
+
+
+def main(argv=None):
+ verbose = False
+ debug = False
+ report = None
+ erase = False
+ options = {}
+
+ storperf = StorPerfMaster()
+
+ if argv is None:
+ argv = sys.argv
+ try:
+ try:
+ opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh",
+ ["target=",
+ "workload=",
+ "report=",
+ "configure=",
+ "erase",
+ "nossd",
+ "nowarm",
+ "verbose",
+ "debug",
+ "help",
+ ])
+ except getopt.error, msg:
+ raise Usage(msg)
+
+ configuration = None
+ options['workload'] = None
+
+ for o, a in opts:
+ if o in ("-h", "--help"):
+ print __doc__
+ return 0
+ elif o in ("-t", "--target"):
+ options['filename'] = a
+ elif o in ("-v", "--verbose"):
+ verbose = True
+ elif o in ("-d", "--debug"):
+ debug = True
+ elif o in ("-s", "--nossd"):
+ options['nossd'] = a
+ elif o in ("-c", "--nowarm"):
+ options['nowarm'] = False
+ elif o in ("-w", "--workload"):
+ options['workload'] = a
+ elif o in ("-r", "--report"):
+ report = a
+ elif o in ("-e", "--erase"):
+ erase = True
+ elif o in ("-f", "--configure"):
+ configuration = dict(x.split('=') for x in a.split(','))
+
+ if (debug) or (verbose):
+ udpserver = LogRecordStreamHandler()
+
+ if (debug):
+ udpserver.level = logging.DEBUG
+
+ logging.basicConfig(format="%(asctime)s - %(name)s - " +
+ "%(levelname)s - %(message)s")
+
+ t = Thread(target=udpserver.read_logs, args=())
+ t.setDaemon(True)
+ t.start()
+
+ if (erase):
+ response = requests.delete(
+ 'http://127.0.0.1:5000/api/v1.0/configure')
+ if (response.status_code == 400):
+ content = json.loads(response.content)
+ raise Usage(content['message'])
+ return 0
+
+ if (configuration is not None):
+ response = requests.post(
+ 'http://127.0.0.1:5000/api/v1.0/configure', json=configuration)
+ if (response.status_code == 400):
+ content = json.loads(response.content)
+ raise Usage(content['message'])
+
+ if (report is not None):
+ print storperf.fetch_results(report, workloads)
+ else:
+ print "Calling start..."
+ response = requests.post(
+ 'http://127.0.0.1:5000/api/v1.0/start', json=options)
+ if (response.status_code == 400):
+ content = json.loads(response.content)
+ raise Usage(content['message'])
+
+ content = json.loads(response.content)
+ print "Started job id: " + content['job_id']
+
+ except Usage as e:
+ print >> sys.stderr, str(e)
+ print >> sys.stderr, "For help use --help"
+ return 2
+
+ except Exception as e:
+ print >> sys.stderr, str(e)
+ return 2
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 5ad8624..4ca66d0 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,6 +1,13 @@
-########################################
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
# Docker container for STORPERF
-########################################
+#
# Purpose: docker image for Storperf project
#
# Maintained by Jose Lausuch, Mark Beierl
@@ -8,7 +15,7 @@
# $ docker build -t opnfv/storperf:tag .
#
# Execution:
-# $ docker run -t -i opnfv/storperf /bin/bash
+# $ docker run -t opnfv/storperf /bin/bash
#
@@ -36,6 +43,19 @@ python-dev \
python-pip \
--no-install-recommends
+
+# Allow root SSH access with 'storperf' as the password
+
+RUN echo 'root:storperf' | chpasswd
+RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config
+
+# SSH login fix. Otherwise user is kicked off after login
+RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
+
+ENV NOTVISIBLE "in users profile"
+RUN echo "export VISIBLE=now" >> /etc/profile
+RUN mkdir -p /var/run/sshd
+
RUN mkdir -p ${repos_dir}
RUN mkdir -p /root/.ssh
RUN chmod 700 /root/.ssh
@@ -45,14 +65,24 @@ RUN git clone https://gerrit.opnfv.org/gerrit/storperf ${repos_dir}/storperf
RUN git clone https://gerrit.opnfv.org/gerrit/releng ${repos_dir}/releng
RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio
RUN cd ${repos_dir}/fio && git checkout tags/fio-2.2.10
-RUN cd ${repos_dir}/fio && make -j 4 install
+RUN cd ${repos_dir}/fio && make -j 6 install
RUN puppet module install gdsoperations-graphite
+RUN chmod 600 ${repos_dir}/storperf/storperf/resources/ssh/storperf_rsa
+
+RUN pip install -r ${repos_dir}/storperf/docker/requirements.pip
+
COPY storperf.pp /etc/puppet/manifests/storperf.pp
RUN puppet apply /etc/puppet/manifests/storperf.pp
-#Let others connect to Graphite if they want our data
+# Open access to SSH if desired
+EXPOSE 22
+
+# Graphite
EXPOSE 8000
+# ReST API
+EXPOSE 5000
+
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf
CMD ["/usr/bin/supervisord"]
diff --git a/docker/requirements.pip b/docker/requirements.pip
new file mode 100644
index 0000000..be29c28
--- /dev/null
+++ b/docker/requirements.pip
@@ -0,0 +1,10 @@
+pyyaml==3.10
+python-neutronclient==2.6.0
+python-heatclient==0.8.0
+python-novaclient==2.28.1
+python-glanceclient==1.1.0
+python-cinderclient==1.4.0
+python-keystoneclient==1.6.0
+flask>=0.10
+flask-restful>=0.3.5
+html2text==2016.1.8 \ No newline at end of file
diff --git a/docker/storperf.pp b/docker/storperf.pp
index 00a6482..7de1024 100644
--- a/docker/storperf.pp
+++ b/docker/storperf.pp
@@ -1,3 +1,12 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
class { 'python':
pip => true,
dev => true,
diff --git a/docker/supervisord.conf b/docker/supervisord.conf
index 80dfe5e..566b4be 100644
--- a/docker/supervisord.conf
+++ b/docker/supervisord.conf
@@ -17,3 +17,19 @@ command = /opt/graphite/bin/gunicorn_django -b0.0.0.0:8000 -w2 graphite/setting
stdout_logfile = /var/log/supervisor/%(program_name)s.log
stderr_logfile = /var/log/supervisor/%(program_name)s.log
autorestart = true
+
+[program:sshd]
+user = root
+command = /usr/sbin/sshd =D
+stdout_logfile = /var/log/supervisor/%(program_name)s.log
+stderr_logfile = /var/log/supervisor/%(program_name)s.log
+autorestart = true
+
+[program:storperf-webapp]
+user = root
+directory = /home/opnfv/repos/storperf
+command = /usr/bin/python /home/opnfv/repos/storperf/rest_server.py
+stdout_logfile = /var/log/supervisor/%(program_name)s.log
+stderr_logfile = /var/log/supervisor/%(program_name)s.log
+autorestart = true
+
diff --git a/rest_server.py b/rest_server.py
new file mode 100644
index 0000000..a8459f3
--- /dev/null
+++ b/rest_server.py
@@ -0,0 +1,134 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.storperf_master import StorPerfMaster
+import json
+import logging
+import logging.config
+import os
+
+from flask import abort, Flask, request, jsonify
+from flask_restful import Resource, Api
+
+
+app = Flask(__name__)
+api = Api(app)
+storperf = StorPerfMaster()
+
+
+class Configure(Resource):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ def get(self):
+ return jsonify({'agent_count': storperf.agent_count,
+ 'agent_network': storperf.agent_network,
+ 'volume_size': storperf.volume_size,
+ 'stack_created': storperf.is_stack_created,
+ 'stack_id': storperf.stack_id})
+
+ def post(self):
+ if not request.json:
+ abort(400, "ERROR: No data specified")
+
+ try:
+ if ('agent_count' in request.json):
+ storperf.agent_count = request.json['agent_count']
+ if ('agent_network' in request.json):
+ storperf.agent_network = request.json['agent_network']
+ if ('volume_size' in request.json):
+ storperf.volume_size = request.json['volume_size']
+
+ storperf.validate_stack()
+ storperf.create_stack()
+
+ return jsonify({'agent_count': storperf.agent_count,
+ 'agent_network': storperf.agent_network,
+ 'volume_size': storperf.volume_size,
+ 'stack_id': storperf.stack_id})
+
+ except Exception as e:
+ abort(400, str(e))
+
+ def delete(self):
+ try:
+ storperf.delete_stack()
+ except Exception as e:
+ abort(400, str(e))
+ pass
+
+
+class StartJob(Resource):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ def post(self):
+ if not request.json:
+ abort(400, "ERROR: Missing configuration data")
+
+ self.logger.info(request.json)
+
+ try:
+ if ('target' in request.json):
+ storperf.filename = request.json['filename']
+ if ('nossd' in request.json):
+ storperf.precondition = False
+ if ('nowarm' in request.json):
+ storperf.warm_up = False
+ if ('workload' in request.json):
+ storperf.workloads = request.json['workload']
+
+ job_id = storperf.execute_workloads()
+
+ return jsonify({'job_id': job_id})
+
+ except Exception as e:
+ abort(400, str(e))
+
+
+class Quota(Resource):
+
+ def get(self):
+ quota = storperf.get_volume_quota()
+ return jsonify({'quota': quota})
+
+
+def setup_logging(default_path='storperf/logging.json',
+ default_level=logging.INFO, env_key='LOG_CFG'):
+ """Setup logging configuration
+ """
+
+ path = default_path
+ value = os.getenv(env_key, None)
+ if value:
+ path = value
+ if os.path.exists(path):
+ with open(path, 'rt') as f:
+ config = json.load(f)
+ logging.config.dictConfig(config)
+ else:
+ logging.basicConfig(level=default_level)
+
+ socketHandler = logging.handlers.DatagramHandler(
+ 'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
+ rootLogger = logging.getLogger('')
+ rootLogger.addHandler(socketHandler)
+
+
+api.add_resource(Configure, "/api/v1.0/configure")
+api.add_resource(Quota, "/api/v1.0/quota")
+api.add_resource(StartJob, "/api/v1.0/start")
+
+if __name__ == "__main__":
+ setup_logging()
+ logging.getLogger("storperf").setLevel(logging.DEBUG)
+
+ app.run(host='0.0.0.0', debug=True)
diff --git a/storperf/__init__.py b/storperf/__init__.py
new file mode 100644
index 0000000..73334c7
--- /dev/null
+++ b/storperf/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/carbon/__init__.py b/storperf/carbon/__init__.py
index e69de29..73334c7 100644
--- a/storperf/carbon/__init__.py
+++ b/storperf/carbon/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/db/__init__.py b/storperf/db/__init__.py
index e69de29..73334c7 100644
--- a/storperf/db/__init__.py
+++ b/storperf/db/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/db/configuration_db.py b/storperf/db/configuration_db.py
new file mode 100644
index 0000000..649c186
--- /dev/null
+++ b/storperf/db/configuration_db.py
@@ -0,0 +1,103 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from _sqlite3 import OperationalError
+import logging
+import sqlite3
+
+
+class ConfigurationDB(object):
+
+ db_name = "StorPerf.db"
+
+ def __init__(self):
+ """
+ Creates the StorPerf.db and configuration tables on demand
+ """
+
+ self.logger = logging.getLogger(__name__)
+ self.logger.debug("Connecting to " + ConfigurationDB.db_name)
+ db = sqlite3.connect(ConfigurationDB.db_name)
+
+ cursor = db.cursor()
+ try:
+ cursor.execute('''CREATE TABLE configuration
+ (configuration_name text,
+ key text,
+ value text)''')
+ self.logger.debug("Created configuration table")
+ except OperationalError:
+ self.logger.debug("Configuration table exists")
+
+ cursor.execute('SELECT * FROM configuration')
+
+ def delete_configuration_value(self, configuration_name, key):
+ """Deletes the value associated with the given key
+ """
+
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute(
+ "delete from configuration where configuration_name=? and key=?",
+ (configuration_name, key))
+
+ self.logger.debug("Deleted " + configuration_name + ":" + key)
+
+ db.commit()
+
+ def get_configuration_value(self, configuration_name, key):
+ """Returns a string representation of the value stored
+ with this key under the given configuration name.
+ """
+
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute(
+ """select value from configuration
+ where configuration_name = ?
+ and key = ?""",
+ (configuration_name, key,))
+
+ row = cursor.fetchone()
+
+ if (row is None):
+ self.logger.debug(
+ configuration_name + ":" + key + " does not exist")
+ return None
+ else:
+ self.logger.debug(
+ configuration_name + ":" + key + " is " + str(row[0]))
+ return str(row[0])
+
+ def set_configuration_value(self, configuration_name, key, value):
+ """Updates or creates the key under the given configuration
+ name so that it holds the value specified.
+ """
+
+ if (value is None):
+ return self.delete_configuration_value(configuration_name, key)
+
+ value = str(value)
+
+ db = sqlite3.connect(ConfigurationDB.db_name)
+ cursor = db.cursor()
+
+ cursor.execute(
+ "delete from configuration where configuration_name=? and key=?",
+ (configuration_name, key))
+
+ cursor.execute(
+ """insert into configuration(configuration_name, key, value)
+ values (?,?,?)""", (configuration_name, key, value))
+
+ self.logger.debug(configuration_name + ":" + key + " set to " + value)
+
+ db.commit()
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index a65fa78..bec8d3f 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -28,10 +28,10 @@ class JobDB(object):
self.logger = logging.getLogger(__name__)
self.logger.debug("Connecting to " + JobDB.db_name)
- self.db = sqlite3.connect(JobDB.db_name)
self.job_id = None
- cursor = self.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
try:
cursor.execute('''CREATE TABLE jobs
(job_id text,
@@ -49,7 +49,8 @@ class JobDB(object):
Returns a job id that is guaranteed to be unique in this
StorPerf instance.
"""
- cursor = self.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
self.job_id = str(uuid.uuid4())
row = cursor.execute(
@@ -64,7 +65,7 @@ class JobDB(object):
cursor.execute(
"insert into jobs(job_id) values (?)", (self.job_id,))
self.logger.debug("Reserved job id " + self.job_id)
- self.db.commit()
+ db.commit()
def start_workload(self, workload_name):
"""
@@ -73,7 +74,9 @@ class JobDB(object):
if (self.job_id is None):
self.create_job_id()
- cursor = self.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
+
now = str(calendar.timegm(time.gmtime()))
row = cursor.execute(
@@ -104,7 +107,7 @@ class JobDB(object):
now,
workload_name,))
- self.db.commit()
+ db.commit()
def end_workload(self, workload_name):
"""
@@ -113,7 +116,8 @@ class JobDB(object):
if (self.job_id is None):
self.create_job_id()
- cursor = self.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
now = str(calendar.timegm(time.gmtime()))
row = cursor.execute(
@@ -146,7 +150,7 @@ class JobDB(object):
now,
workload_name,))
- self.db.commit()
+ db.commit()
def fetch_results(self, workload_prefix=""):
if (workload_prefix is None):
@@ -161,7 +165,8 @@ class JobDB(object):
self.logger.debug("Workload like: " + workload_prefix)
- cursor = self.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
cursor.execute("""select start, end, workload
from jobs where workload like ?""",
(workload_prefix,))
@@ -186,8 +191,6 @@ class JobDB(object):
'.' + workload + '.jobs.1.*.clat.mean&format=json&from=' + \
start_time + "&until=" + end_time
- print '\n\t' + request + '\n'
-
response = requests.get(request)
if (response.status_code == 200):
diff --git a/storperf/fio/__init__.py b/storperf/fio/__init__.py
index e69de29..73334c7 100644
--- a/storperf/fio/__init__.py
+++ b/storperf/fio/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 0b13349..e343dce 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -7,10 +7,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from threading import Thread
+import cmd
import json
import logging
import subprocess
-from threading import Thread
class FIOInvoker(object):
@@ -18,6 +19,18 @@ class FIOInvoker(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.event_listeners = set()
+ self.event_callback_ids = set()
+ self._remote_host = None
+ self.callback_id = None
+
+ @property
+ def remote_host(self):
+ return self._remote_host
+
+ @remote_host.setter
+ def remote_host(self, value):
+ self._remote_host = value
+ self.logger = logging.getLogger(__name__ + ":" + value)
def register(self, event_listener):
self.event_listeners.add(event_listener)
@@ -41,7 +54,7 @@ class FIOInvoker(object):
self.json_body = ""
for event_listener in self.event_listeners:
- event_listener(json_metric)
+ event_listener(self.callback_id, json_metric)
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
@@ -58,10 +71,18 @@ class FIOInvoker(object):
self.fio_process.stderr.close()
def execute(self, args=[]):
- for arg in args:
- self.logger.debug("FIO arg: " + arg)
-
- self.fio_process = subprocess.Popen(['fio'] + args,
+ self.logger.debug("FIO args " + str(args))
+
+ if (self.remote_host is None):
+ cmd = "fio"
+ else:
+ cmd = "ssh"
+ additional_args = ['-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ 'ubuntu@' + self.remote_host, "./fio"]
+ args = additional_args + args
+
+ self.fio_process = subprocess.Popen([cmd] + args,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
diff --git a/storperf/logging.json b/storperf/logging.json
index b2fb73b..6168717 100644
--- a/storperf/logging.json
+++ b/storperf/logging.json
@@ -15,11 +15,11 @@
"stream": "ext://sys.stdout"
},
- "info_file_handler": {
+ "file_handler": {
"class": "logging.handlers.RotatingFileHandler",
- "level": "INFO",
+ "level": "DEBUG",
"formatter": "simple",
- "filename": "info.log",
+ "filename": "storperf.log",
"maxBytes": 10485760,
"backupCount": 20,
"encoding": "utf8"
@@ -45,7 +45,12 @@
},
"root": {
- "level": "INFO",
- "handlers": ["console", "info_file_handler", "error_file_handler"]
+ "level": "WARN",
+ "handlers": ["console", "file_handler", "error_file_handler"]
+ },
+
+ "storperf": {
+ "level": "DEBUG",
+ "handlers": ["console", "file_handler", "error_file_handler"]
}
} \ No newline at end of file
diff --git a/storperf/main.py b/storperf/main.py
deleted file mode 100644
index 11357f4..0000000
--- a/storperf/main.py
+++ /dev/null
@@ -1,120 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 EMC and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import getopt
-import json
-import logging.config
-import os
-import sys
-
-from test_executor import TestExecutor, UnknownWorkload
-
-"""
-"""
-
-
-class Usage(Exception):
-
- def __init__(self, msg):
- self.msg = msg
-
-
-def setup_logging(
- default_path='storperf/logging.json',
- default_level=logging.INFO,
- env_key='LOG_CFG'
-):
- """Setup logging configuration
-
- """
- path = default_path
- value = os.getenv(env_key, None)
- if value:
- path = value
- if os.path.exists(path):
- with open(path, 'rt') as f:
- config = json.load(f)
- logging.config.dictConfig(config)
- else:
- logging.basicConfig(level=default_level)
-
-
-def event(event_string):
- logging.getLogger(__name__).info(event_string)
-
-
-def main(argv=None):
- setup_logging()
- test_executor = TestExecutor()
- verbose = False
- debug = False
- workloads = None
- report = None
-
- if argv is None:
- argv = sys.argv
- try:
- try:
- opts, args = getopt.getopt(argv[1:], "t:w:r:scvdh",
- ["target=",
- "workload=",
- "report=",
- "nossd",
- "nowarm",
- "verbose",
- "debug",
- "help",
- ])
- except getopt.error, msg:
- raise Usage(msg)
-
- for o, a in opts:
- if o in ("-h", "--help"):
- print __doc__
- return 0
- elif o in ("-t", "--target"):
- test_executor.filename = a
- elif o in ("-t", "--target"):
- report = a
- elif o in ("-v", "--verbose"):
- verbose = True
- elif o in ("-d", "--debug"):
- debug = True
- elif o in ("-s", "--nossd"):
- test_executor.precondition = False
- elif o in ("-c", "--nowarm"):
- test_executor.warm = False
- elif o in ("-w", "--workload"):
- workloads = a.split(",")
- elif o in ("-r", "--report"):
- report = a
-
- if (debug):
- logging.getLogger().setLevel(logging.DEBUG)
-
- test_executor.register_workloads(workloads)
-
- except Usage, err:
- print >> sys.stderr, err.msg
- print >> sys.stderr, "for help use --help"
- return 2
- except UnknownWorkload, err:
- print >> sys.stderr, err.msg
- print >> sys.stderr, "for help use --help"
- return 2
-
- if (verbose):
- test_executor.register(event)
-
- if (report is not None):
- print test_executor.fetch_results(report, workloads)
- else:
- test_executor.execute()
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/storperf/resources/hot/agent-group.yaml b/storperf/resources/hot/agent-group.yaml
new file mode 100644
index 0000000..315ecf3
--- /dev/null
+++ b/storperf/resources/hot/agent-group.yaml
@@ -0,0 +1,57 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+ agent_network:
+ type: string
+ constraints:
+ - custom_constraint: neutron.network
+ flavor:
+ type: string
+ default: "StorPerf Agent"
+ key_name:
+ type: string
+ default: StorPerf
+ volume_size:
+ type: number
+ description: Size of the volume to be created.
+ default: 1
+ constraints:
+ - range: { min: 1, max: 1024 }
+ description: must be between 1 and 1024 Gb.
+ agent_count:
+ type: number
+ default: 1
+ constraints:
+ - range: { min: 1, max: 512 }
+ description: must be between 1 and 512 agents.
+
+
+resources:
+ slaves:
+ type: OS::Heat::ResourceGroup
+ properties:
+ count: {get_param: agent_count}
+ resource_def: {
+ type: "storperf-agent.yaml",
+ properties: {
+ agent_network: {get_param: agent_network},
+ flavor: {get_param: flavor},
+ key_name: {get_param: key_name},
+ volume_size: {get_param: volume_size}
+ }
+ }
+
+outputs:
+ slave_ips: {
+ description: "Slave addresses",
+ value: { get_attr: [ slaves, storperf_agent_ip] }
+ }
diff --git a/storperf/resources/hot/storperf-agent.yaml b/storperf/resources/hot/storperf-agent.yaml
new file mode 100644
index 0000000..94238e5
--- /dev/null
+++ b/storperf/resources/hot/storperf-agent.yaml
@@ -0,0 +1,101 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+heat_template_version: 2013-05-23
+
+parameters:
+ flavor:
+ type: string
+ default: m1.small
+ image:
+ type: string
+ default: 'StorPerf Agent'
+ key_name:
+ type: string
+ default: StorPerf
+ username:
+ type: string
+ default: storperf
+ volume_size:
+ type: number
+ description: Size of the volume to be created.
+ default: 1
+ constraints:
+ - range: { min: 1, max: 1024 }
+ description: must be between 1 and 1024 Gb.
+ agent_network:
+ type: string
+ constraints:
+ - custom_constraint: neutron.network
+
+resources:
+
+ storperf_agent:
+ type: "OS::Nova::Server"
+ properties:
+ name: storperf-agent
+ image: { get_param: image }
+ flavor: { get_param: flavor }
+ key_name: { get_param: key_name }
+ networks:
+ - port: { get_resource: storperf_agent_port }
+ user_data: { get_resource: storperf_agent_config }
+ user_data_format: RAW
+
+ storperf_agent_config:
+ type: "OS::Heat::CloudConfig"
+ properties:
+ cloud_config:
+ users:
+ - name: { get_param: username }
+ groups: users
+ shell: /bin/bash
+ sudo: "ALL=(ALL) NOPASSWD:ALL"
+ ssh_authorized_keys:
+ - ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEbnDiqZ8RjQJJzJPf074J41XlYED+zYBzaUZ5UkkUquXzymyUmoWaFBXJP+XPu4Ns44U/S8614+JxGk96tjUdJlIjL0Ag8HP6KLtTNCabucKcEASpgJIVWqJvE3E9upZLIEiTGsF8I8S67T2qq1J1uvtxyeZmyjm7NMamjyFXE53dhR2EHqSutyKK1CK74NkRY9wr3qWUIt35kLdKSVSfrr4gOOicDALbIRu77skHIvrjt+wK1VWphBdMg6ytuq5mIE6pjWAU3Gwl4aTxOU0z43ARzCLq8HVf8s/dKjYMj8plNqaIfceMbaEUqpNHv/xbvtGNG7N0aB/a4pkUQL07
+ - default
+ package_update: false
+ package_upgrade: false
+ manage_etc_hosts: localhost
+
+ storperf_agent_port:
+ type: "OS::Neutron::Port"
+ properties:
+ network_id: { get_param: agent_network }
+ security_groups:
+ - { get_resource: storperf_security_group }
+
+ storperf_security_group:
+ type: OS::Neutron::SecurityGroup
+ properties:
+ description: Neutron security group rules
+ name: StorPerf-Security-Group
+ rules:
+ - remote_ip_prefix: 0.0.0.0/0
+ protocol: tcp
+ direction: ingress
+ - remote_ip_prefix: 0.0.0.0/0
+ protocol: icmp
+ direction: ingress
+
+ agent_volume:
+ type: OS::Cinder::Volume
+ properties:
+ size: { get_param: volume_size }
+
+ agent_volume_att:
+ type: OS::Cinder::VolumeAttachment
+ properties:
+ instance_uuid: { get_resource: storperf_agent }
+ volume_id: { get_resource: agent_volume}
+
+outputs:
+ storperf_agent_ip:
+ description: The IP address of the agent on the StorPerf network
+ value: { get_attr: [ storperf_agent, first_address ] } \ No newline at end of file
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
new file mode 100644
index 0000000..9e88a3c
--- /dev/null
+++ b/storperf/storperf_master.py
@@ -0,0 +1,342 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from threading import Thread
+import logging
+import os
+import subprocess
+
+from db.configuration_db import ConfigurationDB
+from test_executor import TestExecutor
+import cinderclient.v2 as cinderclient
+import heatclient.client as heatclient
+import keystoneclient.v2_0 as ksclient
+
+
+class ParameterError(Exception):
+ """ """
+
+
+class StorPerfMaster(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ self.configuration_db = ConfigurationDB()
+
+ template_file = open("storperf/resources/hot/agent-group.yaml")
+ self._agent_group_hot = template_file.read()
+ template_file = open("storperf/resources/hot/storperf-agent.yaml")
+ self._agent_resource_hot = template_file.read()
+ self._hot_files = {
+ 'storperf-agent.yaml': self._agent_resource_hot
+ }
+ self.logger.debug(
+ "Loaded agent-group template as: " + self._agent_group_hot)
+ self.logger.debug(
+ "Loaded agent-resource template as: " + self._agent_resource_hot)
+
+ self._username = os.environ.get('OS_USERNAME')
+ self._password = os.environ.get('OS_PASSWORD')
+ self._tenant_name = os.environ.get('OS_TENANT_NAME')
+ self._project_name = os.environ.get('OS_PROJECT_NAME')
+ self._auth_url = os.environ.get('OS_AUTH_URL')
+
+ self._cinder_client = None
+ self._heat_client = None
+ self._test_executor = TestExecutor()
+
+ @property
+ def volume_size(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'volume_size')
+ if (value is None):
+ self.volume_size = 1
+ value = 1
+ return int(value)
+
+ @volume_size.setter
+ def volume_size(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change volume size after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'volume_size',
+ value)
+
+ @property
+ def agent_count(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_count')
+
+ if (value is None):
+ self.agent_count = 1
+ value = 1
+ return int(value)
+
+ @agent_count.setter
+ def agent_count(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent count after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_count',
+ value)
+
+ @property
+ def agent_network(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_network')
+
+ @agent_network.setter
+ def agent_network(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent network after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_network',
+ value)
+
+ @property
+ def stack_id(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'stack_id')
+
+ @stack_id.setter
+ def stack_id(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'stack_id',
+ value)
+
+ @property
+ def volume_quota(self):
+ self._attach_to_openstack()
+ quotas = self._cinder_client.quotas.get(self._tenant_name)
+ return int(quotas.volumes)
+
+ @property
+ def filename(self):
+ return self._test_executor.filename
+
+ @filename.setter
+ def filename(self, value):
+ self._test_executor.filename = value
+
+ @property
+ def precondition(self):
+ return self._test_executor.precondition
+
+ @precondition.setter
+ def precondition(self, value):
+ self._test_executor.precondition = value
+
+ @property
+ def warm_up(self):
+ return self._test_executor.warm
+
+ @warm_up.setter
+ def warm_up(self, value):
+ self._test_executor.warm = value
+
+ @property
+ def is_stack_created(self):
+ if (self.stack_id is not None):
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+
+ self.logger.info("Status=" + status)
+ if (status == u'CREATE_COMPLETE'):
+ return True
+
+ return False
+
+ @property
+ def workloads(self):
+ return self.configuration_db.get_configuration_value(
+ 'workload',
+ 'workloads')
+
+ @workloads.setter
+ def workloads(self, value):
+ self._test_executor.register_workloads(value)
+
+ self.configuration_db.set_configuration_value(
+ 'workload',
+ 'workloads',
+ str(self._test_executor.workload_modules))
+
+ def create_stack(self):
+ if (self.stack_id is not None):
+ raise ParameterError("ERROR: Stack has already been created")
+
+ self._attach_to_openstack()
+ if (self.agent_count > self.volume_quota):
+ message = "ERROR: Volume quota too low: " + \
+ str(self.agent_count) + " > " + str(self.volume_quota)
+ raise ParameterError(message)
+
+ stack = self._heat_client.stacks.create(
+ stack_name="StorPerfAgentGroup",
+ template=self._agent_group_hot,
+ files=self._hot_files,
+ parameters=self._make_parameters())
+
+ self.stack_id = stack['stack']['id']
+ pass
+
+ def validate_stack(self):
+ self._attach_to_openstack()
+ if (self.agent_count > self.volume_quota):
+ message = "ERROR: Volume quota too low: " + \
+ str(self.agent_count) + " > " + str(self.volume_quota)
+ self.logger.error(message)
+ raise ParameterError(message)
+
+ self._heat_client.stacks.preview(
+ stack_name="StorPerfAgentGroup",
+ template=self._agent_group_hot,
+ files=self._hot_files,
+ parameters=self._make_parameters())
+ return True
+
+ def wait_for_stack_creation(self):
+
+ pass
+
+ def delete_stack(self):
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ self.stack_id = None
+
+ pass
+
+ def execute_workloads(self):
+
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ outputs = getattr(stack, 'outputs')
+ slaves = outputs[0]['output_value']
+
+ setup_threads = []
+
+ for slave in slaves:
+ t = Thread(target=self._setup_slave, args=(slave,))
+ setup_threads.append(t)
+ t.start()
+
+ for thread in setup_threads:
+ thread.join()
+
+ self._test_executor.slaves = slaves
+ return self._test_executor.execute()
+
+ def _setup_slave(self, slave):
+ logger = logging.getLogger(__name__ + ":" + slave)
+
+ logger.info("Initializing slave at " + slave)
+
+ args = ['scp', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ '/lib/x86_64-linux-gnu/libaio.so.1',
+ 'ubuntu@' + slave + ":"]
+
+ logger.debug(args)
+ proc = subprocess.Popen(args,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (stdout, stderr) = proc.communicate()
+ if (len(stdout) > 0):
+ logger.debug(stdout.decode('utf-8').strip())
+ if (len(stderr) > 0):
+ logger.error(stderr.decode('utf-8').strip())
+
+ args = ['scp', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ '/usr/local/bin/fio',
+ 'ubuntu@' + slave + ":"]
+
+ logger.debug(args)
+ proc = subprocess.Popen(args,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (stdout, stderr) = proc.communicate()
+ if (len(stdout) > 0):
+ logger.debug(stdout.decode('utf-8').strip())
+ if (len(stderr) > 0):
+ logger.error(stderr.decode('utf-8').strip())
+
+ args = ['ssh', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ 'ubuntu@' + slave,
+ 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
+ ]
+
+ logger.debug(args)
+ proc = subprocess.Popen(args,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (stdout, stderr) = proc.communicate()
+ if (len(stdout) > 0):
+ logger.debug(stdout.decode('utf-8').strip())
+ if (len(stderr) > 0):
+ logger.error(stderr.decode('utf-8').strip())
+
+ def _make_parameters(self):
+ heat_parameters = {}
+ heat_parameters['agent_network'] = self.agent_network
+ heat_parameters['agent_count'] = self.agent_count
+ heat_parameters['volume_size'] = self.volume_size
+ return heat_parameters
+
+ def _attach_to_openstack(self):
+
+ if (self._cinder_client is None):
+ self._cinder_client = cinderclient.Client(
+ self._username, self._password, self._project_name,
+ self._auth_url, service_type='volumev2')
+ self._cinder_client.authenticate()
+
+ if (self._heat_client is None):
+ self._keystone_client = ksclient.Client(
+ auth_url=self._auth_url,
+ username=self._username,
+ password=self._password,
+ tenant_name=self._tenant_name)
+ heat_endpoint = self._keystone_client.service_catalog.url_for(
+ service_type='orchestration')
+ self._heat_client = heatclient.Client(
+ '1', endpoint=heat_endpoint,
+ token=self._keystone_client.auth_token)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 462f06b..a1a817e 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -7,23 +7,20 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from os import listdir
+from os.path import isfile, join
+from storperf.carbon.converter import JSONToCarbon
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db.job_db import JobDB
+from storperf.fio.fio_invoker import FIOInvoker
+from threading import Thread
import imp
import logging
-from os import listdir
import os
-from os.path import isfile, join
-import socket
-
-from carbon.converter import JSONToCarbon
-from carbon.emitter import CarbonMetricTransmitter
-from db.job_db import JobDB
-from fio.fio_invoker import FIOInvoker
class UnknownWorkload(Exception):
-
- def __init__(self, msg):
- self.msg = msg
+ pass
class TestExecutor(object):
@@ -31,7 +28,7 @@ class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
- self.filename = "storperf.dat"
+ self.filename = None
self.precondition = True
self.warm = True
self.event_listeners = set()
@@ -39,6 +36,16 @@ class TestExecutor(object):
self.metrics_emitter = CarbonMetricTransmitter()
self.prefix = None
self.job_db = JobDB()
+ self._slaves = []
+
+ @property
+ def slaves(self):
+ return self._slaves
+
+ @slaves.setter
+ def slaves(self, slaves):
+ self.logger.debug("Set slaves to: " + str(slaves))
+ self._slaves = slaves
def register(self, event_listener):
self.event_listeners.add(event_listener)
@@ -46,15 +53,15 @@ class TestExecutor(object):
def unregister(self, event_listener):
self.event_listeners.discard(event_listener)
- def event(self, metric):
+ def event(self, callback_id, metric):
carbon_metrics = self.metrics_converter.convert_to_dictionary(
metric,
- self.prefix)
+ callback_id)
- read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"]
+ read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
+ write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
+ read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
+ write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
message = "Average Latency us Read/Write: " + read_latency \
+ "/" + write_latency + " IOPS r/w: " + \
@@ -78,9 +85,11 @@ class TestExecutor(object):
workloads = []
for filename in workload_files:
- mname, ext = os.path.splitext(filename)
+ mname, _ = os.path.splitext(filename)
if (not mname.startswith('_')):
workloads.append(mname)
+ else:
+ workloads = workloads.split(',')
if (self.warm is True):
workloads.insert(0, "_warm_up")
@@ -94,15 +103,16 @@ class TestExecutor(object):
workload + ".py")
self.logger.debug("Found: " + str(workload_module))
if(workload_module is None):
- raise UnknownWorkload("Unknown workload: " + workload)
+ raise UnknownWorkload(
+ "ERROR: Unknown workload: " + workload)
self.workload_modules.append(workload_module)
except ImportError, err:
- raise UnknownWorkload(err)
+ raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
path, fname = os.path.split(uri)
- mname, ext = os.path.splitext(fname)
+ mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
@@ -115,21 +125,34 @@ class TestExecutor(object):
def execute(self):
- shortname = socket.getfqdn().split('.')[0]
+ self.job_db.create_job_id()
+ for slave in self.slaves:
+ t = Thread(target=self.execute_on_node, args=(slave,))
+ t.daemon = False
+ t.start()
+
+ return self.job_db.job_id
+
+ def execute_on_node(self, remote_host):
+
+ logger = logging.getLogger(__name__ + ":" + remote_host)
invoker = FIOInvoker()
+ invoker.remote_host = remote_host
invoker.register(self.event)
- self.job_db.create_job_id()
- self.logger.info("Starting job " + self.job_db.job_id)
+
+ logger.info(
+ "Starting job " + self.job_db.job_id + " on " + remote_host)
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- self.logger.debug(
+ logger.debug(
"Found workload: " + str(constructorMethod))
workload = constructorMethod()
- workload.filename = self.filename
+ if (self.filename is not None):
+ workload.filename = self.filename
workload.invoker = invoker
if (workload_name.startswith("_")):
@@ -143,6 +166,7 @@ class TestExecutor(object):
for iodepth in iodepths:
full_workload_name = workload_name + \
+ ".host." + remote_host + \
".queue-depth." + str(iodepth) + \
".block-size." + str(blocksize)
@@ -151,14 +175,15 @@ class TestExecutor(object):
self.logger.info(
"Executing workload: " + full_workload_name)
- self.prefix = shortname + "." + self.job_db.job_id + \
+ invoker.callback_id = self.job_db.job_id + \
"." + full_workload_name
self.job_db.start_workload(full_workload_name)
workload.execute()
self.job_db.end_workload(full_workload_name)
- self.logger.info("Finished job " + self.job_db.job_id)
+ logger.info(
+ "Finished job " + self.job_db.job_id + " on " + remote_host)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job
diff --git a/storperf/tests/carbon_tests/__init__.py b/storperf/tests/carbon_tests/__init__.py
index e69de29..73334c7 100644
--- a/storperf/tests/carbon_tests/__init__.py
+++ b/storperf/tests/carbon_tests/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/tests/carbon_tests/emitter_test.py b/storperf/tests/carbon_tests/emitter_test.py
index c26e837..f3ff57e 100644
--- a/storperf/tests/carbon_tests/emitter_test.py
+++ b/storperf/tests/carbon_tests/emitter_test.py
@@ -7,15 +7,13 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import unittest
-
-import json
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from time import sleep
import SocketServer
+import json
import threading
-from time import sleep
-
-from carbon import converter
-from carbon.emitter import CarbonMetricTransmitter
+import unittest
class MetricsHandler(SocketServer.BaseRequestHandler):
diff --git a/storperf/tests/carbon_tests/json_to_carbon_test.py b/storperf/tests/carbon_tests/json_to_carbon_test.py
index 6d62418..d309b48 100644
--- a/storperf/tests/carbon_tests/json_to_carbon_test.py
+++ b/storperf/tests/carbon_tests/json_to_carbon_test.py
@@ -7,10 +7,9 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import unittest
+from storperf.carbon.converter import JSONToCarbon
import json
-
-from carbon.converter import JSONToCarbon
+import unittest
class JSONToCarbonTest(unittest.TestCase):
diff --git a/storperf/tests/db_tests/configuration_db_test.py b/storperf/tests/db_tests/configuration_db_test.py
new file mode 100644
index 0000000..e8b7188
--- /dev/null
+++ b/storperf/tests/db_tests/configuration_db_test.py
@@ -0,0 +1,66 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.db.configuration_db import ConfigurationDB
+import os
+import unittest
+
+
+class ConfigurationDBTest(unittest.TestCase):
+
+ def setUp(self):
+ ConfigurationDB.db_name = __name__ + ".db"
+ try:
+ os.remove(ConfigurationDB.db_name)
+ except OSError:
+ pass
+
+ self.config_db = ConfigurationDB()
+
+ def test_create_key(self):
+ expected = "ABCDE-12345"
+
+ self.config_db.set_configuration_value(
+ "test", "key", expected)
+
+ actual = self.config_db.get_configuration_value(
+ "test", "key")
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ def test_update_key(self):
+ expected = "ABCDE-12345"
+
+ self.config_db.set_configuration_value(
+ "test", "key", "initial_value")
+
+ self.config_db.set_configuration_value(
+ "test", "key", expected)
+
+ actual = self.config_db.get_configuration_value(
+ "test", "key")
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ def test_deleted_key(self):
+ expected = None
+
+ self.config_db.set_configuration_value(
+ "test", "key", "initial_value")
+
+ self.config_db.delete_configuration_value(
+ "test", "key")
+
+ actual = self.config_db.get_configuration_value(
+ "test", "key")
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
diff --git a/storperf/tests/db_tests/job_db_test.py b/storperf/tests/db_tests/job_db_test.py
index d9b10a2..4620412 100644
--- a/storperf/tests/db_tests/job_db_test.py
+++ b/storperf/tests/db_tests/job_db_test.py
@@ -7,17 +7,23 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from storperf.db.job_db import JobDB
+import os
+import sqlite3
import unittest
import mock
-from db.job_db import JobDB
-
class JobDBTest(unittest.TestCase):
def setUp(self):
- JobDB.db_name = ":memory:"
+
+ JobDB.db_name = __name__ + '.db'
+ try:
+ os.remove(JobDB.db_name)
+ except OSError:
+ pass
self.job = JobDB()
@mock.patch("uuid.uuid4")
@@ -56,7 +62,8 @@ class JobDBTest(unittest.TestCase):
mock_uuid.side_effect = (job_id,)
workload_name = "Workload"
- cursor = self.job.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
row = cursor.execute(
"""select * from jobs
@@ -97,7 +104,8 @@ class JobDBTest(unittest.TestCase):
self.job.start_workload(workload_name)
self.job.end_workload(workload_name)
- cursor = self.job.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
cursor.execute(
"""select job_id, workload, start, end from jobs
where job_id = ?
@@ -124,7 +132,8 @@ class JobDBTest(unittest.TestCase):
mock_uuid.side_effect = (job_id,)
workload_name = "Workload"
- cursor = self.job.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
self.job.start_workload(workload_name)
self.job.start_workload(workload_name)
@@ -155,7 +164,8 @@ class JobDBTest(unittest.TestCase):
self.job.end_workload(workload_name)
- cursor = self.job.db.cursor()
+ db = sqlite3.connect(JobDB.db_name)
+ cursor = db.cursor()
cursor.execute(
"""select job_id, workload, start, end from jobs
where job_id = ?
diff --git a/storperf/tests/storperf_master_test.py b/storperf/tests/storperf_master_test.py
new file mode 100644
index 0000000..ff85fb0
--- /dev/null
+++ b/storperf/tests/storperf_master_test.py
@@ -0,0 +1,51 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.storperf_master import StorPerfMaster
+import os
+import unittest
+
+
+class StorPerfMasterTest(unittest.TestCase):
+
+ def setUp(self):
+ ConfigurationDB.db_name = __name__ + ".db"
+ try:
+ os.remove(ConfigurationDB.db_name)
+ except OSError:
+ pass
+ self.storperf = StorPerfMaster()
+
+ def test_agent_count(self):
+ expected = 10
+
+ self.storperf.agent_count = expected
+ actual = self.storperf.agent_count
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ def test_volume_size(self):
+ expected = 20
+
+ self.storperf.volume_size = expected
+ actual = self.storperf.volume_size
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
+
+ def test_agent_network(self):
+ expected = "ABCDEF"
+
+ self.storperf.agent_network = expected
+ actual = self.storperf.agent_network
+
+ self.assertEqual(
+ expected, actual, "Did not expect: " + str(actual))
diff --git a/storperf/workloads/__init__.py b/storperf/workloads/__init__.py
index e69de29..73334c7 100644
--- a/storperf/workloads/__init__.py
+++ b/storperf/workloads/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/storperf/workloads/_ssd_preconditioning.py b/storperf/workloads/_ssd_preconditioning.py
index e1e8bef..cce3c31 100644
--- a/storperf/workloads/_ssd_preconditioning.py
+++ b/storperf/workloads/_ssd_preconditioning.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class _ssd_preconditioning(_base_workload._base_workload):
diff --git a/storperf/workloads/_warm_up.py b/storperf/workloads/_warm_up.py
index 27667ca..9cd268e 100644
--- a/storperf/workloads/_warm_up.py
+++ b/storperf/workloads/_warm_up.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class _warm_up(_base_workload._base_workload):
diff --git a/storperf/workloads/rr.py b/storperf/workloads/rr.py
index 824974d..3823a4c 100644
--- a/storperf/workloads/rr.py
+++ b/storperf/workloads/rr.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class rr(_base_workload._base_workload):
diff --git a/storperf/workloads/rs.py b/storperf/workloads/rs.py
index 92e3ce6..511888e 100644
--- a/storperf/workloads/rs.py
+++ b/storperf/workloads/rs.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class rs(_base_workload._base_workload):
diff --git a/storperf/workloads/rw.py b/storperf/workloads/rw.py
index 2132a81..f4b6979 100644
--- a/storperf/workloads/rw.py
+++ b/storperf/workloads/rw.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class rw(_base_workload._base_workload):
diff --git a/storperf/workloads/wr.py b/storperf/workloads/wr.py
index 19b2c61..457a29a 100644
--- a/storperf/workloads/wr.py
+++ b/storperf/workloads/wr.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class wr(_base_workload._base_workload):
diff --git a/storperf/workloads/ws.py b/storperf/workloads/ws.py
index 8ec2ebe..f37079e 100644
--- a/storperf/workloads/ws.py
+++ b/storperf/workloads/ws.py
@@ -6,7 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from workloads import _base_workload
+from storperf.workloads import _base_workload
class ws(_base_workload._base_workload):