diff options
Diffstat (limited to 'tools/collectors/collectd/collectd.py')
-rw-r--r-- | tools/collectors/collectd/collectd.py | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/tools/collectors/collectd/collectd.py b/tools/collectors/collectd/collectd.py new file mode 100644 index 00000000..90df6b04 --- /dev/null +++ b/tools/collectors/collectd/collectd.py @@ -0,0 +1,265 @@ +# Copyright 2017-2018 Spirent Communications. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Collects samples from collectd through collectd_bucky. +Depending on the policy - decides to keep the sample or discard. +Plot the values of the stored samples once the test is completed +""" + +import copy +import csv +import logging +import multiprocessing +import os +from collections import OrderedDict +import queue + +import matplotlib.pyplot as plt +import numpy as np +import tools.collectors.collectd.collectd_bucky as cb +from tools.collectors.collector import collector +from conf import settings + +# The y-lables. Keys in this dictionary are used as y-labels. +YLABELS = {'No/Of Packets': ['dropped', 'packets', 'if_octets', 'errors', + 'if_rx_octets', 'if_tx_octets'], + 'Jiffies': ['cputime'], + 'Bandwidth b/s': ['memory_bandwidth'], + 'Bytes': ['bytes.llc']} + + +def get_label(sample): + """ + Returns the y-label for the plot. + """ + for label in YLABELS: + if any(r in sample for r in YLABELS[label]): + return label + + +def plot_graphs(dict_of_arrays): + """ + Plot the values + Store the data used for plotting. + """ + i = 1 + results_dir = settings.getValue('RESULTS_PATH') + for key in dict_of_arrays: + tup_list = dict_of_arrays[key] + two_lists = list(map(list, zip(*tup_list))) + y_axis_list = two_lists[0] + x_axis_list = two_lists[1] + if np.count_nonzero(y_axis_list) > 0: + with open(os.path.join(results_dir, + str(key) + '.data'), "w") as pfile: + writer = csv.writer(pfile, delimiter='\t') + writer.writerows(zip(x_axis_list, y_axis_list)) + plt.figure(i) + plt.plot(x_axis_list, y_axis_list) + plt.xlabel("Time (Ticks)") + plt.ylabel(get_label(key)) + plt.savefig(os.path.join(results_dir, str(key) + '.png')) + plt.cla() + plt.clf() + plt.close() + i = i + 1 + + +def get_results_to_print(dict_of_arrays): + """ + Return a results dictionary for report tool to + print the process-statistics. + """ + presults = OrderedDict() + results = OrderedDict() + for key in dict_of_arrays: + if ('processes' in key and + any(proc in key for proc in ['ovs', 'vpp', 'qemu'])): + reskey = '.'.join(key.split('.')[2:]) + preskey = key.split('.')[1] + '_collectd' + tup_list = dict_of_arrays[key] + two_lists = list(map(list, zip(*tup_list))) + y_axis_list = two_lists[0] + mean = 0.0 + if np.count_nonzero(y_axis_list) > 0: + mean = np.mean(y_axis_list) + results[reskey] = mean + presults[preskey] = results + return presults + + +class Receiver(multiprocessing.Process): + """ + Wrapper Receiver (of samples) class + """ + def __init__(self, pd_dict, control): + """ + Initialize. + A queue will be shared with collectd_bucky + """ + super(Receiver, self).__init__() + self.daemon = False + self.q_of_samples = multiprocessing.Queue() + self.server = cb.get_collectd_server(self.q_of_samples) + self.control = control + self.pd_dict = pd_dict + self.collectd_cpu_keys = settings.getValue('COLLECTD_CPU_KEYS') + self.collectd_processes_keys = settings.getValue( + 'COLLECTD_PROCESSES_KEYS') + self.collectd_iface_keys = settings.getValue( + 'COLLECTD_INTERFACE_KEYS') + self.collectd_iface_xkeys = settings.getValue( + 'COLLECTD_INTERFACE_XKEYS') + self.collectd_intelrdt_keys = settings.getValue( + 'COLLECTD_INTELRDT_KEYS') + self.collectd_ovsstats_keys = settings.getValue( + 'COLLECTD_OVSSTAT_KEYS') + self.collectd_dpdkstats_keys = settings.getValue( + 'COLLECTD_DPDKSTAT_KEYS') + self.collectd_intelrdt_xkeys = settings.getValue( + 'COLLECTD_INTELRDT_XKEYS') + self.exclude_coreids = [] + # Expand the ranges in the intelrdt-xkeys + for xkey in self.collectd_intelrdt_xkeys: + if '-' not in xkey: + self.exclude_coreids.append(int(xkey)) + else: + left, right = map(int, xkey.split('-')) + self.exclude_coreids += range(left, right + 1) + + def run(self): + """ + Start receiving the samples. + """ + while not self.control.value: + try: + sample = self.q_of_samples.get(True, 1) + if not sample: + break + self.handle(sample) + except queue.Empty: + pass + except IOError: + continue + except (ValueError, IndexError, KeyError, MemoryError): + self.stop() + break + + # pylint: disable=too-many-boolean-expressions + def handle(self, sample): + ''' Store values and names if names matches following: + 1. cpu + keys + 2. processes + keys + 3. interface + keys + !xkeys + 4. ovs_stats + keys + 5. dpdkstat + keys + 6. intel_rdt + keys + !xkeys + sample[1] is the name of the sample, which is . separated strings. + The first field in sample[1] is the type - cpu, proceesses, etc. + For intel_rdt, the second field contains the core-id, which is + used to make the decision on 'exclusions' + sample[0]: Contains the host information - which is not considered. + sample[2]: Contains the Value. + sample[3]: Contains the Time (in ticks) + ''' + if (('cpu' in sample[1] and + any(c in sample[1] for c in self.collectd_cpu_keys)) or + ('processes' in sample[1] and + any(p in sample[1] for p in self.collectd_processes_keys)) or + ('interface' in sample[1] and + (any(i in sample[1] for i in self.collectd_iface_keys) and + any(x not in sample[1] + for x in self.collectd_iface_xkeys))) or + ('ovs_stats' in sample[1] and + any(o in sample[1] for o in self.collectd_ovsstats_keys)) or + ('dpdkstat' in sample[1] and + any(d in sample[1] for d in self.collectd_dpdkstats_keys)) or + ('intel_rdt' in sample[1] and + any(r in sample[1] for r in self.collectd_intelrdt_keys) and + (int(sample[1].split('.')[1]) not in self.exclude_coreids))): + if sample[1] not in self.pd_dict: + self.pd_dict[sample[1]] = list() + val = self.pd_dict[sample[1]] + val.append((sample[2], sample[3])) + self.pd_dict[sample[1]] = val + + def stop(self): + """ + Stop receiving the samples. + """ + self.server.close() + self.q_of_samples.put(None) + self.control.value = True + + +# inherit from collector.Icollector. +class Collectd(collector.ICollector): + """A collector of system statistics based on collectd + + It starts a UDP server, receives metrics from collectd + and plot the results. + """ + + def __init__(self, results_dir, test_name): + """ + Initialize collection of statistics + """ + self._log = os.path.join(results_dir, + settings.getValue('LOG_FILE_COLLECTD') + + '_' + test_name + '.log') + self.results = {} + self.sample_dict = multiprocessing.Manager().dict() + self.control = multiprocessing.Value('b', False) + self.receiver = Receiver(self.sample_dict, self.control) + + def start(self): + """ + Start receiving samples + """ + self.receiver.server.start() + self.receiver.start() + + def stop(self): + """ + Stop receiving samples + """ + self.control.value = True + self.receiver.stop() + self.receiver.server.join(5) + self.receiver.join(5) + if self.receiver.server.is_alive(): + self.receiver.server.terminate() + if self.receiver.is_alive(): + self.receiver.terminate() + self.results = copy.deepcopy(self.sample_dict) + + def get_results(self): + """ + Return the results. + """ + return get_results_to_print(self.results) + + def print_results(self): + """ + Print - Plot and save raw-data. + log the collected statistics + """ + plot_graphs(self.results) + proc_stats = get_results_to_print(self.results) + for process in proc_stats: + logging.info("Process: " + '_'.join(process.split('_')[:-1])) + for(key, value) in proc_stats[process].items(): + logging.info(" Statistic: " + str(key) + + ", Value: " + str(value)) |