From fc7ec1c0c73d2ecc52035634c8dd0ae6647273b1 Mon Sep 17 00:00:00 2001
From: Shrenik
Date: Wed, 16 Aug 2017 17:04:00 +0530
Subject: Graphite Standalone container

A new standalone Graphite container is used, and metrics are now sent
to both Graphite instances. However, it seems that some metrics might
be missing. For now the available Graphite image is plugged in
directly rather than built from our own Dockerfile.

There are differences in carbon.conf, storage-schemas.conf and other
files as well. It is suggested to write our own Dockerfile instead of
using the available image; we have to do that for ARM support anyway.

Change-Id: Id34c728f598150caac23ac167c3cce5eaf183a6c
JIRA: STORPERF-142
Signed-off-by: Shrenik
Signed-off-by: mbeierl
---
 docker/storperf-master/storperf/carbon/emitter.py   | 75 ++++++++++++++++++----
 docker/storperf-master/storperf/db/graphite_db.py   | 31 ++++++---
 docker/storperf-master/storperf/storperf_master.py  | 11 ++--
 docker/storperf-master/storperf/test_executor.py    | 31 +++++----
 .../storperf/utilities/data_handler.py              | 30 ++-------
 .../storperf/workloads/_base_workload.py            | 12 ++--
 6 files changed, 120 insertions(+), 70 deletions(-)

(limited to 'docker/storperf-master/storperf')

diff --git a/docker/storperf-master/storperf/carbon/emitter.py b/docker/storperf-master/storperf/carbon/emitter.py
index e23dc79..05f6c3c 100644
--- a/docker/storperf-master/storperf/carbon/emitter.py
+++ b/docker/storperf-master/storperf/carbon/emitter.py
@@ -11,28 +11,75 @@ import logging
 import socket
 import time
 
+from storperf.db.graphite_db import GraphiteDB
+
 
 class CarbonMetricTransmitter():
 
-    carbon_host = '127.0.0.1'
-    carbon_port = 2003
+    carbon_servers = [('127.0.0.1', 2003),
+                      ('storperf-graphite', 2003)]
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
+        self.graphite_db = GraphiteDB()
+        self.commit_markers = {}
 
-    def transmit_metrics(self, metrics):
-        if 'timestamp' in metrics:
-            metrics.pop('timestamp')
+    def transmit_metrics(self, metrics, commit_marker):
 
         timestamp = str(calendar.timegm(time.gmtime()))
+        self.commit_markers[commit_marker] = int(timestamp)
+
+        carbon_socket = None
+
+        for host, port in self.carbon_servers:
+            try:
+                carbon_socket = socket.socket(socket.AF_INET,
+                                              socket.SOCK_STREAM)
+                carbon_socket.connect((host, port))
+
+                for key, value in metrics.items():
+                    try:
+                        float(value)
+                        message = "%s %s %s\n" \
+                            % (key, value, timestamp)
+                        self.logger.debug("Metric: " + message.strip())
+                        carbon_socket.send(message)
+                    except ValueError:
+                        self.logger.debug("Ignoring non numeric metric %s %s"
+                                          % (key, value))
+
+                message = "%s.commit-marker %s %s\n" \
+                    % (commit_marker, timestamp, timestamp)
+                carbon_socket.send(message)
+                self.logger.debug("Marker %s" % message.strip())
+                self.logger.info("Sent metrics to %s:%s with timestamp %s"
+                                 % (host, port, timestamp))
+
+            except Exception, e:
+                self.logger.error("While notifying carbon %s:%s %s"
+                                  % (host, port, e))
+
+            if carbon_socket is not None:
+                carbon_socket.close()
+
+    def confirm_commit(self, commit_marker):
+        marker_timestamp = self.commit_markers[commit_marker]
+        request = "%s.commit-marker&from=%s" \
+            % (commit_marker, marker_timestamp - 60)
+        marker_data = self.graphite_db.fetch_item(request)
+        self.logger.debug("Marker data %s" % marker_data)
+        fetched_timestamps = self.parse_timestamp(marker_data)
 
-        carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        carbon_socket.connect((self.carbon_host, self.carbon_port))
+        return marker_timestamp in fetched_timestamps
 
-        for key, metric in metrics.items():
-            message = key + " " + metric + " " + timestamp
-            self.logger.debug("Metric: " + message)
-            carbon_socket.send(message + '\n')
+    def parse_timestamp(self, marker_data):
+        timestamps = []
+        if (type(marker_data) is list and
+                len(marker_data) > 0):
+            datapoints = marker_data[0]['datapoints']
+            for datapoint in datapoints:
+                try:
+                    timestamps.append(int(datapoint[0]))
+                except Exception:
+                    pass
 
-        carbon_socket.close()
-        self.logger.info("Sent metrics to carbon with timestamp %s"
-                         % timestamp)
+        return timestamps
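The emitter change above is driven from test_executor.py later in this
patch. A minimal usage sketch of the new API (the metric paths and the
"demo-run" marker are made-up examples, and it assumes the Carbon
listeners at 127.0.0.1:2003 / storperf-graphite:2003 plus the Graphite
render API are actually reachable):

from time import sleep

from storperf.carbon.emitter import CarbonMetricTransmitter

emitter = CarbonMetricTransmitter()

# Flat "dotted.path": value metrics; transmit_metrics() skips values
# that do not parse as floats, so only the iops entry is sent here.
# Both the metric paths and the "demo-run" marker are illustrative.
metrics = {"demo-run.jobs.1.read.iops": "1234",
           "demo-run.status": "running"}

emitter.transmit_metrics(metrics, "demo-run")

# Same confirmation loop test_executor.py uses below: give Carbon up
# to ten seconds to flush before trusting a later render query.
tries = 10
while tries > 0 and not emitter.confirm_commit("demo-run"):
    sleep(1)
    tries -= 1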
diff --git a/docker/storperf-master/storperf/db/graphite_db.py b/docker/storperf-master/storperf/db/graphite_db.py
index c8a2d35..aa71855 100644
--- a/docker/storperf-master/storperf/db/graphite_db.py
+++ b/docker/storperf-master/storperf/db/graphite_db.py
@@ -9,33 +9,44 @@ import json
 import logging
-
 import requests
 
-from storperf.db.job_db import JobDB
-
 
 class GraphiteDB(object):
 
+    graphite_host = "storperf-graphite"
+    graphite_port = 8080
+
     def __init__(self):
-        """
-        """
-        self._job_db = JobDB()
         self.logger = logging.getLogger(__name__)
 
+    def fetch_item(self, target):
+
+        result = None
+        request = ("http://%s:%s/graphite/render/?format=json&target=%s"
+                   % (self.graphite_host, self.graphite_port, target))
+        self.logger.debug("Calling %s" % (request))
+
+        response = requests.get(request)
+        if (response.status_code == 200):
+            result = json.loads(response.content)
+
+        return result
+
     def fetch_series(self, workload, metric, io_type, time, duration):
 
         series = []
         end = time
         start = end - duration
 
-        request = ("http://127.0.0.1:8000/render/?target="
+        request = ("http://%s:%s/graphite/render/?target="
                    "averageSeries(%s.*.jobs.1.%s.%s)"
                    "&format=json"
                    "&from=%s"
-                   "&until=%s" %
-                   (workload, io_type, metric,
-                    start, end))
+                   "&until=%s"
+                   % (self.graphite_host, self.graphite_port,
+                      workload, io_type, metric,
+                      start, end))
         self.logger.debug("Calling %s" % (request))
 
         response = requests.get(request)
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 7f2c395..3b0af78 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -11,19 +11,20 @@ from datetime import datetime
 import logging
 import os
 import socket
-from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.job_db import JobDB
-from storperf.test_executor import TestExecutor
 from threading import Thread
 from time import sleep
 
 from cinderclient import client as cinderclient
-import heatclient.client as heatclient
 from keystoneauth1 import loading
 from keystoneauth1 import session
 import paramiko
 from scp import SCPClient
 
+import heatclient.client as heatclient
+from storperf.db.configuration_db import ConfigurationDB
+from storperf.db.job_db import JobDB
+from storperf.test_executor import TestExecutor
+
 
 class ParameterError(Exception):
     """ """
@@ -257,7 +258,7 @@ class StorPerfMaster(object):
             str(self._test_executor.workload_modules))
 
     def get_logs(self, lines=None):
-        LOG_DIR = '/var/log/supervisor/storperf-webapp.log'
+        LOG_DIR = './storperf.log'
 
         if isinstance(lines, int):
             logs = []
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index b2d5914..dc178d8 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -15,14 +15,16 @@ from os import listdir
 import os
 from os.path import isfile, join
 import sched
+from threading import Thread
+from time import sleep
+import time
+
 from storperf.carbon.converter import Converter
 from storperf.carbon.emitter import CarbonMetricTransmitter
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from storperf.utilities.data_handler import DataHandler
 from storperf.utilities.thread_gate import ThreadGate
-from threading import Thread
-import time
 
 
 class UnknownWorkload(Exception):
@@ -98,7 +100,14 @@ class TestExecutor(object):
             metric,
             callback_id)
 
-        self.metrics_emitter.transmit_metrics(carbon_metrics)
+        self.metrics_emitter.transmit_metrics(carbon_metrics, callback_id)
+
+        commit_count = 10
+        while (commit_count > 0 and
+               not self.metrics_emitter.confirm_commit(callback_id)):
+            self.logger.info("Waiting 1 more second for commit")
+            sleep(1)
+            commit_count -= 1
 
         if self._thread_gate.report(callback_id):
             self.broadcast_event()
@@ -244,11 +253,11 @@ class TestExecutor(object):
             if self._terminated:
                 return
             self.current_workload = (
-                "%s.%s.queue-depth.%s.block-size.%s" %
-                (self.job_db.job_id,
-                 workload_name,
-                 iodepth,
-                 blocksize))
+                "%s.%s.queue-depth.%s.block-size.%s"
+                % (self.job_db.job_id,
+                   workload_name,
+                   iodepth,
+                   blocksize))
 
             self.logger.info("Starting run %s" % self.current_workload)
             self.workload_status[self.current_workload] = "Running"
@@ -287,11 +296,11 @@ class TestExecutor(object):
         if not scheduler.empty():
             try:
                 scheduler.cancel(event)
-            except:
+            except ValueError:
                 pass
 
-        self.logger.info("Completed run %s" %
-                         self.current_workload)
+        self.logger.info("Completed run %s"
+                         % self.current_workload)
         self.workload_status[self.current_workload] = "Completed"
         self._workload_executors = []
         self.current_workload = None
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index 9d20383..44b1f6b 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -9,6 +9,8 @@
 
 import logging
 import os
+import time
+
 from storperf.db import test_results_db
 from storperf.db.graphite_db import GraphiteDB
 from storperf.db.job_db import JobDB
@@ -16,8 +18,6 @@ from storperf.utilities import data_treatment as DataTreatment
 from storperf.utilities import dictionary
 from storperf.utilities import math as math
 from storperf.utilities import steady_state as SteadyState
-from time import sleep
-import time
 
 
 class DataHandler(object):
@@ -93,31 +93,12 @@ class DataHandler(object):
         # A bit of a hack here as Carbon might not be finished storing the
         # data we just sent to it
         now = int(time.time())
-        backtime = 60 * (executor.steady_state_samples + 2)
+        backtime = 60 * (executor.steady_state_samples + 1)
         data_series = graphite_db.fetch_series(workload,
                                                metric,
                                                io_type,
                                                now,
                                                backtime)
-        most_recent_time = now
-        if len(data_series) > 0:
-            most_recent_time = data_series[-1][0]
-
-        delta = now - most_recent_time
-        self.logger.debug("Last update to graphite was %s ago" % delta)
-
-        while (delta < 5 or (delta > 60 and delta < 120)):
-            sleep(5)
-            data_series = graphite_db.fetch_series(workload,
-                                                   metric,
-                                                   io_type,
-                                                   now,
-                                                   backtime)
-            if len(data_series) > 0:
-                most_recent_time = data_series[-1][0]
-            delta = time.time() - most_recent_time
-            self.logger.debug("Last update to graphite was %s ago" % delta)
-
         return data_series
 
     def _convert_timestamps_to_samples(self, executor, series):
@@ -201,5 +182,6 @@ class DataHandler(object):
                                                   build_tag,
                                                   payload)
             executor.result_url = response['href']
-        except:
-            self.logger.exception("Error pushing results into Database")
+        except Exception as e:
+            self.logger.exception("Error pushing results into Database",
+                                  e)
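The wait-for-commit logic above reduces to a single Graphite render
call. A rough standalone equivalent of what GraphiteDB.fetch_item()
and CarbonMetricTransmitter.confirm_commit() do together (host, port
and the /graphite/render prefix are taken from graphite_db.py above;
the "demo-run" marker and sent_at value are illustrative):

import json
import time

import requests

marker = "demo-run"            # illustrative marker name
sent_at = int(time.time())     # timestamp used when the marker was sent

# Same URL shape that GraphiteDB.fetch_item() builds.
url = ("http://storperf-graphite:8080/graphite/render/"
       "?format=json&target=%s.commit-marker&from=%s"
       % (marker, sent_at - 60))

response = requests.get(url)
datapoints = []
if response.status_code == 200:
    result = json.loads(response.content)
    if result:
        datapoints = result[0]['datapoints']

# Each datapoint is [value, timestamp]; the emitter stores the send
# timestamp as the value, so finding sent_at here means Carbon has
# flushed the batch that carried the marker.
committed = any(value is not None and int(value) == sent_at
                for value, _ in datapoints)
print(committed)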
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index 936c839..c045278 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -74,9 +74,9 @@ class _base_workload(object):
 
     @property
     def fullname(self):
-        return ("%s.%s.queue-depth.%s.block-size.%s.%s" %
-                (str(self.id),
-                 self.__class__.__name__,
-                 str(self.options['iodepth']),
-                 str(self.options['bs']),
-                 str(self.remote_host).replace(".", "-")))
+        return ("%s.%s.queue-depth.%s.block-size.%s.%s"
+                % (str(self.id),
+                   self.__class__.__name__,
+                   str(self.options['iodepth']),
+                   str(self.options['bs']),
+                   str(self.remote_host).replace(".", "-")))
--
cgit 1.2.3-korg
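For reference, what the emitter actually puts on the wire to each
Carbon listener is plain newline-separated text records. A minimal
sketch of that protocol, including the commit-marker convention
introduced in this patch (storperf-graphite:2003 comes from emitter.py
above; the metric names are made up, and the snippet is Python 2 like
the rest of this tree):

import socket
import time

# Carbon plaintext protocol: one "metric.path value timestamp\n"
# record per line.  The commit marker is an ordinary metric whose
# value is the timestamp itself, which is what confirm_commit()
# later looks for through the render API.
now = int(time.time())
records = ["demo-run.read.iops 1234 %d\n" % now,
           "demo-run.commit-marker %d %d\n" % (now, now)]

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("storperf-graphite", 2003))
sock.sendall("".join(records))
sock.close()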