diff options
Diffstat (limited to 'docker/storperf-master')
18 files changed, 564 insertions, 499 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile index b19c12d..eaaf811 100644 --- a/docker/storperf-master/Dockerfile +++ b/docker/storperf-master/Dockerfile @@ -15,7 +15,9 @@ # $ docker build -t opnfv/storperf-master:tag . # -FROM alpine:3.5 as storperf-builder +ARG ARCH=x86_64 +ARG ALPINE_VERSION=v3.6 +FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder LABEL version="5.0" description="OPNFV Storperf Docker container" @@ -57,7 +59,7 @@ RUN pip install -r /storperf/requirements.pip # Build stripped down StorPerf image -FROM alpine:3.5 as storperf-master +FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-master RUN apk --no-cache add --update \ python \ @@ -74,57 +76,5 @@ RUN chmod 600 storperf/resources/ssh/storperf_rsa EXPOSE 5000 -# Install Graphite -# Everything from here down will be removed once Graphite/Carbon gets broken -# out into its own container. - -RUN apk --no-cache add --update \ - python \ - py-pip \ - python-dev \ - alpine-sdk \ - py-tz \ - nginx \ - cairo \ - supervisor - -RUN deluser xfs - -RUN pip install \ - gunicorn==17.5 \ - Django==1.6.11 \ - django-tagging==0.3.1 \ - cairocffi \ - constants \ - zope.interface - -RUN adduser -S -g www-data -u 33 www-data - -RUN pip install whisper==0.9.15 -RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/lib" carbon==0.9.15 -RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/webapp" graphite-web==0.9.15 - -ADD graphite/nginx.conf /etc/nginx/nginx.conf -ADD graphite/local_settings.py /var/lib/graphite/webapp/graphite/local_settings.py -ADD graphite/carbon.conf /var/lib/graphite/conf/carbon.conf -ADD graphite/storage-schemas.conf /var/lib/graphite/conf/storage-schemas.conf -RUN mkdir -p /opt/graphite/storage -RUN ln -s /var/lib/graphite/storage/whisper /opt/graphite/storage/whisper -RUN touch /var/lib/graphite/storage/graphite.db /var/lib/graphite/storage/index -RUN chown -R www-data /var/lib/graphite/storage -RUN chmod 0775 /var/lib/graphite/storage /var/lib/graphite/storage/whisper -RUN chmod 0664 /var/lib/graphite/storage/graphite.db - -RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput -ADD graphite/initial_data.json /var/lib/graphite/webapp/graphite/initial_data.json -RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput - -RUN mkdir -p /var/log/supervisor - -COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf - -EXPOSE 8000 - # Entry point - -CMD ["/usr/bin/supervisord"] +CMD [ "python", "./rest_server.py" ] diff --git a/docker/storperf-master/graphite/carbon.conf b/docker/storperf-master/graphite/carbon.conf deleted file mode 100644 index 13088dd..0000000 --- a/docker/storperf-master/graphite/carbon.conf +++ /dev/null @@ -1,80 +0,0 @@ -[cache] -LOCAL_DATA_DIR = /var/lib/graphite/storage/whisper/ - -# Specify the user to drop privileges to -# If this is blank carbon runs as the user that invokes it -# This user must have write access to the local data directory -USER = - -# Limit the size of the cache to avoid swapping or becoming CPU bound. -# Sorts and serving cache queries gets more expensive as the cache grows. -# Use the value "inf" (infinity) for an unlimited cache size. -MAX_CACHE_SIZE = inf - -# Limits the number of whisper update_many() calls per second, which effectively -# means the number of write requests sent to the disk. This is intended to -# prevent over-utilizing the disk and thus starving the rest of the system. -# When the rate of required updates exceeds this, then carbon's caching will -# take effect and increase the overall throughput accordingly. -MAX_UPDATES_PER_SECOND = 1000 - -# Softly limits the number of whisper files that get created each minute. -# Setting this value low (like at 50) is a good way to ensure your graphite -# system will not be adversely impacted when a bunch of new metrics are -# sent to it. The trade off is that it will take much longer for those metrics' -# database files to all get created and thus longer until the data becomes usable. -# Setting this value high (like "inf" for infinity) will cause graphite to create -# the files quickly but at the risk of slowing I/O down considerably for a while. -MAX_CREATES_PER_MINUTE = inf - -LINE_RECEIVER_INTERFACE = 0.0.0.0 -LINE_RECEIVER_PORT = 2003 - -ENABLE_UDP_LISTENER = True -UDP_RECEIVER_INTERFACE = 0.0.0.0 -UDP_RECEIVER_PORT = 2003 - -PICKLE_RECEIVER_INTERFACE = 0.0.0.0 -PICKLE_RECEIVER_PORT = 2004 - -CACHE_QUERY_INTERFACE = 0.0.0.0 -CACHE_QUERY_PORT = 7002 - -LOG_UPDATES = False - -# Enable AMQP if you want to receve metrics using an amqp broker -# ENABLE_AMQP = False - -# Verbose means a line will be logged for every metric received -# useful for testing -# AMQP_VERBOSE = False - -# AMQP_HOST = localhost -# AMQP_PORT = 5672 -# AMQP_VHOST = / -# AMQP_USER = guest -# AMQP_PASSWORD = guest -# AMQP_EXCHANGE = graphite - -# Patterns for all of the metrics this machine will store. Read more at -# http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol#Bindings -# -# Example: store all sales, linux servers, and utilization metrics -# BIND_PATTERNS = sales.#, servers.linux.#, #.utilization -# -# Example: store everything -# BIND_PATTERNS = # - -# NOTE: you cannot run both a cache and a relay on the same server -# with the default configuration, you have to specify a distinict -# interfaces and ports for the listeners. - -[relay] -LINE_RECEIVER_INTERFACE = 0.0.0.0 -LINE_RECEIVER_PORT = 2003 - -PICKLE_RECEIVER_INTERFACE = 0.0.0.0 -PICKLE_RECEIVER_PORT = 2004 - -CACHE_SERVERS = server1, server2, server3 -MAX_QUEUE_SIZE = 10000 diff --git a/docker/storperf-master/graphite/initial_data.json b/docker/storperf-master/graphite/initial_data.json deleted file mode 100644 index b3ac9b1..0000000 --- a/docker/storperf-master/graphite/initial_data.json +++ /dev/null @@ -1,20 +0,0 @@ -[ - { - "pk": 1, - "model": "auth.user", - "fields": { - "username": "admin", - "first_name": "", - "last_name": "", - "is_active": true, - "is_superuser": true, - "is_staff": true, - "last_login": "2011-09-20 17:02:14", - "groups": [], - "user_permissions": [], - "password": "sha1$1b11b$edeb0a67a9622f1f2cfeabf9188a711f5ac7d236", - "email": "root@example.com", - "date_joined": "2011-09-20 17:02:14" - } - } -] diff --git a/docker/storperf-master/graphite/local_settings.py b/docker/storperf-master/graphite/local_settings.py deleted file mode 100644 index 7cff8f7..0000000 --- a/docker/storperf-master/graphite/local_settings.py +++ /dev/null @@ -1 +0,0 @@ -TIME_ZONE = 'UTC' diff --git a/docker/storperf-master/graphite/nginx.conf b/docker/storperf-master/graphite/nginx.conf deleted file mode 100644 index 8a11e94..0000000 --- a/docker/storperf-master/graphite/nginx.conf +++ /dev/null @@ -1,69 +0,0 @@ -daemon off; -user www-data; -worker_processes 1; -pid /var/run/nginx.pid; - -events { - worker_connections 1024; -} - -http { - sendfile on; - tcp_nopush on; - tcp_nodelay on; - keepalive_timeout 65; - types_hash_max_size 2048; - server_tokens off; - - server_names_hash_bucket_size 32; - - include /etc/nginx/mime.types; - default_type application/octet-stream; - - access_log /var/log/nginx/access.log; - error_log /var/log/nginx/error.log; - - gzip on; - gzip_disable "msie6"; - - server { - listen 8000 default_server; - server_name _; - - open_log_file_cache max=1000 inactive=20s min_uses=2 valid=1m; - - location / { - proxy_pass http://127.0.0.1:8080; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - proxy_set_header X-Forwarded-Server $host; - proxy_set_header X-Forwarded-Host $http_host; - proxy_set_header Host $http_host; - - client_max_body_size 10m; - client_body_buffer_size 128k; - - proxy_connect_timeout 90; - proxy_send_timeout 90; - proxy_read_timeout 90; - - proxy_buffer_size 4k; - proxy_buffers 4 32k; - proxy_busy_buffers_size 64k; - proxy_temp_file_write_size 64k; - } - - add_header Access-Control-Allow-Origin "*"; - add_header Access-Control-Allow-Methods "GET, OPTIONS"; - add_header Access-Control-Allow-Headers "origin, authorization, accept"; - - location /content { - alias /var/lib/graphite/webapp/content; - } - - location /media { - alias /usr/share/pyshared/django/contrib/admin/media; - } - } -} diff --git a/docker/storperf-master/graphite/storage-schemas.conf b/docker/storperf-master/graphite/storage-schemas.conf deleted file mode 100644 index 855a9e4..0000000 --- a/docker/storperf-master/graphite/storage-schemas.conf +++ /dev/null @@ -1,7 +0,0 @@ -[carbon] -pattern = ^carbon\..* -retentions = 1m:31d,10m:1y,1h:5y - -[default] -pattern = .* -retentions = 10s:8d,1m:31d,10m:1y,1h:5y diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py index 19f87ca..0634b8f 100644 --- a/docker/storperf-master/rest_server.py +++ b/docker/storperf-master/rest_server.py @@ -13,12 +13,13 @@ import os import sys from flask import abort, Flask, request, jsonify +from flask_cors import CORS from flask_restful import Resource, Api, fields from flask_restful_swagger import swagger -from flask_cors import CORS from storperf.storperf_master import StorPerfMaster + app = Flask(__name__, static_url_path="") CORS(app) api = swagger.docs(Api(app), apiVersion='1.0') @@ -26,6 +27,35 @@ api = swagger.docs(Api(app), apiVersion='1.0') storperf = StorPerfMaster() +class Logs(Resource): + def __init__(self): + self.logger = logging.getLogger(__name__) + + @swagger.operation( + notes="Fetch logs", + parameters=[ + { + "name": "lines", + "description": "The number of lines to fetch", + "required": "False", + "type": "string", + "allowedMultiple": "False", + "paramType": "query" + } + ] + ) + def get(self): + lines = request.args.get('lines') + if lines: + try: + lines = int(lines) + except Exception: + pass + else: + lines = 35 + return jsonify({'logs': storperf.get_logs(lines)}) + + @swagger.model class ConfigurationRequestModel: resource_fields = { @@ -34,7 +64,9 @@ class ConfigurationRequestModel: 'agent_image': fields.String, 'public_network': fields.String, 'volume_size': fields.Integer, - 'availability_zone': fields.String + 'availability_zone': fields.String, + 'username': fields.String, + 'password': fields.String } @@ -107,6 +139,10 @@ class Configure(Resource): storperf.volume_size = request.json['volume_size'] if ('availability_zone' in request.json): storperf.availabilty_zone = request.json['availability_zone'] + if ('username' in request.json): + storperf.username = request.json['username'] + if ('password' in request.json): + storperf.password = request.json['password'] storperf.create_stack() if storperf.stack_id is None: @@ -194,13 +230,13 @@ class Job(Resource): ) def get(self): - metrics_type = "metrics" - if request.args.get('type'): - metrics_type = request.args.get('type') - workload_id = request.args.get('id') if workload_id: + metrics_type = "metrics" + if request.args.get('type'): + metrics_type = request.args.get('type') + if metrics_type == "metrics": return jsonify(storperf.fetch_results(workload_id)) @@ -210,7 +246,10 @@ class Job(Resource): if metrics_type == "status": return jsonify(storperf.fetch_job_status(workload_id)) else: - return jsonify({"job_ids": storperf.fetch_all_jobs()}) + metrics_type = None + if request.args.get('type'): + metrics_type = request.args.get('type') + return jsonify(storperf.fetch_all_jobs(metrics_type)) @swagger.operation( parameters=[ @@ -343,6 +382,7 @@ def setup_logging(default_path='logging.json', api.add_resource(Configure, "/api/v1.0/configurations") api.add_resource(Quota, "/api/v1.0/quotas") api.add_resource(Job, "/api/v1.0/jobs") +api.add_resource(Logs, "/api/v1.0/logs") if __name__ == "__main__": setup_logging() diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py index e23dc79..05f6c3c 100644 --- a/docker/storperf-master/storperf/carbon/emitter.py +++ b/docker/storperf-master/storperf/carbon/emitter.py @@ -11,28 +11,75 @@ import logging import socket import time +from storperf.db.graphite_db import GraphiteDB + class CarbonMetricTransmitter(): - carbon_host = '127.0.0.1' - carbon_port = 2003 + carbon_servers = [('127.0.0.1', 2003), + ('storperf-graphite', 2003)] def __init__(self): self.logger = logging.getLogger(__name__) + self.graphite_db = GraphiteDB() + self.commit_markers = {} - def transmit_metrics(self, metrics): - if 'timestamp' in metrics: - metrics.pop('timestamp') + def transmit_metrics(self, metrics, commit_marker): timestamp = str(calendar.timegm(time.gmtime())) + self.commit_markers[commit_marker] = int(timestamp) + + carbon_socket = None + + for host, port in self.carbon_servers: + try: + carbon_socket = socket.socket(socket.AF_INET, + socket.SOCK_STREAM) + carbon_socket.connect((host, port)) + + for key, value in metrics.items(): + try: + float(value) + message = "%s %s %s\n" \ + % (key, value, timestamp) + self.logger.debug("Metric: " + message.strip()) + carbon_socket.send(message) + except ValueError: + self.logger.debug("Ignoring non numeric metric %s %s" + % (key, value)) + + message = "%s.commit-marker %s %s\n" \ + % (commit_marker, timestamp, timestamp) + carbon_socket.send(message) + self.logger.debug("Marker %s" % message.strip()) + self.logger.info("Sent metrics to %s:%s with timestamp %s" + % (host, port, timestamp)) + + except Exception, e: + self.logger.error("While notifying carbon %s:%s %s" + % (host, port, e)) + + if carbon_socket is not None: + carbon_socket.close() + + def confirm_commit(self, commit_marker): + marker_timestamp = self.commit_markers[commit_marker] + request = "%s.commit-marker&from=%s" \ + % (commit_marker, marker_timestamp - 60) + marker_data = self.graphite_db.fetch_item(request) + self.logger.debug("Marker data %s" % marker_data) + fetched_timestamps = self.parse_timestamp(marker_data) - carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - carbon_socket.connect((self.carbon_host, self.carbon_port)) + return marker_timestamp in fetched_timestamps - for key, metric in metrics.items(): - message = key + " " + metric + " " + timestamp - self.logger.debug("Metric: " + message) - carbon_socket.send(message + '\n') + def parse_timestamp(self, marker_data): + timestamps = [] + if (type(marker_data) is list and + len(marker_data) > 0): + datapoints = marker_data[0]['datapoints'] + for datapoint in datapoints: + try: + timestamps.append(int(datapoint[0])) + except Exception: + pass - carbon_socket.close() - self.logger.info("Sent metrics to carbon with timestamp %s" - % timestamp) + return timestamps diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py index c8a2d35..aa71855 100644 --- a/docker/storperf-master/storperf/db/graphite_db.py +++ b/docker/storperf-master/storperf/db/graphite_db.py @@ -9,33 +9,44 @@ import json import logging - import requests -from storperf.db.job_db import JobDB - class GraphiteDB(object): + graphite_host = "storperf-graphite" + graphite_port = 8080 + def __init__(self): - """ - """ - self._job_db = JobDB() self.logger = logging.getLogger(__name__) + def fetch_item(self, target): + + result = None + request = ("http://%s:%s/graphite/render/?format=json&target=%s" + % (self.graphite_host, self.graphite_port, target)) + self.logger.debug("Calling %s" % (request)) + + response = requests.get(request) + if (response.status_code == 200): + result = json.loads(response.content) + + return result + def fetch_series(self, workload, metric, io_type, time, duration): series = [] end = time start = end - duration - request = ("http://127.0.0.1:8000/render/?target=" + request = ("http://%s:%s/graphite/render/?target=" "averageSeries(%s.*.jobs.1.%s.%s)" "&format=json" "&from=%s" - "&until=%s" % - (workload, io_type, metric, - start, end)) + "&until=%s" + % (self.graphite_host, self.graphite_port, + workload, io_type, metric, + start, end)) self.logger.debug("Calling %s" % (request)) response = requests.get(request) diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py index a2f7038..9c87e32 100644 --- a/docker/storperf-master/storperf/db/test_results_db.py +++ b/docker/storperf-master/storperf/db/test_results_db.py @@ -8,38 +8,19 @@ ############################################################################## import json -import os import requests -def get_installer_type(logger=None): - """ - Get installer type (fuel, apex, joid, compass) - """ - try: - installer = os.environ['INSTALLER_TYPE'] - except KeyError: - if logger: - logger.error("Impossible to retrieve the installer type") - installer = "Unknown_installer" - - return installer - - -def push_results_to_db(db_url, project, case_name, - test_start, test_stop, logger, pod_name, - version, scenario, criteria, build_tag, details): +def push_results_to_db(db_url, details, logger): """ POST results to the Result target DB """ url = db_url + "/results" - installer = get_installer_type(logger) - params = {"project_name": project, "case_name": case_name, - "pod_name": pod_name, "installer": installer, - "version": version, "scenario": scenario, "criteria": criteria, - "build_tag": build_tag, "start_date": test_start, - "stop_date": test_stop, "details": details} + params = details.copy() + params.pop('details') + + logger.info("popped params= %s" % params) headers = {'Content-Type': 'application/json'} try: @@ -53,9 +34,7 @@ def push_results_to_db(db_url, project, case_name, logger.debug(r.status_code) logger.debug(r.content) return json.loads(r.content) - except Exception, e: - logger.error("Error [push_results_to_db('%s', '%s', '%s', " + - "'%s', '%s', '%s', '%s', '%s', '%s')]:" % - (db_url, project, case_name, pod_name, version, - scenario, criteria, build_tag, details), e) + except Exception: + logger.exception("Error [push_results_to_db('%s', '%s')]:" % + (db_url, params)) return None diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index 106696d..0360ea2 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -15,13 +15,14 @@ import paramiko class FIOInvoker(object): - def __init__(self): + def __init__(self, var_dict={}): self.logger = logging.getLogger(__name__) self.event_listeners = set() self.event_callback_ids = set() self._remote_host = None self.callback_id = None self.terminated = False + self.metadata = var_dict @property def remote_host(self): @@ -90,11 +91,7 @@ class FIOInvoker(object): self.logger.debug("Finished") def execute(self, args=[]): - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo ./fio " + ' '.join(args) self.logger.debug("Remote command: %s" % command) @@ -133,11 +130,7 @@ class FIOInvoker(object): self.logger.debug("Terminating fio on " + self.remote_host) self.terminated = True - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(self.remote_host, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh = self._ssh_client() command = "sudo killall fio" @@ -151,3 +144,17 @@ class FIOInvoker(object): stdout.close() stderr.close() + + def _ssh_client(self): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + if 'username' in self.metadata and 'password' in self.metadata: + ssh.connect(self.remote_host, + username=self.metadata['username'], + password=self.metadata['password']) + return ssh + else: + ssh.connect(self.remote_host, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + return ssh diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 8c2a7b4..8a67048 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -11,19 +11,20 @@ from datetime import datetime import logging import os import socket -from storperf.db.configuration_db import ConfigurationDB -from storperf.db.job_db import JobDB -from storperf.test_executor import TestExecutor from threading import Thread from time import sleep from cinderclient import client as cinderclient -import heatclient.client as heatclient from keystoneauth1 import loading from keystoneauth1 import session import paramiko from scp import SCPClient +import heatclient.client as heatclient +from storperf.db.configuration_db import ConfigurationDB +from storperf.db.job_db import JobDB +from storperf.test_executor import TestExecutor + class ParameterError(Exception): """ """ @@ -256,6 +257,53 @@ class StorPerfMaster(object): 'workloads', str(self._test_executor.workload_modules)) + @property + def username(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'username' + ) + + @username.setter + def username(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'username', + value + ) + + @property + def password(self): + return self.configuration_db.get_configuration_value( + 'stack', + 'password' + ) + + @password.setter + def password(self, value): + self.configuration_db.set_configuration_value( + 'stack', + 'password', + value + ) + + def get_logs(self, lines=None): + LOG_DIR = './storperf.log' + + if isinstance(lines, int): + logs = [] + index = 0 + for line in reversed(open(LOG_DIR).readlines()): + if index != int(lines): + logs.insert(0, line.strip()) + index += 1 + else: + break + else: + with open(LOG_DIR) as f: + logs = f.read().split('\n') + return logs + def create_stack(self): if (self.stack_id is not None): raise ParameterError("ERROR: Stack has already been created") @@ -336,6 +384,9 @@ class StorPerfMaster(object): params['agent_count'] = self.agent_count params['public_network'] = self.public_network params['volume_size'] = self.volume_size + if self.username and self.password: + params['username'] = self.username + params['password'] = self.password job_id = self._test_executor.execute(params) return job_id @@ -345,7 +396,7 @@ class StorPerfMaster(object): def fetch_results(self, job_id): if self._test_executor.job_db.job_id == job_id: - return self._test_executor.metadata['metrics'] + return self._test_executor.metadata['details']['metrics'] workload_params = self.job_db.fetch_workload_params(job_id) if 'report' in workload_params: @@ -359,8 +410,25 @@ class StorPerfMaster(object): def fetch_job_status(self, job_id): return self._test_executor.execution_status(job_id) - def fetch_all_jobs(self): - return self.job_db.fetch_jobs() + def fetch_all_jobs(self, metrics_type): + job_list = self.job_db.fetch_jobs() + job_report = {} + if metrics_type is None: + job_report['job_ids'] = job_list + elif metrics_type == "metadata": + job_report['results'] = [] + for job in job_list: + if metrics_type == 'metadata': + metadata = self.fetch_metadata(job) + if 'report' in metadata: + metadata['report']['_id'] = job + metadata['report']['start_date'] = \ + metadata['report']['start_time'] + metadata['report']['end_date'] = \ + metadata['report']['end_time'] + metadata['report']['_id'] = job + job_report['results'].append(metadata['report']) + return job_report def _setup_slave(self, slave): logger = logging.getLogger(__name__ + ":" + slave) @@ -389,14 +457,50 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + if self.username and self.password: + ssh.connect(slave, + username=self.username, + password=self.password) + else: + ssh.connect(slave, username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is %s" % available) + if available < 65536: + logger.warn("Root filesystem is too small, attemping resize") + self._resize_root_fs(ssh, logger) + + available = self._check_root_fs(ssh) + logger.debug("Available space on / is now %s" % available) + if available < 65536: + logger.error("Cannot create enough space on /") + raise Exception("Root filesystem has only %s free" % + available) scp = SCPClient(ssh.get_transport()) logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + def _check_root_fs(self, ssh): + (_, stdout, _) = ssh.exec_command("df /") + stdout.readline() + lines = stdout.readline().split() + if len(lines) > 4: + available = lines[3] + return int(available) + + def _resize_root_fs(self, ssh, logger): + command = "sudo /usr/sbin/resize2fs /dev/vda1" + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + def _make_parameters(self): heat_parameters = {} heat_parameters['public_network'] = self.public_network diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index b2d5914..0e3fce0 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -15,14 +15,16 @@ from os import listdir import os from os.path import isfile, join import sched +from threading import Thread +from time import sleep +import time + from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate -from threading import Thread -import time class UnknownWorkload(Exception): @@ -37,7 +39,6 @@ class TestExecutor(object): self.filename = None self.deadline = None self.steady_state_samples = 10 - self.metadata = {} self.start_time = None self.end_time = None self.current_workload = None @@ -55,6 +56,27 @@ class TestExecutor(object): self._workload_executors = [] self._workload_thread = None self._thread_gate = None + self._setup_metadata({}) + + def _setup_metadata(self, metadata={}): + try: + installer = os.environ['INSTALLER_TYPE'] + except KeyError: + self.logger.error("Cannot determine installer") + installer = "Unknown_installer" + + self.metadata = {} + self.metadata['project_name'] = 'storperf' + self.metadata['installer'] = installer + self.metadata['pod_name'] = 'Unknown' + self.metadata['version'] = 'Unknown' + self.metadata['scenario'] = 'Unknown' + self.metadata['build_tag'] = 'Unknown' + self.metadata['test_case'] = 'Unknown' + self.metadata['details'] = {} + self.metadata['details']['metrics'] = {} + self.metadata.update(metadata) + self.metadata['case_name'] = self.metadata['test_case'] @property def slaves(self): @@ -98,7 +120,14 @@ class TestExecutor(object): metric, callback_id) - self.metrics_emitter.transmit_metrics(carbon_metrics) + self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id) + + commit_count = 10 + while (commit_count > 0 and + not self.metrics_emitter.confirm_commit(callback_id)): + self.logger.info("Waiting 1 more second for commit") + sleep(1) + commit_count -= 1 if self._thread_gate.report(callback_id): self.broadcast_event() @@ -162,8 +191,7 @@ class TestExecutor(object): def execute(self, metadata): self.job_db.create_job_id() self.job_db.record_workload_params(metadata) - self.metadata = metadata - self.metadata['metrics'] = {} + self._setup_metadata(metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") @@ -244,11 +272,11 @@ class TestExecutor(object): if self._terminated: return self.current_workload = ( - "%s.%s.queue-depth.%s.block-size.%s" % - (self.job_db.job_id, - workload_name, - iodepth, - blocksize)) + "%s.%s.queue-depth.%s.block-size.%s" + % (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) self.logger.info("Starting run %s" % self.current_workload) self.workload_status[self.current_workload] = "Running" @@ -287,11 +315,11 @@ class TestExecutor(object): if not scheduler.empty(): try: scheduler.cancel(event) - except: + except ValueError: pass - self.logger.info("Completed run %s" % - self.current_workload) + self.logger.info("Completed run %s" + % self.current_workload) self.workload_status[self.current_workload] = "Completed" self._workload_executors = [] self.current_workload = None @@ -304,15 +332,15 @@ class TestExecutor(object): self.end_time = time.time() self._terminated = True - report = {'report': json.dumps(self.metadata)} - self.job_db.record_workload_params(report) self.broadcast_event() self.unregister(data_handler.data_event) + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) self.job_db.job_id = None def execute_on_node(self, workload): - invoker = FIOInvoker() + invoker = FIOInvoker(self.metadata) invoker.register(self.event) workload.invoker = invoker diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py index 9d20383..471c295 100644 --- a/docker/storperf-master/storperf/utilities/data_handler.py +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -9,15 +9,14 @@ import logging import os +import time + from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB from storperf.db.job_db import JobDB from storperf.utilities import data_treatment as DataTreatment -from storperf.utilities import dictionary from storperf.utilities import math as math from storperf.utilities import steady_state as SteadyState -from time import sleep -import time class DataHandler(object): @@ -36,8 +35,6 @@ class DataHandler(object): self._push_to_db(executor) else: workload = '.'.join(executor.current_workload.split('.')[1:6]) - if 'metrics' not in executor.metadata: - executor.metadata['metrics'] = {} steady_state = True metrics = {} @@ -67,19 +64,21 @@ class DataHandler(object): metrics[metric][io_type]['average'] = average metrics_key = '%s.%s.%s' % (workload, io_type, metric) - executor.metadata['metrics'][metrics_key] = average + executor.metadata['details']['metrics'][metrics_key] = \ + average if not steady: steady_state = False - if 'report_data' not in executor.metadata: - executor.metadata['report_data'] = {} + if 'report_data' not in executor.metadata['details']: + executor.metadata['details']['report_data'] = {} - if 'steady_state' not in executor.metadata: - executor.metadata['steady_state'] = {} + if 'steady_state' not in executor.metadata['details']: + executor.metadata['details']['steady_state'] = {} - executor.metadata['report_data'][workload] = metrics - executor.metadata['steady_state'][workload] = steady_state + executor.metadata['details']['report_data'][workload] = metrics + executor.metadata['details']['steady_state'][workload] = \ + steady_state workload_name = executor.current_workload.split('.')[1] @@ -93,31 +92,12 @@ class DataHandler(object): # A bit of a hack here as Carbon might not be finished storing the # data we just sent to it now = int(time.time()) - backtime = 60 * (executor.steady_state_samples + 2) + backtime = 60 * (executor.steady_state_samples + 1) data_series = graphite_db.fetch_series(workload, metric, io_type, now, backtime) - most_recent_time = now - if len(data_series) > 0: - most_recent_time = data_series[-1][0] - - delta = now - most_recent_time - self.logger.debug("Last update to graphite was %s ago" % delta) - - while (delta < 5 or (delta > 60 and delta < 120)): - sleep(5) - data_series = graphite_db.fetch_series(workload, - metric, - io_type, - now, - backtime) - if len(data_series) > 0: - most_recent_time = data_series[-1][0] - delta = time.time() - most_recent_time - self.logger.debug("Last update to graphite was %s ago" % delta) - return data_series def _convert_timestamps_to_samples(self, executor, series): @@ -147,59 +127,36 @@ class DataHandler(object): return SteadyState.steady_state(data_series) def _push_to_db(self, executor): - pod_name = dictionary.get_key_from_dict(executor.metadata, - 'pod_name', - 'Unknown') - version = dictionary.get_key_from_dict(executor.metadata, - 'version', - 'Unknown') - scenario = dictionary.get_key_from_dict(executor.metadata, - 'scenario_name', - 'Unknown') - build_tag = dictionary.get_key_from_dict(executor.metadata, - 'build_tag', - 'Unknown') - test_case = dictionary.get_key_from_dict(executor.metadata, - 'test_case', - 'Unknown') - duration = executor.end_time - executor.start_time - - payload = executor.metadata + executor.metadata['duration'] = executor.end_time - executor.start_time steady_state = True - for _, value in executor.metadata['steady_state'].items(): + for _, value in executor.metadata['details']['steady_state'].items(): steady_state = steady_state and value - payload['timestart'] = executor.start_time - payload['duration'] = duration + executor.metadata['timestart'] = executor.start_time if steady_state: criteria = 'PASS' else: criteria = 'FAIL' + executor.metadata['criteria'] = criteria - start_time = time.strftime('%Y-%m-%d %H:%M:%S', - time.gmtime(executor.start_time)) + executor.metadata['start_time'] = \ + time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.start_time)) - end_time = time.strftime('%Y-%m-%d %H:%M:%S', - time.gmtime(executor.end_time)) + executor.metadata['end_time'] = \ + time.strftime('%Y-%m-%d %H:%M:%S', + time.gmtime(executor.end_time)) test_db = os.environ.get('TEST_DB_URL') if test_db is not None: self.logger.info("Pushing results to %s" % (test_db)) try: - response = test_results_db.push_results_to_db(test_db, - "storperf", - test_case, - start_time, - end_time, - self.logger, - pod_name, - version, - scenario, - criteria, - build_tag, - payload) + response = test_results_db.push_results_to_db( + test_db, + executor.metadata, + self.logger) executor.result_url = response['href'] - except: + except Exception: self.logger.exception("Error pushing results into Database") diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py index 936c839..c045278 100644 --- a/docker/storperf-master/storperf/workloads/_base_workload.py +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -74,9 +74,9 @@ class _base_workload(object): @property def fullname(self): - return ("%s.%s.queue-depth.%s.block-size.%s.%s" % - (str(self.id), - self.__class__.__name__, - str(self.options['iodepth']), - str(self.options['bs']), - str(self.remote_host).replace(".", "-"))) + return ("%s.%s.queue-depth.%s.block-size.%s.%s" + % (str(self.id), + self.__class__.__name__, + str(self.options['iodepth']), + str(self.options['bs']), + str(self.remote_host).replace(".", "-"))) diff --git a/docker/storperf-master/supervisord.conf b/docker/storperf-master/supervisord.conf deleted file mode 100644 index 558f6bf..0000000 --- a/docker/storperf-master/supervisord.conf +++ /dev/null @@ -1,33 +0,0 @@ -[supervisord] -nodaemon = true -environment = GRAPHITE_STORAGE_DIR='/var/lib/graphite/storage',GRAPHITE_CONF_DIR='/var/lib/graphite/conf' - -[program:nginx] -command = /usr/sbin/nginx -stdout_logfile = /var/log/supervisor/%(program_name)s.log -stderr_logfile = /var/log/supervisor/%(program_name)s.log -autorestart = true - -[program:carbon-cache] -user = www-data -command = /var/lib/graphite/bin/carbon-cache.py --debug start -stdout_logfile = /var/log/supervisor/%(program_name)s.log -stderr_logfile = /var/log/supervisor/%(program_name)s.log -autorestart = true - -[program:graphite-webapp] -user = www-data -directory = /var/lib/graphite/webapp -environment = PYTHONPATH='/var/lib/graphite/webapp' -command = /usr/bin/gunicorn_django -b127.0.0.1:8080 -w2 graphite/settings.py -stdout_logfile = /var/log/supervisor/%(program_name)s.log -stderr_logfile = /var/log/supervisor/%(program_name)s.log -autorestart = true - -[program:storperf-webapp] -user = root -directory = /storperf/ -command = /usr/bin/python rest_server.py -stdout_logfile = /var/log/supervisor/%(program_name)s.log -stderr_logfile = /var/log/supervisor/%(program_name)s.log -autorestart = true diff --git a/docker/storperf-master/tests/carbon_tests/emitter_test.py b/docker/storperf-master/tests/carbon_tests/emitter_test.py index 7f61049..f5a78d1 100644 --- a/docker/storperf-master/tests/carbon_tests/emitter_test.py +++ b/docker/storperf-master/tests/carbon_tests/emitter_test.py @@ -7,27 +7,39 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import SocketServer import json -from storperf.carbon import converter -from storperf.carbon.emitter import CarbonMetricTransmitter -import threading -from time import sleep, strptime +from time import strptime import unittest import mock +from storperf.carbon import converter +from storperf.carbon.emitter import CarbonMetricTransmitter + + +addresses = [] +data = [] +connect_exception = [] +send_exception = [] + + +class MockSocket(object): -class MetricsHandler(SocketServer.BaseRequestHandler): + def __init__(self, *args): + pass - def handle(self): - # Echo the back to the client - CarbonMetricTransmitterTest.response = self.request.recv(1024) - return + def connect(self, address): + if len(connect_exception) != 0: + raise connect_exception[0] + addresses.append(address) + def send(self, datum): + if len(send_exception) != 0: + raise send_exception[0] + data.append(datum) -class MetricsServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - pass + def close(self): + pass class CarbonMetricTransmitterTest(unittest.TestCase): @@ -35,17 +47,37 @@ class CarbonMetricTransmitterTest(unittest.TestCase): response = None def setUp(self): + del addresses[:] + del data[:] + del connect_exception[:] + del send_exception[:] + + @mock.patch("socket.socket") + @mock.patch("time.gmtime") + def test_transmit_metrics(self, mock_time, mock_socket): + + mock_socket.side_effect = MockSocket + + mock_time.return_value = strptime("30 Nov 00", "%d %b %y") + + testconv = converter.Converter() + json_object = json.loads( + """{"timestamp" : "975542400", "key":123.0 }""") + result = testconv.convert_json_to_flat(json_object, "host.run-name") - address = ('localhost', 0) - server = MetricsServer(address, MetricsHandler) - ip, self.listen_port = server.server_address + emitter = CarbonMetricTransmitter() + emitter.carbon_port = self.listen_port + emitter.transmit_metrics(result, None) - t = threading.Thread(target=server.serve_forever) - t.setDaemon(True) - t.start() + self.assertEqual("host.run-name.key 123.0 975542400\n", + data[1], + data[1]) + @mock.patch("socket.socket") @mock.patch("time.gmtime") - def test_transmit_metrics(self, mock_time): + def test_skip_non_numeric_metrics(self, mock_time, mock_socket): + + mock_socket.side_effect = MockSocket mock_time.return_value = strptime("30 Nov 00", "%d %b %y") @@ -56,17 +88,126 @@ class CarbonMetricTransmitterTest(unittest.TestCase): emitter = CarbonMetricTransmitter() emitter.carbon_port = self.listen_port - emitter.transmit_metrics(result) + emitter.transmit_metrics(result, None) + + self.assertEqual("None.commit-marker 975542400 975542400\n", + data[1], + data[1]) + + @mock.patch("socket.socket") + def test_connect_fails(self, mock_socket): + + mock_socket.side_effect = MockSocket + connect_exception.append(Exception("Mock connection error")) + + testconv = converter.Converter() + json_object = json.loads( + """{"timestamp" : "975542400", "key":"value" }""") + result = testconv.convert_json_to_flat(json_object, "host.run-name") + + emitter = CarbonMetricTransmitter() + emitter.carbon_port = self.listen_port + emitter.transmit_metrics(result, None) + + self.assertEqual(0, + len(data), + len(data)) - count = 0 + @mock.patch("socket.socket") + def test_send_fails(self, mock_socket): - while (CarbonMetricTransmitterTest.response is None and count < 10): - count += 1 - sleep(0.1) + mock_socket.side_effect = MockSocket + send_exception.append(Exception("Mock send error")) + + testconv = converter.Converter() + json_object = json.loads( + """{"timestamp" : "975542400", "key":"value" }""") + result = testconv.convert_json_to_flat(json_object, "host.run-name") + + emitter = CarbonMetricTransmitter() + emitter.carbon_port = self.listen_port + emitter.transmit_metrics(result, None) + + self.assertEqual(0, + len(data), + len(data)) + + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item") + def test_confirm_commit(self, mock_graphite_db): + graphite_return = json.loads("""[ + {"target": + "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker", + "datapoints": [[1503078366.0, 1503078370]]}] + """) + mock_graphite_db.return_value = graphite_return + + commit_marker = "commit-marker" + + emitter = CarbonMetricTransmitter() + emitter.commit_markers[commit_marker] = 1503078366 + + committed = emitter.confirm_commit(commit_marker) + self.assertTrue(committed) + + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item") + def test_confirm_multiple_commits(self, mock_graphite_db): + graphite_return = json.loads("""[ + {"target": + "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker", + "datapoints": [ + [1503078300.0, 1503078350], + [1503078366.0, 1503078360]]}] + """) + mock_graphite_db.return_value = graphite_return + + commit_marker = "commit-marker" + + emitter = CarbonMetricTransmitter() + emitter.commit_markers[commit_marker] = 1503078366 + + committed = emitter.confirm_commit(commit_marker) + self.assertTrue(committed) + + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item") + def test_empty_commit(self, mock_graphite_db): + graphite_return = json.loads("[]") + mock_graphite_db.return_value = graphite_return + + commit_marker = "commit-marker" + + emitter = CarbonMetricTransmitter() + emitter.commit_markers[commit_marker] = 1503078366 + + committed = emitter.confirm_commit(commit_marker) + self.assertFalse(committed) + + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item") + def test_badtimestamp_commit(self, mock_graphite_db): + graphite_return = json.loads("""[ + {"target": + "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker", + "datapoints": [[1234, 1503078370]]}] + """) + mock_graphite_db.return_value = graphite_return + + commit_marker = "commit-marker" + + emitter = CarbonMetricTransmitter() + emitter.commit_markers[commit_marker] = 1503078366 + + committed = emitter.confirm_commit(commit_marker) + self.assertFalse(committed) + + def test_timestamp_parse(self): + emitter = CarbonMetricTransmitter() + result = json.loads("""[ + {"target": + "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker", + "datapoints": [[1503078366.0, 1503078370]]}] + """) + timestamps = emitter.parse_timestamp(result) + self.assertEqual(1503078366, timestamps[0], timestamps[0]) - self.assertEqual("host.run-name.key value 975542400\n", - CarbonMetricTransmitterTest.response, - CarbonMetricTransmitterTest.response) if __name__ == '__main__': unittest.main() diff --git a/docker/storperf-master/tests/utilities_tests/data_handler_test.py b/docker/storperf-master/tests/utilities_tests/data_handler_test.py index f514ae7..f028bd0 100644 --- a/docker/storperf-master/tests/utilities_tests/data_handler_test.py +++ b/docker/storperf-master/tests/utilities_tests/data_handler_test.py @@ -8,11 +8,12 @@ ############################################################################## import os -from storperf.utilities.data_handler import DataHandler import unittest import mock +from storperf.utilities.data_handler import DataHandler + class MockGraphiteDB(object): @@ -34,6 +35,8 @@ class DataHandlerTest(unittest.TestCase): self.steady_state_samples = 10 self.end_time = 1 self.metadata = {} + self.metadata['details'] = {} + self.metadata['details']['metrics'] = {} self.block_sizes = "1" self.queue_depths = "1" mock.job_id = "1" @@ -76,11 +79,11 @@ class DataHandlerTest(unittest.TestCase): mock_graphite_db.return_value = expected mock_time.return_value = expected[-1][0] + 10 - self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % - ("job_id", - "rw", - 8, - 8192)) + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" + % ("job_id", + "rw", + 8, + 8192)) actual = self.data_handler._lookup_prior_data(self, 'read', 'iops') self.assertEqual(expected, actual) @@ -133,7 +136,7 @@ class DataHandlerTest(unittest.TestCase): self._terminated = True mock_results_db.side_effect = self.push_results_to_db mock_graphite_db.side_effect = MockGraphiteDB - self.metadata = { + self.metadata['details'] = { "steady_state": { "rr.queue-depth.8.block-size.16384": True, "rr.queue-depth.8.block-size.2048": False, @@ -148,8 +151,9 @@ class DataHandlerTest(unittest.TestCase): @mock.patch("time.time") @mock.patch("storperf.db.test_results_db.push_results_to_db") @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") - @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads") - def test_non_terminated_report(self, mock_job_db, mock_graphite_db, + @mock.patch("storperf.db.job_db.JobDB.fetch_workloads") + def test_non_terminated_report(self, mock_job_db, + mock_graphite_db, mock_results_db, mock_time): self._terminated = False mock_results_db.side_effect = self.push_results_to_db @@ -169,29 +173,31 @@ class DataHandlerTest(unittest.TestCase): expected_range = 17.78 expected_average = 212.49777777777774 - self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % - ("job_id", - "rw", - 8, - 8192)) - + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" + % ("job_id", + "rw", + 8, + 8192)) mock_job_db.return_value = [[self.current_workload, 4804559000, None]] self.data_handler.data_event(self) self.assertEqual(False, self.pushed) self.assertEqual(False, self._terminated) - self.assertEqual(expected_slope, self.metadata['report_data'] + self.assertEqual(expected_slope, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] ['slope']) - self.assertEqual(expected_range, self.metadata['report_data'] + self.assertEqual(expected_range, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] ['range']) - self.assertEqual(expected_average, self.metadata['report_data'] + self.assertEqual(expected_average, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] @@ -201,7 +207,7 @@ class DataHandlerTest(unittest.TestCase): @mock.patch("time.time") @mock.patch("storperf.db.test_results_db.push_results_to_db") @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") - @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads") + @mock.patch("storperf.db.job_db.JobDB.fetch_workloads") def test_report_that_causes_termination(self, mock_job_db, mock_graphite_db, @@ -236,32 +242,35 @@ class DataHandlerTest(unittest.TestCase): expected_range = 17.78 expected_average = 209.2135 - self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % - ("job_id", - "rw", - 8, - 8192)) - + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" + % ("job_id", + "rw", + 8, + 8192)) mock_job_db.return_value = [[self.current_workload, 4804559000, None]] self.data_handler.data_event(self) - self.assertEqual(expected_slope, self.metadata['report_data'] + self.assertEqual(expected_slope, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] ['slope']) - self.assertEqual(expected_range, self.metadata['report_data'] + self.assertEqual(expected_range, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] ['range']) - self.assertEqual(expected_average, self.metadata['report_data'] + self.assertEqual(expected_average, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] ['average']) - self.assertEqual(report_data, self.metadata['report_data'] + self.assertEqual(report_data, self.metadata['details'] + ['report_data'] ['rw.queue-depth.8.block-size.8192'] ['lat_ns.mean'] ['read'] @@ -277,7 +286,7 @@ class DataHandlerTest(unittest.TestCase): mock_results_db.side_effect = self.push_results_to_db self.start_time = 1504559100 self.end_time = 1504560000 - self.metadata = { + self.metadata['details'] = { "scenario_name": "ceph_ws,wr,rs,rr,rw", "status": "OK", "steady_state": { @@ -289,9 +298,11 @@ class DataHandlerTest(unittest.TestCase): "volume_size": 10 } self.data_handler._push_to_db(self) - self.assertEqual('FAIL', self.db_results[9], + self.assertEqual('FAIL', self.db_results[1]['criteria'], 'Expected FAIL in criteria') - self.assertEqual('2017-09-04 21:05:00', self.db_results[3], + self.assertEqual('2017-09-04 21:05:00', + self.db_results[1]['start_time'], 'Start time') - self.assertEqual('2017-09-04 21:20:00', self.db_results[4], + self.assertEqual('2017-09-04 21:20:00', + self.db_results[1]['end_time'], 'End time') |