summaryrefslogtreecommitdiffstats
path: root/storperf/utilities/data_handler.py
blob: ebf715ab317c32eff135542b45a7ca6584f837cd (plain)
1
2
3
4
5
6
7
8
##############################################################################
# Copyright (c) 2016 Dell EMC and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import logging
import os
from time import sleep
import time

from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
from storperf.utilities import math as math
from storperf.utilities import steady_state as SteadyState


class DataHandler(object):

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.samples = 11

    """
    """

    def data_event(self, executor):
        self.logger.debug("Event received")

        if executor.terminated:
            self._push_to_db(executor)
        else:
            steady_state = True
            metrics = {}
            for metric in ('lat.mean', 'iops', 'bw'):
                metrics[metric] = {}
                for io_type in ('read', 'write'):
                    metrics[metric][io_type] = {}

                    series = self._lookup_prior_data(executor, metric, io_type)
                    steady = self._evaluate_prior_data(series)

                    self.logger.debug("Steady state for %s %s: %s"
                                      % (io_type, metric, steady))

                    metrics[metric][io_type]['series'] = series
                    metrics[metric][io_type]['steady_state'] = steady
                    treated_data = DataTreatment.data_treatment(series)

                    metrics[metric][io_type]['slope'] = \
                        math.slope(treated_data['slope_data'])
                    metrics[metric][io_type]['range'] = \
                        math.range_value(treated_data['range_data'])
                    metrics[metric][io_type]['average'] = \
                        math.average(treated_data['average_data'])

                    if not steady:
                        steady_state = False

            executor.metadata['report_data'] = metrics
            executor.metadata['steady_state'] = steady_state

            if steady_state:
                executor.terminate()

    def _lookup_prior_data(self, executor, metric, io_type):
        workload = executor.current_workload
        graphite_db = GraphiteDB()

        # A bit of a hack here as Carbon might not be finished storing the
        # data we just sent to it
        now = int(time.time())
        backtime = 60 * (self.samples + 2)
        data_series = graphite_db.fetch_series(workload,
                                               metric,
                                               io_type,
                                               now,
                                               backtime)
        most_recent_time = now
        if len(data_series) > 0:
            most_recent_time = data_series[-1][0]

        delta = now - most_recent_time
        self.logger.debug("Last update to graphite was %s ago" % delta)

        while (delta < 5 or (delta > 60 and delta < 120)):
            sleep(5)
            data_series = graphite_db.fetch_series(workload,
                                                   metric,
                                                   io_type,
                                                   now,
                                                   backtime)
            if len(data_series) > 0:
                most_recent_time = data_series[-1][0]
            delta = time.time() - most_recent_time
            self.logger.debug("Last update to graphite was %s ago" % delta)

        return data_series

    def _evaluate_prior_data(self, data_series):
        self.logger.debug("Data series: %s" % data_series)
        if len(data_series) == 0:
            return False
        earliest_timestamp = data_series[0][0]
        latest_timestamp = data_series[-1][0]
        duration = latest_timestamp - earliest_timestamp
        if (duration < 60 * self.samples):
            self.logger.debug("Only %s minutes of samples, ignoring" %
                              (duration / 60,))
            return False

        return SteadyState.steady_state(data_series)

    def _push_to_db(self, executor):
        test_db = os.environ.get('TEST_DB_URL')

        if test_db is not None:
            pod_name = dictionary.get_key_from_dict(executor.metadata,
                                                    'pod_name',
                                                    'Unknown')
            version = dictionary.get_key_from_dict(executor.metadata,
                                                   'version',
                                                   'Unknown')
            scenario = dictionary.get_key_from_dict(executor.metadata,
                                                    'scenario_name',
                                                    'Unknown')
            build_tag = dictionary.get_key_from_dict(executor.metadata,
                                                     'build_tag',
                                                     'Unknown')
            duration = executor.end_time - executor.start_time

            self.logger.info("Pushing results to %s" % (test_db))

            payload = executor.metadata
            payload['timestart'] = executor.start_time
            payload['duration'] = duration
            payload['status'] = 'OK'
            graphite_db = GraphiteDB()
            payload['metrics'] = graphite_db.fetch_averages(
                executor.job_db.job_id)
            criteria = {}
            criteria['block_sizes'] = executor.block_sizes
            criteria['queue_depths'] = executor.queue_depths

            try:
                test_results_db.push_results_to_db(test_db,
                                                   "storperf",
                                                   "Latency Test",
                                                   executor.start_time,
                                                   executor.end_time,
                                                   self.logger,
                                                   pod_name,
                                                   version,
                                                   scenario,
                                                   criteria,
                                                   build_tag,
                                                   payload)
            except:
                self.logger.exception("Error pushing results into Database")