summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/storperf_master.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r--docker/storperf-master/storperf/storperf_master.py448
1 files changed, 448 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
new file mode 100644
index 0000000..fb3e269
--- /dev/null
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -0,0 +1,448 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+from datetime import datetime
+import logging
+import os
+import socket
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+from threading import Thread
+from time import sleep
+
+from cinderclient import client as cinderclient
+import heatclient.client as heatclient
+from keystoneauth1 import loading
+from keystoneauth1 import session
+import paramiko
+from scp import SCPClient
+
+
+class ParameterError(Exception):
+ """ """
+
+
+class StorPerfMaster(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ self.configuration_db = ConfigurationDB()
+ self.job_db = JobDB()
+
+ template_file = open("storperf/resources/hot/agent-group.yaml")
+ self._agent_group_hot = template_file.read()
+ template_file = open("storperf/resources/hot/storperf-agent.yaml")
+ self._agent_resource_hot = template_file.read()
+ self._hot_files = {
+ 'storperf-agent.yaml': self._agent_resource_hot
+ }
+ self.logger.debug(
+ "Loaded agent-group template as: " + self._agent_group_hot)
+ self.logger.debug(
+ "Loaded agent-resource template as: " + self._agent_resource_hot)
+
+ self._cinder_client = None
+ self._heat_client = None
+ self._test_executor = TestExecutor()
+ self._last_openstack_auth = datetime.now()
+
+ @property
+ def volume_size(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'volume_size')
+ if (value is None):
+ self.volume_size = 1
+ value = 1
+ return int(value)
+
+ @volume_size.setter
+ def volume_size(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change volume size after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'volume_size',
+ value)
+
+ @property
+ def agent_count(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_count')
+
+ if (value is None):
+ self.agent_count = 1
+ value = 1
+ return int(value)
+
+ @agent_count.setter
+ def agent_count(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent count after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_count',
+ value)
+
+ @property
+ def agent_image(self):
+ value = self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_image')
+
+ if (value is None):
+ value = 'Ubuntu 14.04'
+ self.agent_image = value
+
+ return value
+
+ @agent_image.setter
+ def agent_image(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change agent image after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_image',
+ value)
+
+ @property
+ def public_network(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'public_network')
+
+ @public_network.setter
+ def public_network(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change public network after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'public_network',
+ value)
+
+ @property
+ def agent_flavor(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'agent_flavor')
+
+ @agent_flavor.setter
+ def agent_flavor(self, value):
+ if (self.stack_id is not None):
+ raise ParameterError(
+ "ERROR: Cannot change flavor after stack is created")
+
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'agent_flavor',
+ value)
+
+ @property
+ def stack_id(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'stack_id')
+
+ @stack_id.setter
+ def stack_id(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'stack_id',
+ value)
+
+ @property
+ def volume_quota(self):
+ self._attach_to_openstack()
+ quotas = self._cinder_client.quotas.get(
+ os.environ.get('OS_TENANT_ID'))
+ return int(quotas.volumes)
+
+ @property
+ def filename(self):
+ return self._test_executor.filename
+
+ @filename.setter
+ def filename(self, value):
+ self._test_executor.filename = value
+
+ @property
+ def deadline(self):
+ return self._test_executor.deadline
+
+ @deadline.setter
+ def deadline(self, value):
+ self._test_executor.deadline = value
+
+ @property
+ def steady_state_samples(self):
+ return self._test_executor.steady_state_samples
+
+ @steady_state_samples.setter
+ def steady_state_samples(self, value):
+ self._test_executor.steady_state_samples = value
+
+ @property
+ def queue_depths(self):
+ return self._test_executor.queue_depths
+
+ @queue_depths.setter
+ def queue_depths(self, value):
+ self._test_executor.queue_depths = value
+
+ @property
+ def block_sizes(self):
+ return self._test_executor.block_sizes
+
+ @block_sizes.setter
+ def block_sizes(self, value):
+ self._test_executor.block_sizes = value
+
+ @property
+ def is_stack_created(self):
+ if (self.stack_id is not None):
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+
+ self.logger.info("Status=" + status)
+ if (status == u'CREATE_COMPLETE'):
+ return True
+
+ return False
+
+ @property
+ def workloads(self):
+ return self.configuration_db.get_configuration_value(
+ 'workload',
+ 'workloads')
+
+ @workloads.setter
+ def workloads(self, value):
+ self._test_executor.register_workloads(value)
+
+ self.configuration_db.set_configuration_value(
+ 'workload',
+ 'workloads',
+ str(self._test_executor.workload_modules))
+
+ def create_stack(self):
+ if (self.stack_id is not None):
+ raise ParameterError("ERROR: Stack has already been created")
+
+ self._attach_to_openstack()
+ volume_quota = self.volume_quota
+ if (volume_quota > 0 and self.agent_count > volume_quota):
+ message = "ERROR: Volume quota too low: " + \
+ str(self.agent_count) + " > " + str(self.volume_quota)
+ raise ParameterError(message)
+
+ self.logger.debug("Creating stack")
+ stack = self._heat_client.stacks.create(
+ stack_name="StorPerfAgentGroup",
+ template=self._agent_group_hot,
+ files=self._hot_files,
+ parameters=self._make_parameters())
+
+ self.stack_id = stack['stack']['id']
+
+ while True:
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+ self.logger.debug("Stack status=%s" % (status,))
+ if (status == u'CREATE_COMPLETE'):
+ return True
+ if (status == u'DELETE_COMPLETE'):
+ self.stack_id = None
+ return True
+ if (status == u'CREATE_FAILED'):
+ sleep(5)
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ sleep(2)
+
+ def delete_stack(self):
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+ while True:
+ stack = self._heat_client.stacks.get(self.stack_id)
+ status = getattr(stack, 'stack_status')
+ self.logger.debug("Stack status=%s" % (status,))
+ if (status == u'CREATE_COMPLETE'):
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ if (status == u'DELETE_COMPLETE'):
+ self.stack_id = None
+ return True
+ if (status == u'DELETE_FAILED'):
+ sleep(5)
+ self._heat_client.stacks.delete(stack_id=self.stack_id)
+ sleep(2)
+
+ def execute_workloads(self, metadata={}):
+ if (self.stack_id is None):
+ raise ParameterError("ERROR: Stack does not exist")
+
+ self._attach_to_openstack()
+
+ stack = self._heat_client.stacks.get(self.stack_id)
+ outputs = getattr(stack, 'outputs')
+ slaves = outputs[0]['output_value']
+
+ setup_threads = []
+
+ for slave in slaves:
+ t = Thread(target=self._setup_slave, args=(slave,))
+ setup_threads.append(t)
+ t.start()
+
+ for thread in setup_threads:
+ thread.join()
+
+ self._test_executor.slaves = slaves
+
+ params = metadata
+ params['agent_count'] = self.agent_count
+ params['public_network'] = self.public_network
+ params['volume_size'] = self.volume_size
+ job_id = self._test_executor.execute(params)
+
+ return job_id
+
+ def terminate_workloads(self):
+ return self._test_executor.terminate()
+
+ def fetch_results(self, job_id):
+ if self._test_executor.job_db.job_id == job_id:
+ return self._test_executor.metadata['metrics']
+
+ workload_params = self.job_db.fetch_workload_params(job_id)
+ if 'report' in workload_params:
+ report = workload_params['report']
+ return report['metrics']
+ return {}
+
+ def fetch_metadata(self, job_id):
+ return self.job_db.fetch_workload_params(job_id)
+
+ def fetch_job_status(self, job_id):
+ return self._test_executor.execution_status(job_id)
+
+ def _setup_slave(self, slave):
+ logger = logging.getLogger(__name__ + ":" + slave)
+
+ logger.info("Initializing slave at " + slave)
+
+ logger.debug("Checking if slave " + slave + " is alive")
+
+ alive = False
+ timer = 10
+ while not alive:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ result = s.connect_ex((slave, 22))
+ s.close()
+
+ if result:
+ alive = False
+ sleep(1)
+ timer -= 1
+ if timer == 0:
+ logger.debug("Still waiting for slave " + slave)
+ timer = 10
+ else:
+ alive = True
+ logger.debug("Slave " + slave + " is alive and ready")
+
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.connect(slave, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ scp = SCPClient(ssh.get_transport())
+ logger.debug("Transferring libaio.so.1 to %s" % slave)
+ scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/')
+ logger.debug("Transferring fio to %s" % slave)
+ scp.put('/usr/local/bin/fio', '~/')
+
+ cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
+ logger.debug("Executing on %s: %s" % (slave, cmd))
+ (_, stdout, stderr) = ssh.exec_command(cmd)
+
+ for line in stdout.readlines():
+ logger.debug(line.strip())
+ for line in stderr.readlines():
+ logger.error(line.strip())
+
+ def _make_parameters(self):
+ heat_parameters = {}
+ heat_parameters['public_network'] = self.public_network
+ heat_parameters['agent_count'] = self.agent_count
+ heat_parameters['volume_size'] = self.volume_size
+ heat_parameters['agent_image'] = self.agent_image
+ heat_parameters['agent_flavor'] = self.agent_flavor
+ return heat_parameters
+
+ def _attach_to_openstack(self):
+
+ time_since_last_auth = datetime.now() - self._last_openstack_auth
+
+ if (self._heat_client is None or
+ time_since_last_auth.total_seconds() > 600):
+ self._last_openstack_auth = datetime.now()
+
+ creds = {
+ "username": os.environ.get('OS_USERNAME'),
+ "password": os.environ.get('OS_PASSWORD'),
+ "auth_url": os.environ.get('OS_AUTH_URL'),
+ "project_domain_id":
+ os.environ.get('OS_PROJECT_DOMAIN_ID'),
+ "project_domain_name":
+ os.environ.get('OS_PROJECT_DOMAIN_NAME'),
+ "project_id": os.environ.get('OS_PROJECT_ID'),
+ "project_name": os.environ.get('OS_PROJECT_NAME'),
+ "tenant_name": os.environ.get('OS_TENANT_NAME'),
+ "tenant_id": os.environ.get("OS_TENANT_ID"),
+ "user_domain_id": os.environ.get('OS_USER_DOMAIN_ID'),
+ "user_domain_name": os.environ.get('OS_USER_DOMAIN_NAME')
+ }
+
+ self.logger.debug("Creds: %s" % creds)
+
+ loader = loading.get_plugin_loader('password')
+ auth = loader.load_from_options(**creds)
+ sess = session.Session(auth=auth)
+
+ self.logger.debug("Looking up orchestration endpoint")
+ heat_endpoint = sess.get_endpoint(auth=auth,
+ service_type="orchestration",
+ endpoint_type='publicURL')
+
+ self.logger.debug("Orchestration endpoint is %s" % heat_endpoint)
+ token = sess.get_token(auth=auth)
+
+ self._heat_client = heatclient.Client(
+ "1",
+ endpoint=heat_endpoint,
+ token=token)
+
+ self.logger.debug("Creating cinder client")
+ self._cinder_client = cinderclient.Client("2", session=sess)
+ self.logger.debug("OpenStack authentication complete")