summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf
diff options
context:
space:
mode:
authorShrenik <shrenik.jain@research.iiit.ac.in>2017-08-16 17:04:00 +0530
committermbeierl <mark.beierl@dell.com>2017-08-23 12:52:56 -0400
commitfc7ec1c0c73d2ecc52035634c8dd0ae6647273b1 (patch)
tree7fbeb19a723dded2a597aab78fb33d3ee6c34f15 /docker/storperf-master/storperf
parent596470aa54a78537434343e6cd310b77128d2ad6 (diff)
Graphite Standalone container
A new Graphite container is used Metrics are sent to both Graphite instances However, it seems that some metrics might be missing This is however a direct plugin. There are differences in carbon.conf, storage-schemas.conf and other files as well. It is suggested to write own Dockerfile instead of using the image available. We anyway have to do it with respect ARM Support. Change-Id: Id34c728f598150caac23ac167c3cce5eaf183a6c JIRA: STORPERF-142 Signed-off-by: Shrenik <shrenik.jain@research.iiit.ac.in> Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master/storperf')
-rw-r--r--docker/storperf-master/storperf/carbon/emitter.py75
-rw-r--r--docker/storperf-master/storperf/db/graphite_db.py31
-rw-r--r--docker/storperf-master/storperf/storperf_master.py11
-rw-r--r--docker/storperf-master/storperf/test_executor.py31
-rw-r--r--docker/storperf-master/storperf/utilities/data_handler.py30
-rw-r--r--docker/storperf-master/storperf/workloads/_base_workload.py12
6 files changed, 120 insertions, 70 deletions
diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
index e23dc79..05f6c3c 100644
--- a/docker/storperf-master/storperf/carbon/emitter.py
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -11,28 +11,75 @@ import logging
import socket
import time
+from storperf.db.graphite_db import GraphiteDB
+
class CarbonMetricTransmitter():
- carbon_host = '127.0.0.1'
- carbon_port = 2003
+ carbon_servers = [('127.0.0.1', 2003),
+ ('storperf-graphite', 2003)]
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.graphite_db = GraphiteDB()
+ self.commit_markers = {}
- def transmit_metrics(self, metrics):
- if 'timestamp' in metrics:
- metrics.pop('timestamp')
+ def transmit_metrics(self, metrics, commit_marker):
timestamp = str(calendar.timegm(time.gmtime()))
+ self.commit_markers[commit_marker] = int(timestamp)
+
+ carbon_socket = None
+
+ for host, port in self.carbon_servers:
+ try:
+ carbon_socket = socket.socket(socket.AF_INET,
+ socket.SOCK_STREAM)
+ carbon_socket.connect((host, port))
+
+ for key, value in metrics.items():
+ try:
+ float(value)
+ message = "%s %s %s\n" \
+ % (key, value, timestamp)
+ self.logger.debug("Metric: " + message.strip())
+ carbon_socket.send(message)
+ except ValueError:
+ self.logger.debug("Ignoring non numeric metric %s %s"
+ % (key, value))
+
+ message = "%s.commit-marker %s %s\n" \
+ % (commit_marker, timestamp, timestamp)
+ carbon_socket.send(message)
+ self.logger.debug("Marker %s" % message.strip())
+ self.logger.info("Sent metrics to %s:%s with timestamp %s"
+ % (host, port, timestamp))
+
+ except Exception, e:
+ self.logger.error("While notifying carbon %s:%s %s"
+ % (host, port, e))
+
+ if carbon_socket is not None:
+ carbon_socket.close()
+
+ def confirm_commit(self, commit_marker):
+ marker_timestamp = self.commit_markers[commit_marker]
+ request = "%s.commit-marker&from=%s" \
+ % (commit_marker, marker_timestamp - 60)
+ marker_data = self.graphite_db.fetch_item(request)
+ self.logger.debug("Marker data %s" % marker_data)
+ fetched_timestamps = self.parse_timestamp(marker_data)
- carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- carbon_socket.connect((self.carbon_host, self.carbon_port))
+ return marker_timestamp in fetched_timestamps
- for key, metric in metrics.items():
- message = key + " " + metric + " " + timestamp
- self.logger.debug("Metric: " + message)
- carbon_socket.send(message + '\n')
+ def parse_timestamp(self, marker_data):
+ timestamps = []
+ if (type(marker_data) is list and
+ len(marker_data) > 0):
+ datapoints = marker_data[0]['datapoints']
+ for datapoint in datapoints:
+ try:
+ timestamps.append(int(datapoint[0]))
+ except Exception:
+ pass
- carbon_socket.close()
- self.logger.info("Sent metrics to carbon with timestamp %s"
- % timestamp)
+ return timestamps
diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py
index c8a2d35..aa71855 100644
--- a/docker/storperf-master/storperf/db/graphite_db.py
+++ b/docker/storperf-master/storperf/db/graphite_db.py
@@ -9,33 +9,44 @@
import json
import logging
-
import requests
-from storperf.db.job_db import JobDB
-
class GraphiteDB(object):
+ graphite_host = "storperf-graphite"
+ graphite_port = 8080
+
def __init__(self):
- """
- """
- self._job_db = JobDB()
self.logger = logging.getLogger(__name__)
+ def fetch_item(self, target):
+
+ result = None
+ request = ("http://%s:%s/graphite/render/?format=json&target=%s"
+ % (self.graphite_host, self.graphite_port, target))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ result = json.loads(response.content)
+
+ return result
+
def fetch_series(self, workload, metric, io_type, time, duration):
series = []
end = time
start = end - duration
- request = ("http://127.0.0.1:8000/render/?target="
+ request = ("http://%s:%s/graphite/render/?target="
"averageSeries(%s.*.jobs.1.%s.%s)"
"&format=json"
"&from=%s"
- "&until=%s" %
- (workload, io_type, metric,
- start, end))
+ "&until=%s"
+ % (self.graphite_host, self.graphite_port,
+ workload, io_type, metric,
+ start, end))
self.logger.debug("Calling %s" % (request))
response = requests.get(request)
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 7f2c395..3b0af78 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -11,19 +11,20 @@ from datetime import datetime
import logging
import os
import socket
-from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.job_db import JobDB
-from storperf.test_executor import TestExecutor
from threading import Thread
from time import sleep
from cinderclient import client as cinderclient
-import heatclient.client as heatclient
from keystoneauth1 import loading
from keystoneauth1 import session
import paramiko
from scp import SCPClient
+import heatclient.client as heatclient
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+
class ParameterError(Exception):
""" """
@@ -257,7 +258,7 @@ class StorPerfMaster(object):
str(self._test_executor.workload_modules))
def get_logs(self, lines=None):
- LOG_DIR = '/var/log/supervisor/storperf-webapp.log'
+ LOG_DIR = './storperf.log'
if isinstance(lines, int):
logs = []
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index b2d5914..dc178d8 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -15,14 +15,16 @@ from os import listdir
import os
from os.path import isfile, join
import sched
+from threading import Thread
+from time import sleep
+import time
+
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from threading import Thread
-import time
class UnknownWorkload(Exception):
@@ -98,7 +100,14 @@ class TestExecutor(object):
metric,
callback_id)
- self.metrics_emitter.transmit_metrics(carbon_metrics)
+ self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id)
+
+ commit_count = 10
+ while (commit_count > 0 and
+ not self.metrics_emitter.confirm_commit(callback_id)):
+ self.logger.info("Waiting 1 more second for commit")
+ sleep(1)
+ commit_count -= 1
if self._thread_gate.report(callback_id):
self.broadcast_event()
@@ -244,11 +253,11 @@ class TestExecutor(object):
if self._terminated:
return
self.current_workload = (
- "%s.%s.queue-depth.%s.block-size.%s" %
- (self.job_db.job_id,
- workload_name,
- iodepth,
- blocksize))
+ "%s.%s.queue-depth.%s.block-size.%s"
+ % (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
self.logger.info("Starting run %s" % self.current_workload)
self.workload_status[self.current_workload] = "Running"
@@ -287,11 +296,11 @@ class TestExecutor(object):
if not scheduler.empty():
try:
scheduler.cancel(event)
- except:
+ except ValueError:
pass
- self.logger.info("Completed run %s" %
- self.current_workload)
+ self.logger.info("Completed run %s"
+ % self.current_workload)
self.workload_status[self.current_workload] = "Completed"
self._workload_executors = []
self.current_workload = None
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index 9d20383..44b1f6b 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -9,6 +9,8 @@
import logging
import os
+import time
+
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
@@ -16,8 +18,6 @@ from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
from storperf.utilities import math as math
from storperf.utilities import steady_state as SteadyState
-from time import sleep
-import time
class DataHandler(object):
@@ -93,31 +93,12 @@ class DataHandler(object):
# A bit of a hack here as Carbon might not be finished storing the
# data we just sent to it
now = int(time.time())
- backtime = 60 * (executor.steady_state_samples + 2)
+ backtime = 60 * (executor.steady_state_samples + 1)
data_series = graphite_db.fetch_series(workload,
metric,
io_type,
now,
backtime)
- most_recent_time = now
- if len(data_series) > 0:
- most_recent_time = data_series[-1][0]
-
- delta = now - most_recent_time
- self.logger.debug("Last update to graphite was %s ago" % delta)
-
- while (delta < 5 or (delta > 60 and delta < 120)):
- sleep(5)
- data_series = graphite_db.fetch_series(workload,
- metric,
- io_type,
- now,
- backtime)
- if len(data_series) > 0:
- most_recent_time = data_series[-1][0]
- delta = time.time() - most_recent_time
- self.logger.debug("Last update to graphite was %s ago" % delta)
-
return data_series
def _convert_timestamps_to_samples(self, executor, series):
@@ -201,5 +182,6 @@ class DataHandler(object):
build_tag,
payload)
executor.result_url = response['href']
- except:
- self.logger.exception("Error pushing results into Database")
+ except Exception as e:
+ self.logger.exception("Error pushing results into Database",
+ e)
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 936c839..c045278 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -74,9 +74,9 @@ class _base_workload(object):
@property
def fullname(self):
- return ("%s.%s.queue-depth.%s.block-size.%s.%s" %
- (str(self.id),
- self.__class__.__name__,
- str(self.options['iodepth']),
- str(self.options['bs']),
- str(self.remote_host).replace(".", "-")))
+ return ("%s.%s.queue-depth.%s.block-size.%s.%s"
+ % (str(self.id),
+ self.__class__.__name__,
+ str(self.options['iodepth']),
+ str(self.options['bs']),
+ str(self.remote_host).replace(".", "-")))