diff options
Diffstat (limited to 'docker/storperf-master/storperf')
-rw-r--r-- | docker/storperf-master/storperf/carbon/emitter.py | 75 | ||||
-rw-r--r-- | docker/storperf-master/storperf/db/graphite_db.py | 31 | ||||
-rw-r--r-- | docker/storperf-master/storperf/db/test_results_db.py | 37 | ||||
-rw-r--r-- | docker/storperf-master/storperf/fio/fio_invoker.py | 29 | ||||
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 124 | ||||
-rw-r--r-- | docker/storperf-master/storperf/test_executor.py | 62 | ||||
-rw-r--r-- | docker/storperf-master/storperf/utilities/data_handler.py | 97 | ||||
-rw-r--r-- | docker/storperf-master/storperf/workloads/_base_workload.py | 12 |
8 files changed, 300 insertions, 167 deletions
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py index e23dc79..05f6c3c 100644 --- a/docker/storperf-master/storperf/carbon/emitter.py +++ b/docker/storperf-master/storperf/carbon/emitter.py @@ -11,28 +11,75 @@ import logging import socket import time +from storperf.db.graphite_db import GraphiteDB + class CarbonMetricTransmitter(): - carbon_host = '127.0.0.1' - carbon_port = 2003 + carbon_servers = [('127.0.0.1', 2003), + ('storperf-graphite', 2003)] def __init__(self): self.logger = logging.getLogger(__name__) + self.graphite_db = GraphiteDB() + self.commit_markers = {} - def transmit_metrics(self, metrics): - if 'timestamp' in metrics: - metrics.pop('timestamp') + def transmit_metrics(self, metrics, commit_marker): timestamp = str(calendar.timegm(time.gmtime())) + self.commit_markers[commit_marker] = int(timestamp) + + carbon_socket = None + + for host, port in self.carbon_servers: + try: + carbon_socket = socket.socket(socket.AF_INET, + socket.SOCK_STREAM) + carbon_socket.connect((host, port)) + + for key, value in metrics.items(): + try: + float(value) + message = "%s %s %s\n" \ + % (key, value, timestamp) + self.logger.debug("Metric: " + message.strip()) + carbon_socket.send(message) + except ValueError: + self.logger.debug("Ignoring non numeric metric %s %s" + % (key, value)) + + message = "%s.commit-marker %s %s\n" \ + % (commit_marker, timestamp, timestamp) + carbon_socket.send(message) + self.logger.debug("Marker %s" % message.strip()) + self.logger.info("Sent metrics to %s:%s with timestamp %s" + % (host, port, timestamp)) + + except Exception, e: + self.logger.error("While notifying carbon %s:%s %s" + % (host, port, e)) + + if carbon_socket is not None: + carbon_socket.close() + + def confirm_commit(self, commit_marker): + marker_timestamp = self.commit_markers[commit_marker] + request = "%s.commit-marker&from=%s" \ + % (commit_marker, marker_timestamp - 60) + marker_data = self.graphite_db.fetch_item(request) + self.logger.debug("Marker data %s" % marker_data) + fetched_timestamps = self.parse_timestamp(marker_data) - carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - carbon_socket.connect((self.carbon_host, self.carbon_port)) + return marker_timestamp in fetched_timestamps - for key, metric in metrics.items(): - message = key + " " + metric + " " + timestamp - self.logger.debug("Metric: " + message) - carbon_socket.send(message + '\n') + def parse_timestamp(self, marker_data): + timestamps = [] + if (type(marker_data) is list and + len(marker_data) > 0): + datapoints = marker_data[0]['datapoints'] + for datapoint in datapoints: + try: + timestamps.append(int(datapoint[0])) + except Exception: + pass - carbon_socket.close() - self.logger.info("Sent metrics to carbon with timestamp %s" - % timestamp) + return timestamps diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py index c8a2d35..aa71855 100644 --- a/docker/storperf-master/storperf/db/graphite_db.py +++ b/docker/storperf-master/storperf/db/graphite_db.py @@ -9,33 +9,44 @@ import json import logging - import requests -from storperf.db.job_db import JobDB - class GraphiteDB(object): + graphite_host = "storperf-graphite" + graphite_port = 8080 + def __init__(self): - """ - """ - self._job_db = JobDB() self.logger = logging.getLogger(__name__) + def fetch_item(self, target): + + result = None + request = ("http://%s:%s/graphite/render/?format=json&target=%s" + % (self.graphite_host, self.graphite_port, target)) + self.logger.debug("Calling %s" % (request)) + + response = requests.get(request) + if (response.status_code == 200): + result = json.loads(response.content) + + return result + def fetch_series(self, workload, metric, io_type, time, duration): series = [] end = time start = end - duration - request = ("http://127.0.0.1:8000/render/?target=" + request = ("http://%s:%s/graphite/render/?target=" "averageSeries(%s.*.jobs.1.%s.%s)" "&format=json" "&from=%s" - "&until=%s" % - (workload, io_type, metric, - start, end)) + "&until=%s" + % (self.graphite_host, self.graphite_port, + workload, io_type, metric, + start, end)) self.logger.debug("Calling %s" % (request)) response = requests.get(request) diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py index a2f7038..9c87e32 100644 --- a/docker/storperf-master/storperf/db/test_results_db.py +++ b/docker/storperf-master/storperf/db/test_results_db.py @@ -8,38 +8,19 @@ ############################################################################## import json -import os import requests -def get_installer_type(logger=None): - """ - Get installer type (fuel, apex, joid, compass) - """ - try: - installer = os.environ['INSTALLER_TYPE'] - except KeyError: - if logger: - logger.error("Impossible to retrieve the installer type") - installer = "Unknown_installer" - - return installer - - -def push_results_to_db(db_url, project, case_name, - test_start, test_stop, logger, pod_name, - version, scenario, criteria, build_tag, details): +def push_results_to_db(db_url, details, logger): """ POST results to the Result target DB """ url = db_url + "/results" - installer = get_installer_type(logger) - params = {"project_name": project, "case_name": case_name, - "pod_name": pod_name, "installer": installer, - "version": version, "scenario": scenario, "criteria": criteria, - "build_tag": build_tag, "start_date": test_start, - "stop_date": test_stop, "details": details} + params = details.copy() + params.pop('details') + + logger.info("popped params= %s" % params) headers = {'Content-Type': 'application/json'} try: @@ -53,9 +34,7 @@ def push_results_to_db(db_url, project, case_name, logger.debug(r.status_code) logger.debug(r.content) return json.loads(r.content) - except Exception, e: - logger.error("Error [push_results_to_db('%s', '%s', '%s', " + - "'%s', '%s', '%s', '%s', '%s', '%s')]:" % - (db_url, project, case_name, pod_name, version, - scenario, criteria, build_tag, details), e) + except Exception: + logger.exception("Error [push_results_to_db('%s', '%s')]:" % + (db_url, params)) return None diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index 106696d..0360ea2 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -15,13 +15,14 @@ import paramiko class FIOInvoker(object): - def __init__(self): + def __init__(self, var_dict={}): self.logger = logging.getLogger(__name__) self.event_listeners = set() self.event_callback_ids = set() self._remote_host = None self.callback_id = None self.terminated = False + self.metadata = var_dict @property def remote_host(self): @@ -90,11 +91,7 @@ class FIOInvoker(object): self.logger.debug("Finished") def execute(self, args=[]): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo ./fio " + ' '.join(args) self.logger.debug("Remote command: %s" % command) @@ -133,11 +130,7 @@ class FIOInvoker(object): self.logger.debug("Terminating fio on " + self.remote_host) self.terminated = True - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo killall fio" @@ -151,3 +144,17 @@ class FIOInvoker(object): stdout.close() stderr.close() + + def _ssh_client(self): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + if 'username' in self.metadata and 'password' in self.metadata: + ssh.connect(self.remote_host, + username=self.metadata['username'], + password=self.metadata['password']) + return ssh + else: + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + return ssh diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 8c2a7b4..8a67048 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -11,19 +11,20 @@ from datetime import datetime import logging import os import socket -from storperf.db.configuration_db import ConfigurationDB -from storperf.db.job_db import JobDB -from storperf.test_executor import TestExecutor from threading import Thread from time import sleep from cinderclient import client as cinderclient -import heatclient.client as heatclient from keystoneauth1 import loading from keystoneauth1 import session import paramiko from scp import SCPClient +import heatclient.client as heatclient +from storperf.db.configuration_db import ConfigurationDB +from storperf.db.job_db import JobDB +from storperf.test_executor import TestExecutor + class ParameterError(Exception): """ """ @@ -256,6 +257,53 @@ class StorPerfMaster(object): 'workloads', str(self._test_executor.workload_modules)) + @property + def username(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'username' + ) + + @username.setter + def username(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'username', + value + ) + + @property + def password(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'password' + ) + + @password.setter + def password(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'password', + value + ) + + def get_logs(self, lines=None): + LOG_DIR = './storperf.log' + + if isinstance(lines, int): + logs = [] + index = 0 + for line in reversed(open(LOG_DIR).readlines()): + if index != int(lines): + logs.insert(0, line.strip()) + index += 1 + else: + break + else: + with open(LOG_DIR) as f: + logs = f.read().split('\n') + return logs + def create_stack(self): if (self.stack_id is not None): raise ParameterError("ERROR: Stack has already been created") @@ -336,6 +384,9 @@ class StorPerfMaster(object): params['agent_count'] = self.agent_count params['public_network'] = self.public_network params['volume_size'] = self.volume_size + if self.username and self.password: + params['username'] = self.username + params['password'] = self.password job_id = self._test_executor.execute(params) return job_id @@ -345,7 +396,7 @@ class StorPerfMaster(object): def fetch_results(self, job_id): if self._test_executor.job_db.job_id == job_id: - return self._test_executor.metadata['metrics'] + return self._test_executor.metadata['details']['metrics'] workload_params = self.job_db.fetch_workload_params(job_id) if 'report' in workload_params: @@ -359,8 +410,25 @@ class StorPerfMaster(object): def fetch_job_status(self, job_id): return self._test_executor.execution_status(job_id) - def fetch_all_jobs(self): - return self.job_db.fetch_jobs() + def fetch_all_jobs(self, metrics_type): + job_list = self.job_db.fetch_jobs() + job_report = {} + if metrics_type is None: + job_report['job_ids'] = job_list + elif metrics_type == "metadata": + job_report['results'] = [] + for job in job_list: + if metrics_type == 'metadata': + metadata = self.fetch_metadata(job) + if 'report' in metadata: + metadata['report']['_id'] = job + metadata['report']['start_date'] = \ + metadata['report']['start_time'] + metadata['report']['end_date'] = \ + metadata['report']['end_time'] + metadata['report']['_id'] = job + job_report['results'].append(metadata['report']) + return job_report def _setup_slave(self, slave): logger = logging.getLogger(__name__ + ":" + slave) @@ -389,14 +457,50 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + if self.username and self.password: + ssh.connect(slave, + username=self.username, + password=self.password) + else: + ssh.connect(slave, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is %s" % available) + if available < 65536: + logger.warn("Root filesystem is too small, attemping resize") + self._resize_root_fs(ssh, logger) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is now %s" % available) + if available < 65536: + logger.error("Cannot create enough space on /") + raise Exception("Root filesystem has only %s free" % + available) scp = SCPClient(ssh.get_transport()) logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + def _check_root_fs(self, ssh): + (_, stdout, _) = ssh.exec_command("df /") + stdout.readline() + lines = stdout.readline().split() + if len(lines) > 4: + available = lines[3] + return int(available) + + def _resize_root_fs(self, ssh, logger): + command = "sudo /usr/sbin/resize2fs /dev/vda1" + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + def _make_parameters(self): heat_parameters = {} heat_parameters['public_network'] = self.public_network diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index b2d5914..0e3fce0 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -15,14 +15,16 @@ from os import listdir import os from os.path import isfile, join import sched +from threading import Thread +from time import sleep +import time + from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate -from threading import Thread -import time class UnknownWorkload(Exception): @@ -37,7 +39,6 @@ class TestExecutor(object): self.filename = None self.deadline = None self.steady_state_samples = 10 - self.metadata = {} self.start_time = None self.end_time = None self.current_workload = None @@ -55,6 +56,27 @@ class TestExecutor(object): self._workload_executors = [] self._workload_thread = None self._thread_gate = None + self._setup_metadata({}) + + def _setup_metadata(self, metadata={}): + try: + installer = os.environ['INSTALLER_TYPE'] + except KeyError: + self.logger.error("Cannot determine installer") + installer = "Unknown_installer" + + self.metadata = {} + self.metadata['project_name'] = 'storperf' + self.metadata['installer'] = installer + self.metadata['pod_name'] = 'Unknown' + self.metadata['version'] = 'Unknown' + self.metadata['scenario'] = 'Unknown' + self.metadata['build_tag'] = 'Unknown' + self.metadata['test_case'] = 'Unknown' + self.metadata['details'] = {} + self.metadata['details']['metrics'] = {} + self.metadata.update(metadata) + self.metadata['case_name'] = self.metadata['test_case'] @property def slaves(self): @@ -98,7 +120,14 @@ class TestExecutor(object): metric, callback_id) - self.metrics_emitter.transmit_metrics(carbon_metrics) + self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id) + + commit_count = 10 + while (commit_count > 0 and + not self.metrics_emitter.confirm_commit(callback_id)): + self.logger.info("Waiting 1 more second for commit") + sleep(1) + commit_count -= 1 if self._thread_gate.report(callback_id): self.broadcast_event() @@ -162,8 +191,7 @@ class TestExecutor(object): def execute(self, metadata): self.job_db.create_job_id() self.job_db.record_workload_params(metadata) - self.metadata = metadata - self.metadata['metrics'] = {} + self._setup_metadata(metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") @@ -244,11 +272,11 @@ class TestExecutor(object): if self._terminated: return self.current_workload = ( - "%s.%s.queue-depth.%s.block-size.%s" % - (self.job_db.job_id, - workload_name, - iodepth, - blocksize)) + "%s.%s.queue-depth.%s.block-size.%s" + % (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) self.logger.info("Starting run %s" % self.current_workload) self.workload_status[self.current_workload] = "Running" @@ -287,11 +315,11 @@ class TestExecutor(object): if not scheduler.empty(): try: scheduler.cancel(event) - except: + except ValueError: pass - self.logger.info("Completed run %s" % - self.current_workload) + self.logger.info("Completed run %s" + % self.current_workload) self.workload_status[self.current_workload] = "Completed" self._workload_executors = [] self.current_workload = None @@ -304,15 +332,15 @@ class TestExecutor(object): self.end_time = time.time() self._terminated = True - report = {'report': json.dumps(self.metadata)} - self.job_db.record_workload_params(report) self.broadcast_event() self.unregister(data_handler.data_event) + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) self.job_db.job_id = None def execute_on_node(self, workload): - invoker = FIOInvoker() + invoker = FIOInvoker(self.metadata) invoker.register(self.event) workload.invoker = invoker diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py index 9d20383..471c295 100644 --- a/docker/storperf-master/storperf/utilities/data_handler.py +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -9,15 +9,14 @@ import logging import os +import time + from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB from storperf.db.job_db import JobDB from storperf.utilities import data_treatment as DataTreatment -from storperf.utilities import dictionary from storperf.utilities import math as math from storperf.utilities import steady_state as SteadyState -from time import sleep -import time class DataHandler(object): @@ -36,8 +35,6 @@ class DataHandler(object): self._push_to_db(executor) else: workload = '.'.join(executor.current_workload.split('.')[1:6]) - if 'metrics' not in executor.metadata: - executor.metadata['metrics'] = {} steady_state = True metrics = {} @@ -67,19 +64,21 @@ class DataHandler(object): metrics[metric][io_type]['average'] = average metrics_key = '%s.%s.%s' % (workload, io_type, metric) - executor.metadata['metrics'][metrics_key] = average + executor.metadata['details']['metrics'][metrics_key] = \ + average if not steady: steady_state = False - if 'report_data' not in executor.metadata: - executor.metadata['report_data'] = {} + if 'report_data' not in executor.metadata['details']: + executor.metadata['details']['report_data'] = {} - if 'steady_state' not in executor.metadata: - executor.metadata['steady_state'] = {} + if 'steady_state' not in executor.metadata['details']: + executor.metadata['details']['steady_state'] = {} - executor.metadata['report_data'][workload] = metrics - executor.metadata['steady_state'][workload] = steady_state + executor.metadata['details']['report_data'][workload] = metrics + executor.metadata['details']['steady_state'][workload] = \ + steady_state workload_name = executor.current_workload.split('.')[1] @@ -93,31 +92,12 @@ class DataHandler(object): # A bit of a hack here as Carbon might not be finished storing the # data we just sent to it now = int(time.time()) - backtime = 60 * (executor.steady_state_samples + 2) + backtime = 60 * (executor.steady_state_samples + 1) data_series = graphite_db.fetch_series(workload, metric, io_type, now, backtime) - most_recent_time = now - if len(data_series) > 0: - most_recent_time = data_series[-1][0] - - delta = now - most_recent_time - self.logger.debug("Last update to graphite was %s ago" % delta) - - while (delta < 5 or (delta > 60 and delta < 120)): - sleep(5) - data_series = graphite_db.fetch_series(workload, - metric, - io_type, - now, - backtime) - if len(data_series) > 0: - most_recent_time = data_series[-1][0] - delta = time.time() - most_recent_time - self.logger.debug("Last update to graphite was %s ago" % delta) - return data_series def _convert_timestamps_to_samples(self, executor, series): @@ -147,59 +127,36 @@ class DataHandler(object): return SteadyState.steady_state(data_series) def _push_to_db(self, executor): - pod_name = dictionary.get_key_from_dict(executor.metadata, - 'pod_name', - 'Unknown') - version = dictionary.get_key_from_dict(executor.metadata, - 'version', - 'Unknown') - scenario = dictionary.get_key_from_dict(executor.metadata, - 'scenario_name', - 'Unknown') - build_tag = dictionary.get_key_from_dict(executor.metadata, - 'build_tag', - 'Unknown') - test_case = dictionary.get_key_from_dict(executor.metadata, - 'test_case', - 'Unknown') - duration = executor.end_time - executor.start_time - - payload = executor.metadata + executor.metadata['duration'] = executor.end_time - executor.start_time steady_state = True - for _, value in executor.metadata['steady_state'].items(): + for _, value in executor.metadata['details']['steady_state'].items(): steady_state = steady_state and value - payload['timestart'] = executor.start_time - payload['duration'] = duration + executor.metadata['timestart'] = executor.start_time if steady_state: criteria = 'PASS' else: criteria = 'FAIL' + executor.metadata['criteria'] = criteria - start_time = time.strftime('%Y-%m-%d %H:%M:%S', - time.gmtime(executor.start_time)) + executor.metadata['start_time'] = \ + time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.start_time)) - end_time = time.strftime('%Y-%m-%d %H:%M:%S', - time.gmtime(executor.end_time)) + executor.metadata['end_time'] = \ + time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.end_time)) test_db = os.environ.get('TEST_DB_URL') if test_db is not None: self.logger.info("Pushing results to %s" % (test_db)) try: - response = test_results_db.push_results_to_db(test_db, - "storperf", - test_case, - start_time, - end_time, - self.logger, - pod_name, - version, - scenario, - criteria, - build_tag, - payload) + response = test_results_db.push_results_to_db( + test_db, + executor.metadata, + self.logger) executor.result_url = response['href'] - except: + except Exception: self.logger.exception("Error pushing results into Database") diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py index 936c839..c045278 100644 --- a/docker/storperf-master/storperf/workloads/_base_workload.py +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -74,9 +74,9 @@ class _base_workload(object): @property def fullname(self): - return ("%s.%s.queue-depth.%s.block-size.%s.%s" % - (str(self.id), - self.__class__.__name__, - str(self.options['iodepth']), - str(self.options['bs']), - str(self.remote_host).replace(".", "-"))) + return ("%s.%s.queue-depth.%s.block-size.%s.%s" + % (str(self.id), + self.__class__.__name__, + str(self.options['iodepth']), + str(self.options['bs']), + str(self.remote_host).replace(".", "-"))) |