diff options
Diffstat (limited to 'tools/collectors/collectd')
-rw-r--r-- | tools/collectors/collectd/collectd.py | 37 | ||||
-rw-r--r-- | tools/collectors/collectd/collectd_bucky.py | 25 |
2 files changed, 46 insertions, 16 deletions
diff --git a/tools/collectors/collectd/collectd.py b/tools/collectors/collectd/collectd.py index 90df6b04..5e996d3a 100644 --- a/tools/collectors/collectd/collectd.py +++ b/tools/collectors/collectd/collectd.py @@ -20,6 +20,7 @@ Plot the values of the stored samples once the test is completed import copy import csv +import glob import logging import multiprocessing import os @@ -30,6 +31,7 @@ import matplotlib.pyplot as plt import numpy as np import tools.collectors.collectd.collectd_bucky as cb from tools.collectors.collector import collector +from tools import tasks from conf import settings # The y-lables. Keys in this dictionary are used as y-labels. @@ -47,6 +49,7 @@ def get_label(sample): for label in YLABELS: if any(r in sample for r in YLABELS[label]): return label + return None def plot_graphs(dict_of_arrays): @@ -194,6 +197,7 @@ class Receiver(multiprocessing.Process): val = self.pd_dict[sample[1]] val.append((sample[2], sample[3])) self.pd_dict[sample[1]] = val + logging.debug("COLLECTD %s", ' '.join(str(p) for p in sample)) def stop(self): """ @@ -216,13 +220,27 @@ class Collectd(collector.ICollector): """ Initialize collection of statistics """ - self._log = os.path.join(results_dir, - settings.getValue('LOG_FILE_COLLECTD') + - '_' + test_name + '.log') + self.logger = logging.getLogger(__name__) + self.resultsdir = results_dir + self.testname = test_name self.results = {} self.sample_dict = multiprocessing.Manager().dict() self.control = multiprocessing.Value('b', False) self.receiver = Receiver(self.sample_dict, self.control) + self.cleanup_metrics() + # Assumption: collected is installed at /opt/collectd + # And collected is configured to write to csv at /tmp/csv + self.pid = tasks.run_background_task( + ['sudo', '/opt/collectd/sbin/collectd'], + self.logger, 'Staring Collectd') + + def cleanup_metrics(self): + """ + Cleaup the old or archived metrics + """ + for name in glob.glob(os.path.join('/tmp/csv/', '*')): + tasks.run_task(['sudo', 'rm', '-rf', name], self.logger, + 'Cleaning up Metrics', True) def start(self): """ @@ -235,6 +253,11 @@ class Collectd(collector.ICollector): """ Stop receiving samples """ + tasks.terminate_task_subtree(self.pid, logger=self.logger) + # At times collectd fails to fully terminate. + # Killing process by name too helps. + tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'], + self.logger, 'Stopping Collectd', True) self.control.value = True self.receiver.stop() self.receiver.server.join(5) @@ -244,6 +267,12 @@ class Collectd(collector.ICollector): if self.receiver.is_alive(): self.receiver.terminate() self.results = copy.deepcopy(self.sample_dict) + # Backup the collectd-metrics for this test into a zipfile + filename = ('/tmp/collectd-' + settings.getValue('LOG_TIMESTAMP') + + '.tar.gz') + tasks.run_task(['sudo', 'tar', '-czvf', filename, '/tmp/csv/'], + self.logger, 'Zipping File', True) + self.cleanup_metrics() def get_results(self): """ @@ -259,7 +288,7 @@ class Collectd(collector.ICollector): plot_graphs(self.results) proc_stats = get_results_to_print(self.results) for process in proc_stats: - logging.info("Process: " + '_'.join(process.split('_')[:-1])) + logging.info("Process: %s", '_'.join(process.split('_')[:-1])) for(key, value) in proc_stats[process].items(): logging.info(" Statistic: " + str(key) + ", Value: " + str(value)) diff --git a/tools/collectors/collectd/collectd_bucky.py b/tools/collectors/collectd/collectd_bucky.py index bac24ed7..f6061c55 100644 --- a/tools/collectors/collectd/collectd_bucky.py +++ b/tools/collectors/collectd/collectd_bucky.py @@ -498,6 +498,7 @@ class CollectDCrypto(object): return self.parse_signed(part_len, data) if sec_level == 2: return self.parse_encrypted(part_len, data) + return None def parse_signed(self, part_len, data): """ @@ -574,12 +575,12 @@ class CollectDConverter(object): try: name_parts = handler(sample) if name_parts is None: - return # treat None as "ignore sample" + return None # treat None as "ignore sample" name = '.'.join(name_parts) except (AttributeError, IndexError, MemoryError, RuntimeError): LOG.exception("Exception in sample handler %s (%s):", sample["plugin"], handler) - return + return None host = sample.get("host", "") return ( host, @@ -655,7 +656,7 @@ class CollectDHandler(object): Check the value range """ if val is None: - return + return None try: vmin, vmax = self.parser.types.type_ranges[stype][vname] except KeyError: @@ -664,11 +665,11 @@ class CollectDHandler(object): if vmin is not None and val < vmin: LOG.debug("Invalid value %s (<%s) for %s", val, vmin, vname) LOG.debug("Last sample: %s", self.last_sample) - return + return None if vmax is not None and val > vmax: LOG.debug("Invalid value %s (>%s) for %s", val, vmax, vname) LOG.debug("Last sample: %s", self.last_sample) - return + return None return val def calculate(self, host, name, vtype, val, time): @@ -684,7 +685,7 @@ class CollectDHandler(object): if vtype not in handlers: LOG.error("Invalid value type %s for %s", vtype, name) LOG.info("Last sample: %s", self.last_sample) - return + return None return handlers[vtype](host, name, val, time) def _calc_counter(self, host, name, val, time): @@ -694,13 +695,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None pval, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.error("Invalid COUNTER update for: %s:%s", key[0], key[1]) LOG.info("Last sample: %s", self.last_sample) - return + return None if val < pval: # this is supposed to handle counter wrap around # see https://collectd.org/wiki/index.php/Data_source @@ -719,13 +720,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None pval, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.debug("Invalid DERIVE update for: %s:%s", key[0], key[1]) LOG.debug("Last sample: %s", self.last_sample) - return + return None return float(abs(val - pval)) / (time - ptime) def _calc_absolute(self, host, name, val, time): @@ -735,13 +736,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None _, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.error("Invalid ABSOLUTE update for: %s:%s", key[0], key[1]) LOG.info("Last sample: %s", self.last_sample) - return + return None return float(val) / (time - ptime) |