##############################################################################
# Copyright (c) 2016 EMC and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import copy
import imp
import logging
from os import listdir
import os
from os.path import isfile, join
import sched
from threading import Thread
import time

from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities import dictionary


class UnknownWorkload(Exception):
    pass


class TestExecutor(object):

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.workload_modules = []
        self.filename = None
        self.precondition = True
        self.deadline = None
        self.warm = True
        self.metadata = None
        self._queue_depths = [1, 4, 8]
        self._block_sizes = [512, 4096, 16384]
        self.event_listeners = set()
        self.metrics_converter = Converter()
        self.metrics_emitter = CarbonMetricTransmitter()
        self.prefix = None
        self.job_db = JobDB()
        self._slaves = []
        self._terminated = False
        self._workload_executors = []
        self._workload_thread = None

    @property
    def slaves(self):
        return self._slaves

    @slaves.setter
    def slaves(self, slaves):
        self.logger.debug("Set slaves to: " + str(slaves))
        self._slaves = slaves

    @property
    def queue_depths(self):
        # The defaults are ints, so coerce each entry before joining
        return ','.join(str(depth) for depth in self._queue_depths)

    @queue_depths.setter
    def queue_depths(self, queue_depths):
        self.logger.debug("Set queue_depths to: " + str(queue_depths))
        self._queue_depths = queue_depths.split(',')

    @property
    def block_sizes(self):
        # The defaults are ints, so coerce each entry before joining
        return ','.join(str(size) for size in self._block_sizes)

    @block_sizes.setter
    def block_sizes(self, block_sizes):
        self.logger.debug("Set block_sizes to: " + str(block_sizes))
        self._block_sizes = block_sizes.split(',')

    def register(self, event_listener):
        self.event_listeners.add(event_listener)

    def unregister(self, event_listener):
        self.event_listeners.discard(event_listener)

    def event(self, callback_id, metric):
        carbon_metrics = self.metrics_converter.convert_json_to_flat(
            metric,
            callback_id)
        self.metrics_emitter.transmit_metrics(carbon_metrics)

    def register_workloads(self, workloads):
        self.workload_modules = []

        if (workloads is None or len(workloads) == 0):
            workload_dir = os.path.normpath(
                os.path.join(os.path.dirname(__file__), "workloads"))

            workload_files = [
                f for f in listdir(workload_dir)
                if isfile(join(workload_dir, f))]

            workloads = []

            for filename in workload_files:
                mname, _ = os.path.splitext(filename)
                if (not mname.startswith('_')):
                    workloads.append(mname)
        else:
            workloads = workloads.split(',')

        if (self.warm is True):
            workloads.insert(0, "_warm_up")

        if (self.precondition is True):
            workloads.insert(0, "_ssd_preconditioning")

        for workload in workloads:
            try:
                workload_module = self.load_from_file("workloads/" +
                                                      workload + ".py")
                self.logger.debug("Found: " + str(workload_module))
                if (workload_module is None):
                    raise UnknownWorkload(
                        "ERROR: Unknown workload: " + workload)
                if workload_module not in self.workload_modules:
                    self.workload_modules.append(workload_module)
            except ImportError, err:
                raise UnknownWorkload("ERROR: " + str(err))
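
    # Workload modules are resolved relative to this file, so
    # "workloads/<name>.py" is looked up inside the storperf package.
    # A pre-compiled .pyc is preferred over the .py source when both exist.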
    def load_from_file(self, uri):
        uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
        path, fname = os.path.split(uri)
        mname, _ = os.path.splitext(fname)
        no_ext = os.path.join(path, mname)
        self.logger.debug("Looking for: " + no_ext)
        if os.path.exists(no_ext + '.pyc'):
            self.logger.debug("Loading compiled: " + mname +
                              " from " + no_ext)
            return imp.load_compiled(mname, no_ext + '.pyc')
        if os.path.exists(no_ext + '.py'):
            self.logger.debug("Compiling: " + mname + " from " + no_ext)
            return imp.load_source(mname, no_ext + '.py')
        return None

    def execute(self, metadata):
        self.job_db.create_job_id()
        self.job_db.record_workload_params(metadata)
        self.metadata = metadata
        self._workload_thread = Thread(target=self.execute_workloads,
                                       args=())
        self._workload_thread.start()
        return self.job_db.job_id

    def terminate(self):
        self._terminated = True
        return self.terminate_current_run()

    def terminate_current_run(self):
        self.logger.info("Terminating current run")
        terminated_hosts = []
        for workload in self._workload_executors:
            workload.terminate()
            terminated_hosts.append(workload.remote_host)
        return terminated_hosts

    def execution_status(self, job_id):
        if self.job_db.job_id != job_id:
            return "Completed"
        if (self._terminated is False):
            return "Running"
        return "Completed"
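
    # Runs every registered workload module against each (block size,
    # queue depth) combination on every slave, joining the per-slave
    # threads before moving on to the next combination.  Warm-up and
    # preconditioning workloads (names starting with "_") always use a
    # single fixed block size and queue depth.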
    def execute_workloads(self):
        self._terminated = False
        self.logger.info("Starting job %s" % (self.job_db.job_id))
        start_time = time.time()

        for workload_module in self.workload_modules:
            workload_name = getattr(workload_module, "__name__")
            self.logger.info("Starting workload %s" % (workload_name))

            constructorMethod = getattr(workload_module, workload_name)
            workload = constructorMethod()
            if (self.filename is not None):
                workload.filename = self.filename

            if (workload_name.startswith("_")):
                iodepths = [8, ]
                blocksizes = [16384, ]
            else:
                iodepths = self._queue_depths
                blocksizes = self._block_sizes

            workload.id = self.job_db.job_id

            for blocksize in blocksizes:
                for iodepth in iodepths:
                    scheduler = sched.scheduler(time.time, time.sleep)

                    if self._terminated:
                        return

                    if self.deadline is not None \
                            and not workload_name.startswith("_"):
                        event = scheduler.enter(self.deadline * 60, 1,
                                                self.terminate_current_run,
                                                ())
                        t = Thread(target=scheduler.run, args=())
                        t.start()

                    workload.options['iodepth'] = str(iodepth)
                    workload.options['bs'] = str(blocksize)
                    slave_threads = []
                    for slave in self.slaves:
                        slave_workload = copy.copy(workload)
                        slave_workload.remote_host = slave
                        self._workload_executors.append(slave_workload)
                        t = Thread(target=self.execute_on_node,
                                   args=(slave_workload,))
                        t.daemon = False
                        t.start()
                        slave_threads.append(t)

                    for slave_thread in slave_threads:
                        slave_thread.join()

                    if not scheduler.empty():
                        try:
                            scheduler.cancel(event)
                        except:
                            pass

            self._workload_executors = []
            self.logger.info("Completed workload %s" % (workload_name))
        self.logger.info("Completed job %s" % (self.job_db.job_id))

        self._terminated = True
        end_time = time.time()

        pod_name = dictionary.get_key_from_dict(self.metadata,
                                                'pod_name',
                                                'Unknown')
        version = dictionary.get_key_from_dict(self.metadata,
                                               'version',
                                               'Unknown')
        scenario = dictionary.get_key_from_dict(self.metadata,
                                                'scenario',
                                                'Unknown')
        build_tag = dictionary.get_key_from_dict(self.metadata,
                                                 'build_tag',
                                                 'Unknown')
        duration = end_time - start_time

        test_db = os.environ.get('TEST_DB_URL')
        if test_db is not None:
            # I really do not like doing this.  As our threads just
            # terminated, their final results are still being spooled
            # off to Carbon.  Need to give that a little time to finish.
            time.sleep(5)
            self.logger.info("Pushing results to %s" % (test_db))

            payload = self.metadata
            payload['timestart'] = start_time
            payload['duration'] = duration
            payload['status'] = 'OK'
            graphite_db = GraphiteDB()
            payload['metrics'] = graphite_db.fetch_averages(
                self.job_db.job_id)
            criteria = {}
            criteria['block_sizes'] = self.block_sizes
            criteria['queue_depths'] = self.queue_depths

            try:
                test_results_db.push_results_to_db(test_db,
                                                   "storperf",
                                                   "Latency Test",
                                                   self.logger,
                                                   pod_name,
                                                   version,
                                                   scenario,
                                                   criteria,
                                                   build_tag,
                                                   payload)
            except:
                self.logger.exception("Error pushing results into Database")

    def execute_on_node(self, workload):

        invoker = FIOInvoker()
        invoker.register(self.event)
        workload.invoker = invoker

        self.logger.info("Starting " + workload.fullname)

        self.job_db.start_workload(workload)
        workload.execute()
        self.job_db.end_workload(workload)

        self.logger.info("Ended " + workload.fullname)
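

# ----------------------------------------------------------------------------
# Illustrative usage sketch (an assumption, not part of the production entry
# point): roughly how a caller such as the StorPerf REST layer might drive
# TestExecutor.  The slave addresses and metadata values are placeholders.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    executor = TestExecutor()
    executor.slaves = ["192.168.0.10", "192.168.0.11"]  # assumed agent IPs
    executor.queue_depths = "1,4,8"
    executor.block_sizes = "4096"
    executor.register_workloads(None)  # discover every bundled workload

    job_id = executor.execute({"pod_name": "example-pod",
                               "version": "unknown",
                               "scenario": "example",
                               "build_tag": "local"})
    print("Started job %s" % job_id)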