diff options
Diffstat (limited to 'docker/storperf-master')
-rw-r--r-- | docker/storperf-master/Dockerfile | 2 | ||||
-rw-r--r-- | docker/storperf-master/rest_server.py | 8 | ||||
-rw-r--r-- | docker/storperf-master/storperf/db/test_results_db.py | 5 | ||||
-rw-r--r-- | docker/storperf-master/storperf/fio/fio_invoker.py | 29 | ||||
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 75 | ||||
-rw-r--r-- | docker/storperf-master/storperf/test_executor.py | 173 |
6 files changed, 191 insertions, 101 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile index 38bd231..eaaf811 100644 --- a/docker/storperf-master/Dockerfile +++ b/docker/storperf-master/Dockerfile @@ -15,7 +15,7 @@ # $ docker build -t opnfv/storperf-master:tag . # -ARG ARCH +ARG ARCH=x86_64 ARG ALPINE_VERSION=v3.6 FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py index 6da2004..0634b8f 100644 --- a/docker/storperf-master/rest_server.py +++ b/docker/storperf-master/rest_server.py @@ -64,7 +64,9 @@ class ConfigurationRequestModel: 'agent_image': fields.String, 'public_network': fields.String, 'volume_size': fields.Integer, - 'availability_zone': fields.String + 'availability_zone': fields.String, + 'username': fields.String, + 'password': fields.String } @@ -137,6 +139,10 @@ class Configure(Resource): storperf.volume_size = request.json['volume_size'] if ('availability_zone' in request.json): storperf.availabilty_zone = request.json['availability_zone'] + if ('username' in request.json): + storperf.username = request.json['username'] + if ('password' in request.json): + storperf.password = request.json['password'] storperf.create_stack() if storperf.stack_id is None: diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py index d6aabee..9c87e32 100644 --- a/docker/storperf-master/storperf/db/test_results_db.py +++ b/docker/storperf-master/storperf/db/test_results_db.py @@ -35,7 +35,6 @@ def push_results_to_db(db_url, details, logger): logger.debug(r.content) return json.loads(r.content) except Exception: - if logger: - logger.exception("Error [push_results_to_db('%s', '%s', '%s')]:" % - (db_url, params, details['details'])) + logger.exception("Error [push_results_to_db('%s', '%s')]:" % + (db_url, params)) return None diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index 106696d..0360ea2 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -15,13 +15,14 @@ import paramiko class FIOInvoker(object): - def __init__(self): + def __init__(self, var_dict={}): self.logger = logging.getLogger(__name__) self.event_listeners = set() self.event_callback_ids = set() self._remote_host = None self.callback_id = None self.terminated = False + self.metadata = var_dict @property def remote_host(self): @@ -90,11 +91,7 @@ class FIOInvoker(object): self.logger.debug("Finished") def execute(self, args=[]): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo ./fio " + ' '.join(args) self.logger.debug("Remote command: %s" % command) @@ -133,11 +130,7 @@ class FIOInvoker(object): self.logger.debug("Terminating fio on " + self.remote_host) self.terminated = True - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo killall fio" @@ -151,3 +144,17 @@ class FIOInvoker(object): stdout.close() stderr.close() + + def _ssh_client(self): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + if 'username' in self.metadata and 'password' in self.metadata: + ssh.connect(self.remote_host, + username=self.metadata['username'], + password=self.metadata['password']) + return ssh + else: + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + return ssh diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 4e99e57..8a67048 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -257,6 +257,36 @@ class StorPerfMaster(object): 'workloads', str(self._test_executor.workload_modules)) + @property + def username(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'username' + ) + + @username.setter + def username(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'username', + value + ) + + @property + def password(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'password' + ) + + @password.setter + def password(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'password', + value + ) + def get_logs(self, lines=None): LOG_DIR = './storperf.log' @@ -354,6 +384,9 @@ class StorPerfMaster(object): params['agent_count'] = self.agent_count params['public_network'] = self.public_network params['volume_size'] = self.volume_size + if self.username and self.password: + params['username'] = self.username + params['password'] = self.password job_id = self._test_executor.execute(params) return job_id @@ -424,14 +457,50 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + if self.username and self.password: + ssh.connect(slave, + username=self.username, + password=self.password) + else: + ssh.connect(slave, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is %s" % available) + if available < 65536: + logger.warn("Root filesystem is too small, attemping resize") + self._resize_root_fs(ssh, logger) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is now %s" % available) + if available < 65536: + logger.error("Cannot create enough space on /") + raise Exception("Root filesystem has only %s free" % + available) scp = SCPClient(ssh.get_transport()) logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + def _check_root_fs(self, ssh): + (_, stdout, _) = ssh.exec_command("df /") + stdout.readline() + lines = stdout.readline().split() + if len(lines) > 4: + available = lines[3] + return int(available) + + def _resize_root_fs(self, ssh, logger): + command = "sudo /usr/sbin/resize2fs /dev/vda1" + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + def _make_parameters(self): heat_parameters = {} heat_parameters['public_network'] = self.public_network diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 3d1d9f2..4c2c972 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -235,25 +235,90 @@ class TestExecutor(object): self.start_time = time.time() self.workload_status = {} - # Prepare stats list - for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") - blocksizes = self._block_sizes - iodepths = self._queue_depths - for blocksize in blocksizes: - for iodepth in iodepths: - name = '%s.%s.queue-depth.%s.block-size.%s' % \ - (self.job_db.job_id, workload_name, iodepth, blocksize) - self.workload_status[name] = "Pending" + + workloads = self._create_workload_matrix() + + for current_workload in workloads: + workload = current_workload['workload'] + self._thread_gate = ThreadGate(len(self.slaves), + workload.options['status-interval']) + + if self._terminated: + return + self.current_workload = current_workload['name'] + + self.logger.info("Starting run %s" % self.current_workload) + self.workload_status[self.current_workload] = "Running" + + scheduler = sched.scheduler(time.time, time.sleep) + if self.deadline is not None \ + and not current_workload['workload_name'].startswith("_"): + event = scheduler.enter(self.deadline * 60, 1, + self.terminate_current_run, + ()) + t = Thread(target=scheduler.run, args=()) + t.start() + + workload.options['iodepth'] = str(current_workload['queue-depth']) + workload.options['bs'] = str(current_workload['blocksize']) + + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(current_workload['workload']) + slave_workload.remote_host = slave + + self._workload_executors.append(slave_workload) + + t = Thread(target=self.execute_on_node, + args=(slave_workload,), + name="%s worker" % slave) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) + slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) + + if not scheduler.empty(): + try: + scheduler.cancel(event) + except ValueError: + pass + + self.logger.info("Completed run %s" + % self.current_workload) + self.workload_status[self.current_workload] = "Completed" + self._workload_executors = [] + self.current_workload = None + + self.logger.info("Completed job %s" % (self.job_db.job_id)) + + self.end_time = time.time() + self._terminated = True + self.broadcast_event() + self.unregister(data_handler.data_event) + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) + self.job_db.job_id = None + if self.result_url is not None: + self.logger.info("Results can be found at %s" % self.result_url) + + def _create_workload_matrix(self): + workloads = [] for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") - self.logger.info("Starting workload %s" % (workload_name)) constructorMethod = getattr(workload_module, workload_name) workload = constructorMethod() if (self.filename is not None): workload.filename = self.filename + workload.id = self.job_db.job_id + + if (self.filename is not None): + workload.filename = self.filename if (workload_name.startswith("_")): iodepths = [8, ] @@ -262,85 +327,29 @@ class TestExecutor(object): iodepths = self._queue_depths blocksizes = self._block_sizes - workload.id = self.job_db.job_id - self._thread_gate = ThreadGate(len(self.slaves), - workload.options['status-interval']) - for blocksize in blocksizes: for iodepth in iodepths: - if self._terminated: - return - self.current_workload = ( - "%s.%s.queue-depth.%s.block-size.%s" - % (self.job_db.job_id, - workload_name, - iodepth, - blocksize)) - - self.logger.info("Starting run %s" % self.current_workload) - self.workload_status[self.current_workload] = "Running" - - scheduler = sched.scheduler(time.time, time.sleep) - if self.deadline is not None \ - and not workload_name.startswith("_"): - event = scheduler.enter(self.deadline * 60, 1, - self.terminate_current_run, - ()) - t = Thread(target=scheduler.run, args=()) - t.start() - - workload.options['iodepth'] = str(iodepth) - workload.options['bs'] = str(blocksize) - - slave_threads = [] - for slave in self.slaves: - slave_workload = copy.copy(workload) - slave_workload.remote_host = slave - - self._workload_executors.append(slave_workload) - - t = Thread(target=self.execute_on_node, - args=(slave_workload,), - name="%s worker" % slave) - t.daemon = False - t.start() - slave_threads.append(t) - - for slave_thread in slave_threads: - self.logger.debug("Waiting on %s" % slave_thread) - slave_thread.join() - self.logger.debug("Done waiting for %s" % slave_thread) - - if not scheduler.empty(): - try: - scheduler.cancel(event) - except ValueError: - pass - - self.logger.info("Completed run %s" - % self.current_workload) - self.workload_status[self.current_workload] = "Completed" - self._workload_executors = [] - self.current_workload = None - - self.logger.info("Completed workload %s" % (workload_name)) - self.logger.info("Completed job %s" % (self.job_db.job_id)) + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, blocksize) + self.workload_status[name] = "Pending" - if self.result_url is not None: - self.logger.info("Results can be found at %s" % self.result_url) + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} - self.end_time = time.time() - self._terminated = True - self.broadcast_event() - self.unregister(data_handler.data_event) - report = {'report': json.dumps(self.metadata)} - self.job_db.record_workload_params(report) - self.job_db.job_id = None + self.logger.info("Workload %s=%s" % (name, parameters)) + + workloads.append(parameters) + + return workloads def execute_on_node(self, workload): - invoker = FIOInvoker() + invoker = FIOInvoker(self.metadata) invoker.register(self.event) workload.invoker = invoker |