Diffstat (limited to 'docker/storperf-master')
-rw-r--r--  docker/storperf-master/Dockerfile                                  |  60
-rw-r--r--  docker/storperf-master/graphite/carbon.conf                        |  80
-rw-r--r--  docker/storperf-master/graphite/initial_data.json                  |  20
-rw-r--r--  docker/storperf-master/graphite/local_settings.py                  |   1
-rw-r--r--  docker/storperf-master/graphite/nginx.conf                         |  69
-rw-r--r--  docker/storperf-master/graphite/storage-schemas.conf               |   7
-rw-r--r--  docker/storperf-master/rest_server.py                              |  54
-rw-r--r--  docker/storperf-master/storperf/carbon/emitter.py                  |  75
-rw-r--r--  docker/storperf-master/storperf/db/graphite_db.py                  |  31
-rw-r--r--  docker/storperf-master/storperf/db/test_results_db.py              |  37
-rw-r--r--  docker/storperf-master/storperf/fio/fio_invoker.py                 |  29
-rw-r--r--  docker/storperf-master/storperf/storperf_master.py                 | 124
-rw-r--r--  docker/storperf-master/storperf/test_executor.py                   |  62
-rw-r--r--  docker/storperf-master/storperf/utilities/data_handler.py          |  97
-rw-r--r--  docker/storperf-master/storperf/workloads/_base_workload.py        |  12
-rw-r--r--  docker/storperf-master/supervisord.conf                            |  33
-rw-r--r--  docker/storperf-master/tests/carbon_tests/emitter_test.py          | 195
-rw-r--r--  docker/storperf-master/tests/utilities_tests/data_handler_test.py  |  77
18 files changed, 564 insertions, 499 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile
index b19c12d..eaaf811 100644
--- a/docker/storperf-master/Dockerfile
+++ b/docker/storperf-master/Dockerfile
@@ -15,7 +15,9 @@
# $ docker build -t opnfv/storperf-master:tag .
#
-FROM alpine:3.5 as storperf-builder
+ARG ARCH=x86_64
+ARG ALPINE_VERSION=v3.6
+FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder
LABEL version="5.0" description="OPNFV Storperf Docker container"
@@ -57,7 +59,7 @@ RUN pip install -r /storperf/requirements.pip
# Build stripped down StorPerf image
-FROM alpine:3.5 as storperf-master
+FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-master
RUN apk --no-cache add --update \
python \
@@ -74,57 +76,5 @@ RUN chmod 600 storperf/resources/ssh/storperf_rsa
EXPOSE 5000
-# Install Graphite
-# Everything from here down will be removed once Graphite/Carbon gets broken
-# out into its own container.
-
-RUN apk --no-cache add --update \
- python \
- py-pip \
- python-dev \
- alpine-sdk \
- py-tz \
- nginx \
- cairo \
- supervisor
-
-RUN deluser xfs
-
-RUN pip install \
- gunicorn==17.5 \
- Django==1.6.11 \
- django-tagging==0.3.1 \
- cairocffi \
- constants \
- zope.interface
-
-RUN adduser -S -g www-data -u 33 www-data
-
-RUN pip install whisper==0.9.15
-RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/lib" carbon==0.9.15
-RUN pip install --install-option="--prefix=/var/lib/graphite" --install-option="--install-lib=/var/lib/graphite/webapp" graphite-web==0.9.15
-
-ADD graphite/nginx.conf /etc/nginx/nginx.conf
-ADD graphite/local_settings.py /var/lib/graphite/webapp/graphite/local_settings.py
-ADD graphite/carbon.conf /var/lib/graphite/conf/carbon.conf
-ADD graphite/storage-schemas.conf /var/lib/graphite/conf/storage-schemas.conf
-RUN mkdir -p /opt/graphite/storage
-RUN ln -s /var/lib/graphite/storage/whisper /opt/graphite/storage/whisper
-RUN touch /var/lib/graphite/storage/graphite.db /var/lib/graphite/storage/index
-RUN chown -R www-data /var/lib/graphite/storage
-RUN chmod 0775 /var/lib/graphite/storage /var/lib/graphite/storage/whisper
-RUN chmod 0664 /var/lib/graphite/storage/graphite.db
-
-RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput
-ADD graphite/initial_data.json /var/lib/graphite/webapp/graphite/initial_data.json
-RUN cd /var/lib/graphite/webapp/graphite && python manage.py syncdb --noinput
-
-RUN mkdir -p /var/log/supervisor
-
-COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf
-
-EXPOSE 8000
-
# Entry point
-
-CMD ["/usr/bin/supervisord"]
+CMD [ "python", "./rest_server.py" ]
diff --git a/docker/storperf-master/graphite/carbon.conf b/docker/storperf-master/graphite/carbon.conf
deleted file mode 100644
index 13088dd..0000000
--- a/docker/storperf-master/graphite/carbon.conf
+++ /dev/null
@@ -1,80 +0,0 @@
-[cache]
-LOCAL_DATA_DIR = /var/lib/graphite/storage/whisper/
-
-# Specify the user to drop privileges to
-# If this is blank carbon runs as the user that invokes it
-# This user must have write access to the local data directory
-USER =
-
-# Limit the size of the cache to avoid swapping or becoming CPU bound.
-# Sorts and serving cache queries gets more expensive as the cache grows.
-# Use the value "inf" (infinity) for an unlimited cache size.
-MAX_CACHE_SIZE = inf
-
-# Limits the number of whisper update_many() calls per second, which effectively
-# means the number of write requests sent to the disk. This is intended to
-# prevent over-utilizing the disk and thus starving the rest of the system.
-# When the rate of required updates exceeds this, then carbon's caching will
-# take effect and increase the overall throughput accordingly.
-MAX_UPDATES_PER_SECOND = 1000
-
-# Softly limits the number of whisper files that get created each minute.
-# Setting this value low (like at 50) is a good way to ensure your graphite
-# system will not be adversely impacted when a bunch of new metrics are
-# sent to it. The trade off is that it will take much longer for those metrics'
-# database files to all get created and thus longer until the data becomes usable.
-# Setting this value high (like "inf" for infinity) will cause graphite to create
-# the files quickly but at the risk of slowing I/O down considerably for a while.
-MAX_CREATES_PER_MINUTE = inf
-
-LINE_RECEIVER_INTERFACE = 0.0.0.0
-LINE_RECEIVER_PORT = 2003
-
-ENABLE_UDP_LISTENER = True
-UDP_RECEIVER_INTERFACE = 0.0.0.0
-UDP_RECEIVER_PORT = 2003
-
-PICKLE_RECEIVER_INTERFACE = 0.0.0.0
-PICKLE_RECEIVER_PORT = 2004
-
-CACHE_QUERY_INTERFACE = 0.0.0.0
-CACHE_QUERY_PORT = 7002
-
-LOG_UPDATES = False
-
-# Enable AMQP if you want to receive metrics using an amqp broker
-# ENABLE_AMQP = False
-
-# Verbose means a line will be logged for every metric received
-# useful for testing
-# AMQP_VERBOSE = False
-
-# AMQP_HOST = localhost
-# AMQP_PORT = 5672
-# AMQP_VHOST = /
-# AMQP_USER = guest
-# AMQP_PASSWORD = guest
-# AMQP_EXCHANGE = graphite
-
-# Patterns for all of the metrics this machine will store. Read more at
-# http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol#Bindings
-#
-# Example: store all sales, linux servers, and utilization metrics
-# BIND_PATTERNS = sales.#, servers.linux.#, #.utilization
-#
-# Example: store everything
-# BIND_PATTERNS = #
-
-# NOTE: you cannot run both a cache and a relay on the same server
-# with the default configuration; you have to specify distinct
-# interfaces and ports for the listeners.
-
-[relay]
-LINE_RECEIVER_INTERFACE = 0.0.0.0
-LINE_RECEIVER_PORT = 2003
-
-PICKLE_RECEIVER_INTERFACE = 0.0.0.0
-PICKLE_RECEIVER_PORT = 2004
-
-CACHE_SERVERS = server1, server2, server3
-MAX_QUEUE_SIZE = 10000
diff --git a/docker/storperf-master/graphite/initial_data.json b/docker/storperf-master/graphite/initial_data.json
deleted file mode 100644
index b3ac9b1..0000000
--- a/docker/storperf-master/graphite/initial_data.json
+++ /dev/null
@@ -1,20 +0,0 @@
-[
- {
- "pk": 1,
- "model": "auth.user",
- "fields": {
- "username": "admin",
- "first_name": "",
- "last_name": "",
- "is_active": true,
- "is_superuser": true,
- "is_staff": true,
- "last_login": "2011-09-20 17:02:14",
- "groups": [],
- "user_permissions": [],
- "password": "sha1$1b11b$edeb0a67a9622f1f2cfeabf9188a711f5ac7d236",
- "email": "root@example.com",
- "date_joined": "2011-09-20 17:02:14"
- }
- }
-]
diff --git a/docker/storperf-master/graphite/local_settings.py b/docker/storperf-master/graphite/local_settings.py
deleted file mode 100644
index 7cff8f7..0000000
--- a/docker/storperf-master/graphite/local_settings.py
+++ /dev/null
@@ -1 +0,0 @@
-TIME_ZONE = 'UTC'
diff --git a/docker/storperf-master/graphite/nginx.conf b/docker/storperf-master/graphite/nginx.conf
deleted file mode 100644
index 8a11e94..0000000
--- a/docker/storperf-master/graphite/nginx.conf
+++ /dev/null
@@ -1,69 +0,0 @@
-daemon off;
-user www-data;
-worker_processes 1;
-pid /var/run/nginx.pid;
-
-events {
- worker_connections 1024;
-}
-
-http {
- sendfile on;
- tcp_nopush on;
- tcp_nodelay on;
- keepalive_timeout 65;
- types_hash_max_size 2048;
- server_tokens off;
-
- server_names_hash_bucket_size 32;
-
- include /etc/nginx/mime.types;
- default_type application/octet-stream;
-
- access_log /var/log/nginx/access.log;
- error_log /var/log/nginx/error.log;
-
- gzip on;
- gzip_disable "msie6";
-
- server {
- listen 8000 default_server;
- server_name _;
-
- open_log_file_cache max=1000 inactive=20s min_uses=2 valid=1m;
-
- location / {
- proxy_pass http://127.0.0.1:8080;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-Forwarded-Proto $scheme;
- proxy_set_header X-Forwarded-Server $host;
- proxy_set_header X-Forwarded-Host $http_host;
- proxy_set_header Host $http_host;
-
- client_max_body_size 10m;
- client_body_buffer_size 128k;
-
- proxy_connect_timeout 90;
- proxy_send_timeout 90;
- proxy_read_timeout 90;
-
- proxy_buffer_size 4k;
- proxy_buffers 4 32k;
- proxy_busy_buffers_size 64k;
- proxy_temp_file_write_size 64k;
- }
-
- add_header Access-Control-Allow-Origin "*";
- add_header Access-Control-Allow-Methods "GET, OPTIONS";
- add_header Access-Control-Allow-Headers "origin, authorization, accept";
-
- location /content {
- alias /var/lib/graphite/webapp/content;
- }
-
- location /media {
- alias /usr/share/pyshared/django/contrib/admin/media;
- }
- }
-}
diff --git a/docker/storperf-master/graphite/storage-schemas.conf b/docker/storperf-master/graphite/storage-schemas.conf
deleted file mode 100644
index 855a9e4..0000000
--- a/docker/storperf-master/graphite/storage-schemas.conf
+++ /dev/null
@@ -1,7 +0,0 @@
-[carbon]
-pattern = ^carbon\..*
-retentions = 1m:31d,10m:1y,1h:5y
-
-[default]
-pattern = .*
-retentions = 10s:8d,1m:31d,10m:1y,1h:5y
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py
index 19f87ca..0634b8f 100644
--- a/docker/storperf-master/rest_server.py
+++ b/docker/storperf-master/rest_server.py
@@ -13,12 +13,13 @@ import os
import sys
from flask import abort, Flask, request, jsonify
+from flask_cors import CORS
from flask_restful import Resource, Api, fields
from flask_restful_swagger import swagger
-from flask_cors import CORS
from storperf.storperf_master import StorPerfMaster
+
app = Flask(__name__, static_url_path="")
CORS(app)
api = swagger.docs(Api(app), apiVersion='1.0')
@@ -26,6 +27,35 @@ api = swagger.docs(Api(app), apiVersion='1.0')
storperf = StorPerfMaster()
+class Logs(Resource):
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ @swagger.operation(
+ notes="Fetch logs",
+ parameters=[
+ {
+ "name": "lines",
+ "description": "The number of lines to fetch",
+ "required": "False",
+ "type": "string",
+ "allowedMultiple": "False",
+ "paramType": "query"
+ }
+ ]
+ )
+ def get(self):
+ lines = request.args.get('lines')
+ if lines:
+ try:
+ lines = int(lines)
+ except Exception:
+ pass
+ else:
+ lines = 35
+ return jsonify({'logs': storperf.get_logs(lines)})
+
+
@swagger.model
class ConfigurationRequestModel:
resource_fields = {
@@ -34,7 +64,9 @@ class ConfigurationRequestModel:
'agent_image': fields.String,
'public_network': fields.String,
'volume_size': fields.Integer,
- 'availability_zone': fields.String
+ 'availability_zone': fields.String,
+ 'username': fields.String,
+ 'password': fields.String
}
@@ -107,6 +139,10 @@ class Configure(Resource):
storperf.volume_size = request.json['volume_size']
if ('availability_zone' in request.json):
storperf.availabilty_zone = request.json['availability_zone']
+ if ('username' in request.json):
+ storperf.username = request.json['username']
+ if ('password' in request.json):
+ storperf.password = request.json['password']
storperf.create_stack()
if storperf.stack_id is None:
@@ -194,13 +230,13 @@ class Job(Resource):
)
def get(self):
- metrics_type = "metrics"
- if request.args.get('type'):
- metrics_type = request.args.get('type')
-
workload_id = request.args.get('id')
if workload_id:
+ metrics_type = "metrics"
+ if request.args.get('type'):
+ metrics_type = request.args.get('type')
+
if metrics_type == "metrics":
return jsonify(storperf.fetch_results(workload_id))
@@ -210,7 +246,10 @@ class Job(Resource):
if metrics_type == "status":
return jsonify(storperf.fetch_job_status(workload_id))
else:
- return jsonify({"job_ids": storperf.fetch_all_jobs()})
+ metrics_type = None
+ if request.args.get('type'):
+ metrics_type = request.args.get('type')
+ return jsonify(storperf.fetch_all_jobs(metrics_type))
@swagger.operation(
parameters=[
@@ -343,6 +382,7 @@ def setup_logging(default_path='logging.json',
api.add_resource(Configure, "/api/v1.0/configurations")
api.add_resource(Quota, "/api/v1.0/quotas")
api.add_resource(Job, "/api/v1.0/jobs")
+api.add_resource(Logs, "/api/v1.0/logs")
if __name__ == "__main__":
setup_logging()
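
The rest_server.py changes above add a Logs resource, optional stack credentials in the configuration model, and a type-aware job listing. A minimal sketch of exercising the new endpoints, assuming a StorPerf master is reachable on localhost:5000 (the port exposed by the Dockerfile); the host and the line count are placeholders:

import requests

base = "http://localhost:5000/api/v1.0"

# New Logs resource: fetch the most recent 20 log lines (defaults to 35)
logs = requests.get(base + "/logs", params={"lines": 20}).json()
print(logs["logs"])

# Job listing: without a type the response is {"job_ids": [...]};
# with type=metadata each job's stored report is returned under "results"
job_ids = requests.get(base + "/jobs").json()
reports = requests.get(base + "/jobs", params={"type": "metadata"}).json()
print(job_ids, reports)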
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
index e23dc79..05f6c3c 100644
--- a/docker/storperf-master/storperf/carbon/emitter.py
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -11,28 +11,75 @@ import logging
import socket
import time
+from storperf.db.graphite_db import GraphiteDB
+
class CarbonMetricTransmitter():
- carbon_host = '127.0.0.1'
- carbon_port = 2003
+ carbon_servers = [('127.0.0.1', 2003),
+ ('storperf-graphite', 2003)]
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.graphite_db = GraphiteDB()
+ self.commit_markers = {}
- def transmit_metrics(self, metrics):
- if 'timestamp' in metrics:
- metrics.pop('timestamp')
+ def transmit_metrics(self, metrics, commit_marker):
timestamp = str(calendar.timegm(time.gmtime()))
+ self.commit_markers[commit_marker] = int(timestamp)
+
+ carbon_socket = None
+
+ for host, port in self.carbon_servers:
+ try:
+ carbon_socket = socket.socket(socket.AF_INET,
+ socket.SOCK_STREAM)
+ carbon_socket.connect((host, port))
+
+ for key, value in metrics.items():
+ try:
+ float(value)
+ message = "%s %s %s\n" \
+ % (key, value, timestamp)
+ self.logger.debug("Metric: " + message.strip())
+ carbon_socket.send(message)
+ except ValueError:
+ self.logger.debug("Ignoring non numeric metric %s %s"
+ % (key, value))
+
+ message = "%s.commit-marker %s %s\n" \
+ % (commit_marker, timestamp, timestamp)
+ carbon_socket.send(message)
+ self.logger.debug("Marker %s" % message.strip())
+ self.logger.info("Sent metrics to %s:%s with timestamp %s"
+ % (host, port, timestamp))
+
+ except Exception, e:
+ self.logger.error("While notifying carbon %s:%s %s"
+ % (host, port, e))
+
+ if carbon_socket is not None:
+ carbon_socket.close()
+
+ def confirm_commit(self, commit_marker):
+ marker_timestamp = self.commit_markers[commit_marker]
+ request = "%s.commit-marker&from=%s" \
+ % (commit_marker, marker_timestamp - 60)
+ marker_data = self.graphite_db.fetch_item(request)
+ self.logger.debug("Marker data %s" % marker_data)
+ fetched_timestamps = self.parse_timestamp(marker_data)
- carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- carbon_socket.connect((self.carbon_host, self.carbon_port))
+ return marker_timestamp in fetched_timestamps
- for key, metric in metrics.items():
- message = key + " " + metric + " " + timestamp
- self.logger.debug("Metric: " + message)
- carbon_socket.send(message + '\n')
+ def parse_timestamp(self, marker_data):
+ timestamps = []
+ if (type(marker_data) is list and
+ len(marker_data) > 0):
+ datapoints = marker_data[0]['datapoints']
+ for datapoint in datapoints:
+ try:
+ timestamps.append(int(datapoint[0]))
+ except Exception:
+ pass
- carbon_socket.close()
- self.logger.info("Sent metrics to carbon with timestamp %s"
- % timestamp)
+ return timestamps
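
The emitter now writes a per-run commit marker alongside the metrics and can check, through the Graphite render API, that Carbon has stored them. A short sketch of that handshake, assuming a Carbon line receiver and the storperf-graphite render endpoint are reachable; the marker string and metric value are illustrative only:

from time import sleep

from storperf.carbon.emitter import CarbonMetricTransmitter

emitter = CarbonMetricTransmitter()
marker = "job-id.rw.queue-depth.8.block-size.8192"  # hypothetical callback id
metrics = {"job-id.rw.jobs.1.read.iops": "185.4"}   # illustrative metric

# Send the metrics plus a "<marker>.commit-marker" datapoint
emitter.transmit_metrics(metrics, marker)

# Poll Graphite until the marker timestamp is visible, as test_executor does
retries = 10
while retries > 0 and not emitter.confirm_commit(marker):
    sleep(1)
    retries -= 1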
diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py
index c8a2d35..aa71855 100644
--- a/docker/storperf-master/storperf/db/graphite_db.py
+++ b/docker/storperf-master/storperf/db/graphite_db.py
@@ -9,33 +9,44 @@
import json
import logging
-
import requests
-from storperf.db.job_db import JobDB
-
class GraphiteDB(object):
+ graphite_host = "storperf-graphite"
+ graphite_port = 8080
+
def __init__(self):
- """
- """
- self._job_db = JobDB()
self.logger = logging.getLogger(__name__)
+ def fetch_item(self, target):
+
+ result = None
+ request = ("http://%s:%s/graphite/render/?format=json&target=%s"
+ % (self.graphite_host, self.graphite_port, target))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ result = json.loads(response.content)
+
+ return result
+
def fetch_series(self, workload, metric, io_type, time, duration):
series = []
end = time
start = end - duration
- request = ("http://127.0.0.1:8000/render/?target="
+ request = ("http://%s:%s/graphite/render/?target="
"averageSeries(%s.*.jobs.1.%s.%s)"
"&format=json"
"&from=%s"
- "&until=%s" %
- (workload, io_type, metric,
- start, end))
+ "&until=%s"
+ % (self.graphite_host, self.graphite_port,
+ workload, io_type, metric,
+ start, end))
self.logger.debug("Calling %s" % (request))
response = requests.get(request)
diff --git a/docker/storperf-master/storperf/db/test_results_db.py b/docker/storperf-master/storperf/db/test_results_db.py
index a2f7038..9c87e32 100644
--- a/docker/storperf-master/storperf/db/test_results_db.py
+++ b/docker/storperf-master/storperf/db/test_results_db.py
@@ -8,38 +8,19 @@
##############################################################################
import json
-import os
import requests
-def get_installer_type(logger=None):
- """
- Get installer type (fuel, apex, joid, compass)
- """
- try:
- installer = os.environ['INSTALLER_TYPE']
- except KeyError:
- if logger:
- logger.error("Impossible to retrieve the installer type")
- installer = "Unknown_installer"
-
- return installer
-
-
-def push_results_to_db(db_url, project, case_name,
- test_start, test_stop, logger, pod_name,
- version, scenario, criteria, build_tag, details):
+def push_results_to_db(db_url, details, logger):
"""
POST results to the Result target DB
"""
url = db_url + "/results"
- installer = get_installer_type(logger)
- params = {"project_name": project, "case_name": case_name,
- "pod_name": pod_name, "installer": installer,
- "version": version, "scenario": scenario, "criteria": criteria,
- "build_tag": build_tag, "start_date": test_start,
- "stop_date": test_stop, "details": details}
+ params = details.copy()
+ params.pop('details')
+
+ logger.info("popped params= %s" % params)
headers = {'Content-Type': 'application/json'}
try:
@@ -53,9 +34,7 @@ def push_results_to_db(db_url, project, case_name,
logger.debug(r.status_code)
logger.debug(r.content)
return json.loads(r.content)
- except Exception, e:
- logger.error("Error [push_results_to_db('%s', '%s', '%s', " +
- "'%s', '%s', '%s', '%s', '%s', '%s')]:" %
- (db_url, project, case_name, pod_name, version,
- scenario, criteria, build_tag, details), e)
+ except Exception:
+ logger.exception("Error [push_results_to_db('%s', '%s')]:" %
+ (db_url, params))
return None
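
push_results_to_db() now takes the fully assembled metadata dictionary plus a logger; everything except the nested 'details' entry is posted as top-level fields. A hedged sketch of a call, with the URL and field values as placeholders that mirror what TestExecutor._setup_metadata() and DataHandler._push_to_db() populate:

import logging

from storperf.db import test_results_db

logger = logging.getLogger("example")
metadata = {
    "project_name": "storperf",
    "case_name": "Unknown",              # default from _setup_metadata()
    "installer": "Unknown_installer",
    "criteria": "PASS",
    "start_time": "2017-09-04 21:05:00",
    "end_time": "2017-09-04 21:20:00",
    "details": {"metrics": {}, "report_data": {}, "steady_state": {}},
}

# 'details' is popped from a copy before the POST; the rest become params
test_results_db.push_results_to_db("http://test-results.example.org/api",
                                   metadata, logger)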
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index 106696d..0360ea2 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -15,13 +15,14 @@ import paramiko
class FIOInvoker(object):
- def __init__(self):
+ def __init__(self, var_dict={}):
self.logger = logging.getLogger(__name__)
self.event_listeners = set()
self.event_callback_ids = set()
self._remote_host = None
self.callback_id = None
self.terminated = False
+ self.metadata = var_dict
@property
def remote_host(self):
@@ -90,11 +91,7 @@ class FIOInvoker(object):
self.logger.debug("Finished")
def execute(self, args=[]):
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(self.remote_host, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh = self._ssh_client()
command = "sudo ./fio " + ' '.join(args)
self.logger.debug("Remote command: %s" % command)
@@ -133,11 +130,7 @@ class FIOInvoker(object):
self.logger.debug("Terminating fio on " + self.remote_host)
self.terminated = True
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(self.remote_host, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ ssh = self._ssh_client()
command = "sudo killall fio"
@@ -151,3 +144,17 @@ class FIOInvoker(object):
stdout.close()
stderr.close()
+
+ def _ssh_client(self):
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ if 'username' in self.metadata and 'password' in self.metadata:
+ ssh.connect(self.remote_host,
+ username=self.metadata['username'],
+ password=self.metadata['password'])
+ return ssh
+ else:
+ ssh.connect(self.remote_host, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+ return ssh
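
FIOInvoker now receives the workload metadata and, when it carries a username and password, _ssh_client() authenticates with those credentials instead of the bundled storperf_rsa key. A small illustrative sketch; the host, credentials, and fio arguments are placeholders:

from storperf.fio.fio_invoker import FIOInvoker

invoker = FIOInvoker({"username": "ubuntu", "password": "example-password"})
invoker.remote_host = "192.0.2.10"

# Runs "sudo ./fio ..." on the slave over the password-authenticated session
invoker.execute(["--name=warmup", "--rw=write", "--output-format=json"])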
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 8c2a7b4..8a67048 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -11,19 +11,20 @@ from datetime import datetime
import logging
import os
import socket
-from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.job_db import JobDB
-from storperf.test_executor import TestExecutor
from threading import Thread
from time import sleep
from cinderclient import client as cinderclient
-import heatclient.client as heatclient
from keystoneauth1 import loading
from keystoneauth1 import session
import paramiko
from scp import SCPClient
+import heatclient.client as heatclient
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+
class ParameterError(Exception):
""" """
@@ -256,6 +257,53 @@ class StorPerfMaster(object):
'workloads',
str(self._test_executor.workload_modules))
+ @property
+ def username(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'username'
+ )
+
+ @username.setter
+ def username(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'username',
+ value
+ )
+
+ @property
+ def password(self):
+ return self.configuration_db.get_configuration_value(
+ 'stack',
+ 'password'
+ )
+
+ @password.setter
+ def password(self, value):
+ self.configuration_db.set_configuration_value(
+ 'stack',
+ 'password',
+ value
+ )
+
+ def get_logs(self, lines=None):
+ LOG_DIR = './storperf.log'
+
+ if isinstance(lines, int):
+ logs = []
+ index = 0
+ for line in reversed(open(LOG_DIR).readlines()):
+ if index != int(lines):
+ logs.insert(0, line.strip())
+ index += 1
+ else:
+ break
+ else:
+ with open(LOG_DIR) as f:
+ logs = f.read().split('\n')
+ return logs
+
def create_stack(self):
if (self.stack_id is not None):
raise ParameterError("ERROR: Stack has already been created")
@@ -336,6 +384,9 @@ class StorPerfMaster(object):
params['agent_count'] = self.agent_count
params['public_network'] = self.public_network
params['volume_size'] = self.volume_size
+ if self.username and self.password:
+ params['username'] = self.username
+ params['password'] = self.password
job_id = self._test_executor.execute(params)
return job_id
@@ -345,7 +396,7 @@ class StorPerfMaster(object):
def fetch_results(self, job_id):
if self._test_executor.job_db.job_id == job_id:
- return self._test_executor.metadata['metrics']
+ return self._test_executor.metadata['details']['metrics']
workload_params = self.job_db.fetch_workload_params(job_id)
if 'report' in workload_params:
@@ -359,8 +410,25 @@ class StorPerfMaster(object):
def fetch_job_status(self, job_id):
return self._test_executor.execution_status(job_id)
- def fetch_all_jobs(self):
- return self.job_db.fetch_jobs()
+ def fetch_all_jobs(self, metrics_type):
+ job_list = self.job_db.fetch_jobs()
+ job_report = {}
+ if metrics_type is None:
+ job_report['job_ids'] = job_list
+ elif metrics_type == "metadata":
+ job_report['results'] = []
+ for job in job_list:
+ if metrics_type == 'metadata':
+ metadata = self.fetch_metadata(job)
+ if 'report' in metadata:
+ metadata['report']['_id'] = job
+ metadata['report']['start_date'] = \
+ metadata['report']['start_time']
+ metadata['report']['end_date'] = \
+ metadata['report']['end_time']
+ metadata['report']['_id'] = job
+ job_report['results'].append(metadata['report'])
+ return job_report
def _setup_slave(self, slave):
logger = logging.getLogger(__name__ + ":" + slave)
@@ -389,14 +457,50 @@ class StorPerfMaster(object):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(slave, username='storperf',
- key_filename='storperf/resources/ssh/storperf_rsa',
- timeout=2)
+ if self.username and self.password:
+ ssh.connect(slave,
+ username=self.username,
+ password=self.password)
+ else:
+ ssh.connect(slave, username='storperf',
+ key_filename='storperf/resources/ssh/storperf_rsa',
+ timeout=2)
+
+ available = self._check_root_fs(ssh)
+ logger.debug("Available space on / is %s" % available)
+ if available < 65536:
+                logger.warn("Root filesystem is too small, attempting resize")
+ self._resize_root_fs(ssh, logger)
+
+ available = self._check_root_fs(ssh)
+ logger.debug("Available space on / is now %s" % available)
+ if available < 65536:
+ logger.error("Cannot create enough space on /")
+ raise Exception("Root filesystem has only %s free" %
+ available)
scp = SCPClient(ssh.get_transport())
logger.debug("Transferring fio to %s" % slave)
scp.put('/usr/local/bin/fio', '~/')
+ def _check_root_fs(self, ssh):
+ (_, stdout, _) = ssh.exec_command("df /")
+ stdout.readline()
+ lines = stdout.readline().split()
+ if len(lines) > 4:
+ available = lines[3]
+ return int(available)
+
+ def _resize_root_fs(self, ssh, logger):
+ command = "sudo /usr/sbin/resize2fs /dev/vda1"
+ logger.info("Attempting %s" % command)
+ (_, stdout, stderr) = ssh.exec_command(command)
+ stdout.channel.recv_exit_status()
+ for line in iter(stdout.readline, b''):
+ logger.info(line)
+ for line in iter(stderr.readline, b''):
+ logger.error(line)
+
def _make_parameters(self):
heat_parameters = {}
heat_parameters['public_network'] = self.public_network
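
_setup_slave() now verifies free space on the slave's root filesystem before copying fio across, resizing /dev/vda1 when fewer than 65536 1K blocks (64 MB) are available. A standalone sketch of the df parsing that _check_root_fs() performs, using fabricated df output:

# Second line of `df /`, split into columns; column 4 is available 1K blocks
sample_df_output = (
    "Filesystem     1K-blocks    Used Available Use% Mounted on\n"
    "/dev/vda1        1933312 1846284     70932  97% /\n"
)
fields = sample_df_output.splitlines()[1].split()
available = int(fields[3])

if available < 65536:
    print("Root filesystem too small (%s KB free), resize would be attempted"
          % available)
else:
    print("Available space on / is %s KB" % available)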
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index b2d5914..0e3fce0 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -15,14 +15,16 @@ from os import listdir
import os
from os.path import isfile, join
import sched
+from threading import Thread
+from time import sleep
+import time
+
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from threading import Thread
-import time
class UnknownWorkload(Exception):
@@ -37,7 +39,6 @@ class TestExecutor(object):
self.filename = None
self.deadline = None
self.steady_state_samples = 10
- self.metadata = {}
self.start_time = None
self.end_time = None
self.current_workload = None
@@ -55,6 +56,27 @@ class TestExecutor(object):
self._workload_executors = []
self._workload_thread = None
self._thread_gate = None
+ self._setup_metadata({})
+
+ def _setup_metadata(self, metadata={}):
+ try:
+ installer = os.environ['INSTALLER_TYPE']
+ except KeyError:
+ self.logger.error("Cannot determine installer")
+ installer = "Unknown_installer"
+
+ self.metadata = {}
+ self.metadata['project_name'] = 'storperf'
+ self.metadata['installer'] = installer
+ self.metadata['pod_name'] = 'Unknown'
+ self.metadata['version'] = 'Unknown'
+ self.metadata['scenario'] = 'Unknown'
+ self.metadata['build_tag'] = 'Unknown'
+ self.metadata['test_case'] = 'Unknown'
+ self.metadata['details'] = {}
+ self.metadata['details']['metrics'] = {}
+ self.metadata.update(metadata)
+ self.metadata['case_name'] = self.metadata['test_case']
@property
def slaves(self):
@@ -98,7 +120,14 @@ class TestExecutor(object):
metric,
callback_id)
- self.metrics_emitter.transmit_metrics(carbon_metrics)
+ self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id)
+
+ commit_count = 10
+ while (commit_count > 0 and
+ not self.metrics_emitter.confirm_commit(callback_id)):
+ self.logger.info("Waiting 1 more second for commit")
+ sleep(1)
+ commit_count -= 1
if self._thread_gate.report(callback_id):
self.broadcast_event()
@@ -162,8 +191,7 @@ class TestExecutor(object):
def execute(self, metadata):
self.job_db.create_job_id()
self.job_db.record_workload_params(metadata)
- self.metadata = metadata
- self.metadata['metrics'] = {}
+ self._setup_metadata(metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
@@ -244,11 +272,11 @@ class TestExecutor(object):
if self._terminated:
return
self.current_workload = (
- "%s.%s.queue-depth.%s.block-size.%s" %
- (self.job_db.job_id,
- workload_name,
- iodepth,
- blocksize))
+ "%s.%s.queue-depth.%s.block-size.%s"
+ % (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
self.logger.info("Starting run %s" % self.current_workload)
self.workload_status[self.current_workload] = "Running"
@@ -287,11 +315,11 @@ class TestExecutor(object):
if not scheduler.empty():
try:
scheduler.cancel(event)
- except:
+ except ValueError:
pass
- self.logger.info("Completed run %s" %
- self.current_workload)
+ self.logger.info("Completed run %s"
+ % self.current_workload)
self.workload_status[self.current_workload] = "Completed"
self._workload_executors = []
self.current_workload = None
@@ -304,15 +332,15 @@ class TestExecutor(object):
self.end_time = time.time()
self._terminated = True
- report = {'report': json.dumps(self.metadata)}
- self.job_db.record_workload_params(report)
self.broadcast_event()
self.unregister(data_handler.data_event)
+ report = {'report': json.dumps(self.metadata)}
+ self.job_db.record_workload_params(report)
self.job_db.job_id = None
def execute_on_node(self, workload):
- invoker = FIOInvoker()
+ invoker = FIOInvoker(self.metadata)
invoker.register(self.event)
workload.invoker = invoker
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index 9d20383..471c295 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -9,15 +9,14 @@
import logging
import os
+import time
+
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.utilities import data_treatment as DataTreatment
-from storperf.utilities import dictionary
from storperf.utilities import math as math
from storperf.utilities import steady_state as SteadyState
-from time import sleep
-import time
class DataHandler(object):
@@ -36,8 +35,6 @@ class DataHandler(object):
self._push_to_db(executor)
else:
workload = '.'.join(executor.current_workload.split('.')[1:6])
- if 'metrics' not in executor.metadata:
- executor.metadata['metrics'] = {}
steady_state = True
metrics = {}
@@ -67,19 +64,21 @@ class DataHandler(object):
metrics[metric][io_type]['average'] = average
metrics_key = '%s.%s.%s' % (workload, io_type, metric)
- executor.metadata['metrics'][metrics_key] = average
+ executor.metadata['details']['metrics'][metrics_key] = \
+ average
if not steady:
steady_state = False
- if 'report_data' not in executor.metadata:
- executor.metadata['report_data'] = {}
+ if 'report_data' not in executor.metadata['details']:
+ executor.metadata['details']['report_data'] = {}
- if 'steady_state' not in executor.metadata:
- executor.metadata['steady_state'] = {}
+ if 'steady_state' not in executor.metadata['details']:
+ executor.metadata['details']['steady_state'] = {}
- executor.metadata['report_data'][workload] = metrics
- executor.metadata['steady_state'][workload] = steady_state
+ executor.metadata['details']['report_data'][workload] = metrics
+ executor.metadata['details']['steady_state'][workload] = \
+ steady_state
workload_name = executor.current_workload.split('.')[1]
@@ -93,31 +92,12 @@ class DataHandler(object):
# A bit of a hack here as Carbon might not be finished storing the
# data we just sent to it
now = int(time.time())
- backtime = 60 * (executor.steady_state_samples + 2)
+ backtime = 60 * (executor.steady_state_samples + 1)
data_series = graphite_db.fetch_series(workload,
metric,
io_type,
now,
backtime)
- most_recent_time = now
- if len(data_series) > 0:
- most_recent_time = data_series[-1][0]
-
- delta = now - most_recent_time
- self.logger.debug("Last update to graphite was %s ago" % delta)
-
- while (delta < 5 or (delta > 60 and delta < 120)):
- sleep(5)
- data_series = graphite_db.fetch_series(workload,
- metric,
- io_type,
- now,
- backtime)
- if len(data_series) > 0:
- most_recent_time = data_series[-1][0]
- delta = time.time() - most_recent_time
- self.logger.debug("Last update to graphite was %s ago" % delta)
-
return data_series
def _convert_timestamps_to_samples(self, executor, series):
@@ -147,59 +127,36 @@ class DataHandler(object):
return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
- pod_name = dictionary.get_key_from_dict(executor.metadata,
- 'pod_name',
- 'Unknown')
- version = dictionary.get_key_from_dict(executor.metadata,
- 'version',
- 'Unknown')
- scenario = dictionary.get_key_from_dict(executor.metadata,
- 'scenario_name',
- 'Unknown')
- build_tag = dictionary.get_key_from_dict(executor.metadata,
- 'build_tag',
- 'Unknown')
- test_case = dictionary.get_key_from_dict(executor.metadata,
- 'test_case',
- 'Unknown')
- duration = executor.end_time - executor.start_time
-
- payload = executor.metadata
+ executor.metadata['duration'] = executor.end_time - executor.start_time
steady_state = True
- for _, value in executor.metadata['steady_state'].items():
+ for _, value in executor.metadata['details']['steady_state'].items():
steady_state = steady_state and value
- payload['timestart'] = executor.start_time
- payload['duration'] = duration
+ executor.metadata['timestart'] = executor.start_time
if steady_state:
criteria = 'PASS'
else:
criteria = 'FAIL'
+ executor.metadata['criteria'] = criteria
- start_time = time.strftime('%Y-%m-%d %H:%M:%S',
- time.gmtime(executor.start_time))
+ executor.metadata['start_time'] = \
+ time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.start_time))
- end_time = time.strftime('%Y-%m-%d %H:%M:%S',
- time.gmtime(executor.end_time))
+ executor.metadata['end_time'] = \
+ time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.end_time))
test_db = os.environ.get('TEST_DB_URL')
if test_db is not None:
self.logger.info("Pushing results to %s" % (test_db))
try:
- response = test_results_db.push_results_to_db(test_db,
- "storperf",
- test_case,
- start_time,
- end_time,
- self.logger,
- pod_name,
- version,
- scenario,
- criteria,
- build_tag,
- payload)
+ response = test_results_db.push_results_to_db(
+ test_db,
+ executor.metadata,
+ self.logger)
executor.result_url = response['href']
- except:
+ except Exception:
self.logger.exception("Error pushing results into Database")
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 936c839..c045278 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -74,9 +74,9 @@ class _base_workload(object):
@property
def fullname(self):
- return ("%s.%s.queue-depth.%s.block-size.%s.%s" %
- (str(self.id),
- self.__class__.__name__,
- str(self.options['iodepth']),
- str(self.options['bs']),
- str(self.remote_host).replace(".", "-")))
+ return ("%s.%s.queue-depth.%s.block-size.%s.%s"
+ % (str(self.id),
+ self.__class__.__name__,
+ str(self.options['iodepth']),
+ str(self.options['bs']),
+ str(self.remote_host).replace(".", "-")))
diff --git a/docker/storperf-master/supervisord.conf b/docker/storperf-master/supervisord.conf
deleted file mode 100644
index 558f6bf..0000000
--- a/docker/storperf-master/supervisord.conf
+++ /dev/null
@@ -1,33 +0,0 @@
-[supervisord]
-nodaemon = true
-environment = GRAPHITE_STORAGE_DIR='/var/lib/graphite/storage',GRAPHITE_CONF_DIR='/var/lib/graphite/conf'
-
-[program:nginx]
-command = /usr/sbin/nginx
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:carbon-cache]
-user = www-data
-command = /var/lib/graphite/bin/carbon-cache.py --debug start
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:graphite-webapp]
-user = www-data
-directory = /var/lib/graphite/webapp
-environment = PYTHONPATH='/var/lib/graphite/webapp'
-command = /usr/bin/gunicorn_django -b127.0.0.1:8080 -w2 graphite/settings.py
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
-
-[program:storperf-webapp]
-user = root
-directory = /storperf/
-command = /usr/bin/python rest_server.py
-stdout_logfile = /var/log/supervisor/%(program_name)s.log
-stderr_logfile = /var/log/supervisor/%(program_name)s.log
-autorestart = true
diff --git a/docker/storperf-master/tests/carbon_tests/emitter_test.py b/docker/storperf-master/tests/carbon_tests/emitter_test.py
index 7f61049..f5a78d1 100644
--- a/docker/storperf-master/tests/carbon_tests/emitter_test.py
+++ b/docker/storperf-master/tests/carbon_tests/emitter_test.py
@@ -7,27 +7,39 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import SocketServer
import json
-from storperf.carbon import converter
-from storperf.carbon.emitter import CarbonMetricTransmitter
-import threading
-from time import sleep, strptime
+from time import strptime
import unittest
import mock
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
+
+
+addresses = []
+data = []
+connect_exception = []
+send_exception = []
+
+
+class MockSocket(object):
-class MetricsHandler(SocketServer.BaseRequestHandler):
+ def __init__(self, *args):
+ pass
- def handle(self):
- # Echo the back to the client
- CarbonMetricTransmitterTest.response = self.request.recv(1024)
- return
+ def connect(self, address):
+ if len(connect_exception) != 0:
+ raise connect_exception[0]
+ addresses.append(address)
+ def send(self, datum):
+ if len(send_exception) != 0:
+ raise send_exception[0]
+ data.append(datum)
-class MetricsServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
- pass
+ def close(self):
+ pass
class CarbonMetricTransmitterTest(unittest.TestCase):
@@ -35,17 +47,37 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
response = None
def setUp(self):
+ del addresses[:]
+ del data[:]
+ del connect_exception[:]
+ del send_exception[:]
+
+ @mock.patch("socket.socket")
+ @mock.patch("time.gmtime")
+ def test_transmit_metrics(self, mock_time, mock_socket):
+
+ mock_socket.side_effect = MockSocket
+
+ mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
+
+ testconv = converter.Converter()
+ json_object = json.loads(
+ """{"timestamp" : "975542400", "key":123.0 }""")
+ result = testconv.convert_json_to_flat(json_object, "host.run-name")
- address = ('localhost', 0)
- server = MetricsServer(address, MetricsHandler)
- ip, self.listen_port = server.server_address
+ emitter = CarbonMetricTransmitter()
+ emitter.carbon_port = self.listen_port
+ emitter.transmit_metrics(result, None)
- t = threading.Thread(target=server.serve_forever)
- t.setDaemon(True)
- t.start()
+ self.assertEqual("host.run-name.key 123.0 975542400\n",
+ data[1],
+ data[1])
+ @mock.patch("socket.socket")
@mock.patch("time.gmtime")
- def test_transmit_metrics(self, mock_time):
+ def test_skip_non_numeric_metrics(self, mock_time, mock_socket):
+
+ mock_socket.side_effect = MockSocket
mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
@@ -56,17 +88,126 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
emitter = CarbonMetricTransmitter()
emitter.carbon_port = self.listen_port
- emitter.transmit_metrics(result)
+ emitter.transmit_metrics(result, None)
+
+ self.assertEqual("None.commit-marker 975542400 975542400\n",
+ data[1],
+ data[1])
+
+ @mock.patch("socket.socket")
+ def test_connect_fails(self, mock_socket):
+
+ mock_socket.side_effect = MockSocket
+ connect_exception.append(Exception("Mock connection error"))
+
+ testconv = converter.Converter()
+ json_object = json.loads(
+ """{"timestamp" : "975542400", "key":"value" }""")
+ result = testconv.convert_json_to_flat(json_object, "host.run-name")
+
+ emitter = CarbonMetricTransmitter()
+ emitter.carbon_port = self.listen_port
+ emitter.transmit_metrics(result, None)
+
+ self.assertEqual(0,
+ len(data),
+ len(data))
- count = 0
+ @mock.patch("socket.socket")
+ def test_send_fails(self, mock_socket):
- while (CarbonMetricTransmitterTest.response is None and count < 10):
- count += 1
- sleep(0.1)
+ mock_socket.side_effect = MockSocket
+ send_exception.append(Exception("Mock send error"))
+
+ testconv = converter.Converter()
+ json_object = json.loads(
+ """{"timestamp" : "975542400", "key":"value" }""")
+ result = testconv.convert_json_to_flat(json_object, "host.run-name")
+
+ emitter = CarbonMetricTransmitter()
+ emitter.carbon_port = self.listen_port
+ emitter.transmit_metrics(result, None)
+
+ self.assertEqual(0,
+ len(data),
+ len(data))
+
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+ def test_confirm_commit(self, mock_graphite_db):
+ graphite_return = json.loads("""[
+ {"target":
+ "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+ "datapoints": [[1503078366.0, 1503078370]]}]
+ """)
+ mock_graphite_db.return_value = graphite_return
+
+ commit_marker = "commit-marker"
+
+ emitter = CarbonMetricTransmitter()
+ emitter.commit_markers[commit_marker] = 1503078366
+
+ committed = emitter.confirm_commit(commit_marker)
+ self.assertTrue(committed)
+
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+ def test_confirm_multiple_commits(self, mock_graphite_db):
+ graphite_return = json.loads("""[
+ {"target":
+ "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+ "datapoints": [
+ [1503078300.0, 1503078350],
+ [1503078366.0, 1503078360]]}]
+ """)
+ mock_graphite_db.return_value = graphite_return
+
+ commit_marker = "commit-marker"
+
+ emitter = CarbonMetricTransmitter()
+ emitter.commit_markers[commit_marker] = 1503078366
+
+ committed = emitter.confirm_commit(commit_marker)
+ self.assertTrue(committed)
+
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+ def test_empty_commit(self, mock_graphite_db):
+ graphite_return = json.loads("[]")
+ mock_graphite_db.return_value = graphite_return
+
+ commit_marker = "commit-marker"
+
+ emitter = CarbonMetricTransmitter()
+ emitter.commit_markers[commit_marker] = 1503078366
+
+ committed = emitter.confirm_commit(commit_marker)
+ self.assertFalse(committed)
+
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_item")
+ def test_badtimestamp_commit(self, mock_graphite_db):
+ graphite_return = json.loads("""[
+ {"target":
+ "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+ "datapoints": [[1234, 1503078370]]}]
+ """)
+ mock_graphite_db.return_value = graphite_return
+
+ commit_marker = "commit-marker"
+
+ emitter = CarbonMetricTransmitter()
+ emitter.commit_markers[commit_marker] = 1503078366
+
+ committed = emitter.confirm_commit(commit_marker)
+ self.assertFalse(committed)
+
+ def test_timestamp_parse(self):
+ emitter = CarbonMetricTransmitter()
+ result = json.loads("""[
+ {"target":
+ "rw.queue-depth.2.block-size.2048.10-10-243-154.commit-marker",
+ "datapoints": [[1503078366.0, 1503078370]]}]
+ """)
+ timestamps = emitter.parse_timestamp(result)
+ self.assertEqual(1503078366, timestamps[0], timestamps[0])
- self.assertEqual("host.run-name.key value 975542400\n",
- CarbonMetricTransmitterTest.response,
- CarbonMetricTransmitterTest.response)
if __name__ == '__main__':
unittest.main()
diff --git a/docker/storperf-master/tests/utilities_tests/data_handler_test.py b/docker/storperf-master/tests/utilities_tests/data_handler_test.py
index f514ae7..f028bd0 100644
--- a/docker/storperf-master/tests/utilities_tests/data_handler_test.py
+++ b/docker/storperf-master/tests/utilities_tests/data_handler_test.py
@@ -8,11 +8,12 @@
##############################################################################
import os
-from storperf.utilities.data_handler import DataHandler
import unittest
import mock
+from storperf.utilities.data_handler import DataHandler
+
class MockGraphiteDB(object):
@@ -34,6 +35,8 @@ class DataHandlerTest(unittest.TestCase):
self.steady_state_samples = 10
self.end_time = 1
self.metadata = {}
+ self.metadata['details'] = {}
+ self.metadata['details']['metrics'] = {}
self.block_sizes = "1"
self.queue_depths = "1"
mock.job_id = "1"
@@ -76,11 +79,11 @@ class DataHandlerTest(unittest.TestCase):
mock_graphite_db.return_value = expected
mock_time.return_value = expected[-1][0] + 10
- self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
- ("job_id",
- "rw",
- 8,
- 8192))
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+ % ("job_id",
+ "rw",
+ 8,
+ 8192))
actual = self.data_handler._lookup_prior_data(self, 'read', 'iops')
self.assertEqual(expected, actual)
@@ -133,7 +136,7 @@ class DataHandlerTest(unittest.TestCase):
self._terminated = True
mock_results_db.side_effect = self.push_results_to_db
mock_graphite_db.side_effect = MockGraphiteDB
- self.metadata = {
+ self.metadata['details'] = {
"steady_state": {
"rr.queue-depth.8.block-size.16384": True,
"rr.queue-depth.8.block-size.2048": False,
@@ -148,8 +151,9 @@ class DataHandlerTest(unittest.TestCase):
@mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
- @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
- def test_non_terminated_report(self, mock_job_db, mock_graphite_db,
+ @mock.patch("storperf.db.job_db.JobDB.fetch_workloads")
+ def test_non_terminated_report(self, mock_job_db,
+ mock_graphite_db,
mock_results_db, mock_time):
self._terminated = False
mock_results_db.side_effect = self.push_results_to_db
@@ -169,29 +173,31 @@ class DataHandlerTest(unittest.TestCase):
expected_range = 17.78
expected_average = 212.49777777777774
- self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
- ("job_id",
- "rw",
- 8,
- 8192))
-
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+ % ("job_id",
+ "rw",
+ 8,
+ 8192))
mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
self.data_handler.data_event(self)
self.assertEqual(False, self.pushed)
self.assertEqual(False, self._terminated)
- self.assertEqual(expected_slope, self.metadata['report_data']
+ self.assertEqual(expected_slope, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
['slope'])
- self.assertEqual(expected_range, self.metadata['report_data']
+ self.assertEqual(expected_range, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
['range'])
- self.assertEqual(expected_average, self.metadata['report_data']
+ self.assertEqual(expected_average, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
@@ -201,7 +207,7 @@ class DataHandlerTest(unittest.TestCase):
@mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
@mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
- @mock.patch("storperf.db.graphite_db.JobDB.fetch_workloads")
+ @mock.patch("storperf.db.job_db.JobDB.fetch_workloads")
def test_report_that_causes_termination(self,
mock_job_db,
mock_graphite_db,
@@ -236,32 +242,35 @@ class DataHandlerTest(unittest.TestCase):
expected_range = 17.78
expected_average = 209.2135
- self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
- ("job_id",
- "rw",
- 8,
- 8192))
-
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s"
+ % ("job_id",
+ "rw",
+ 8,
+ 8192))
mock_job_db.return_value = [[self.current_workload, 4804559000, None]]
self.data_handler.data_event(self)
- self.assertEqual(expected_slope, self.metadata['report_data']
+ self.assertEqual(expected_slope, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
['slope'])
- self.assertEqual(expected_range, self.metadata['report_data']
+ self.assertEqual(expected_range, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
['range'])
- self.assertEqual(expected_average, self.metadata['report_data']
+ self.assertEqual(expected_average, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
['average'])
- self.assertEqual(report_data, self.metadata['report_data']
+ self.assertEqual(report_data, self.metadata['details']
+ ['report_data']
['rw.queue-depth.8.block-size.8192']
['lat_ns.mean']
['read']
@@ -277,7 +286,7 @@ class DataHandlerTest(unittest.TestCase):
mock_results_db.side_effect = self.push_results_to_db
self.start_time = 1504559100
self.end_time = 1504560000
- self.metadata = {
+ self.metadata['details'] = {
"scenario_name": "ceph_ws,wr,rs,rr,rw",
"status": "OK",
"steady_state": {
@@ -289,9 +298,11 @@ class DataHandlerTest(unittest.TestCase):
"volume_size": 10
}
self.data_handler._push_to_db(self)
- self.assertEqual('FAIL', self.db_results[9],
+ self.assertEqual('FAIL', self.db_results[1]['criteria'],
'Expected FAIL in criteria')
- self.assertEqual('2017-09-04 21:05:00', self.db_results[3],
+ self.assertEqual('2017-09-04 21:05:00',
+ self.db_results[1]['start_time'],
'Start time')
- self.assertEqual('2017-09-04 21:20:00', self.db_results[4],
+ self.assertEqual('2017-09-04 21:20:00',
+ self.db_results[1]['end_time'],
'End time')