aboutsummaryrefslogtreecommitdiffstats
path: root/tools/collectors/collectd
diff options
context:
space:
mode:
Diffstat (limited to 'tools/collectors/collectd')
-rw-r--r--tools/collectors/collectd/collectd.py37
-rw-r--r--tools/collectors/collectd/collectd_bucky.py25
2 files changed, 46 insertions, 16 deletions
diff --git a/tools/collectors/collectd/collectd.py b/tools/collectors/collectd/collectd.py
index 90df6b04..5e996d3a 100644
--- a/tools/collectors/collectd/collectd.py
+++ b/tools/collectors/collectd/collectd.py
@@ -20,6 +20,7 @@ Plot the values of the stored samples once the test is completed
import copy
import csv
+import glob
import logging
import multiprocessing
import os
@@ -30,6 +31,7 @@ import matplotlib.pyplot as plt
import numpy as np
import tools.collectors.collectd.collectd_bucky as cb
from tools.collectors.collector import collector
+from tools import tasks
from conf import settings
# The y-lables. Keys in this dictionary are used as y-labels.
@@ -47,6 +49,7 @@ def get_label(sample):
for label in YLABELS:
if any(r in sample for r in YLABELS[label]):
return label
+ return None
def plot_graphs(dict_of_arrays):
@@ -194,6 +197,7 @@ class Receiver(multiprocessing.Process):
val = self.pd_dict[sample[1]]
val.append((sample[2], sample[3]))
self.pd_dict[sample[1]] = val
+ logging.debug("COLLECTD %s", ' '.join(str(p) for p in sample))
def stop(self):
"""
@@ -216,13 +220,27 @@ class Collectd(collector.ICollector):
"""
Initialize collection of statistics
"""
- self._log = os.path.join(results_dir,
- settings.getValue('LOG_FILE_COLLECTD') +
- '_' + test_name + '.log')
+ self.logger = logging.getLogger(__name__)
+ self.resultsdir = results_dir
+ self.testname = test_name
self.results = {}
self.sample_dict = multiprocessing.Manager().dict()
self.control = multiprocessing.Value('b', False)
self.receiver = Receiver(self.sample_dict, self.control)
+ self.cleanup_metrics()
+ # Assumption: collected is installed at /opt/collectd
+ # And collected is configured to write to csv at /tmp/csv
+ self.pid = tasks.run_background_task(
+ ['sudo', '/opt/collectd/sbin/collectd'],
+ self.logger, 'Staring Collectd')
+
+ def cleanup_metrics(self):
+ """
+ Cleaup the old or archived metrics
+ """
+ for name in glob.glob(os.path.join('/tmp/csv/', '*')):
+ tasks.run_task(['sudo', 'rm', '-rf', name], self.logger,
+ 'Cleaning up Metrics', True)
def start(self):
"""
@@ -235,6 +253,11 @@ class Collectd(collector.ICollector):
"""
Stop receiving samples
"""
+ tasks.terminate_task_subtree(self.pid, logger=self.logger)
+ # At times collectd fails to fully terminate.
+ # Killing process by name too helps.
+ tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'],
+ self.logger, 'Stopping Collectd', True)
self.control.value = True
self.receiver.stop()
self.receiver.server.join(5)
@@ -244,6 +267,12 @@ class Collectd(collector.ICollector):
if self.receiver.is_alive():
self.receiver.terminate()
self.results = copy.deepcopy(self.sample_dict)
+ # Backup the collectd-metrics for this test into a zipfile
+ filename = ('/tmp/collectd-' + settings.getValue('LOG_TIMESTAMP') +
+ '.tar.gz')
+ tasks.run_task(['sudo', 'tar', '-czvf', filename, '/tmp/csv/'],
+ self.logger, 'Zipping File', True)
+ self.cleanup_metrics()
def get_results(self):
"""
@@ -259,7 +288,7 @@ class Collectd(collector.ICollector):
plot_graphs(self.results)
proc_stats = get_results_to_print(self.results)
for process in proc_stats:
- logging.info("Process: " + '_'.join(process.split('_')[:-1]))
+ logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
for(key, value) in proc_stats[process].items():
logging.info(" Statistic: " + str(key) +
", Value: " + str(value))
diff --git a/tools/collectors/collectd/collectd_bucky.py b/tools/collectors/collectd/collectd_bucky.py
index bac24ed7..f6061c55 100644
--- a/tools/collectors/collectd/collectd_bucky.py
+++ b/tools/collectors/collectd/collectd_bucky.py
@@ -498,6 +498,7 @@ class CollectDCrypto(object):
return self.parse_signed(part_len, data)
if sec_level == 2:
return self.parse_encrypted(part_len, data)
+ return None
def parse_signed(self, part_len, data):
"""
@@ -574,12 +575,12 @@ class CollectDConverter(object):
try:
name_parts = handler(sample)
if name_parts is None:
- return # treat None as "ignore sample"
+ return None # treat None as "ignore sample"
name = '.'.join(name_parts)
except (AttributeError, IndexError, MemoryError, RuntimeError):
LOG.exception("Exception in sample handler %s (%s):",
sample["plugin"], handler)
- return
+ return None
host = sample.get("host", "")
return (
host,
@@ -655,7 +656,7 @@ class CollectDHandler(object):
Check the value range
"""
if val is None:
- return
+ return None
try:
vmin, vmax = self.parser.types.type_ranges[stype][vname]
except KeyError:
@@ -664,11 +665,11 @@ class CollectDHandler(object):
if vmin is not None and val < vmin:
LOG.debug("Invalid value %s (<%s) for %s", val, vmin, vname)
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
if vmax is not None and val > vmax:
LOG.debug("Invalid value %s (>%s) for %s", val, vmax, vname)
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
return val
def calculate(self, host, name, vtype, val, time):
@@ -684,7 +685,7 @@ class CollectDHandler(object):
if vtype not in handlers:
LOG.error("Invalid value type %s for %s", vtype, name)
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
return handlers[vtype](host, name, val, time)
def _calc_counter(self, host, name, val, time):
@@ -694,13 +695,13 @@ class CollectDHandler(object):
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
pval, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.error("Invalid COUNTER update for: %s:%s", key[0], key[1])
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
if val < pval:
# this is supposed to handle counter wrap around
# see https://collectd.org/wiki/index.php/Data_source
@@ -719,13 +720,13 @@ class CollectDHandler(object):
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
pval, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.debug("Invalid DERIVE update for: %s:%s", key[0], key[1])
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
return float(abs(val - pval)) / (time - ptime)
def _calc_absolute(self, host, name, val, time):
@@ -735,13 +736,13 @@ class CollectDHandler(object):
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
_, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.error("Invalid ABSOLUTE update for: %s:%s", key[0], key[1])
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
return float(val) / (time - ptime)