diff options
Diffstat (limited to 'tools/collectors')
-rwxr-xr-x | tools/collectors/cadvisor/__init__.py | 17 | ||||
-rw-r--r-- | tools/collectors/cadvisor/cadvisor.py | 218 | ||||
-rw-r--r-- | tools/collectors/collectd/collectd.py | 37 | ||||
-rw-r--r-- | tools/collectors/collectd/collectd_bucky.py | 25 | ||||
-rwxr-xr-x | tools/collectors/multicmd/__init__.py | 17 | ||||
-rw-r--r-- | tools/collectors/multicmd/multicmd.py | 138 | ||||
-rw-r--r-- | tools/collectors/sysmetrics/pidstat.py | 52 |
7 files changed, 478 insertions, 26 deletions
diff --git a/tools/collectors/cadvisor/__init__.py b/tools/collectors/cadvisor/__init__.py new file mode 100755 index 00000000..235ab875 --- /dev/null +++ b/tools/collectors/cadvisor/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2020 University Of Delhi. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Wrapper for cAdvisor as a collector +""" diff --git a/tools/collectors/cadvisor/cadvisor.py b/tools/collectors/cadvisor/cadvisor.py new file mode 100644 index 00000000..de48cecd --- /dev/null +++ b/tools/collectors/cadvisor/cadvisor.py @@ -0,0 +1,218 @@ +# Copyright 2020 University Of Delhi. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Collects container metrics from cAdvisor. +Sends metrics to influxDB and also stores results locally. +""" + +import subprocess +import logging +import os +from collections import OrderedDict + +from tools.collectors.collector import collector +from tools import tasks +from conf import settings + + + +# inherit from collector.Icollector. +class Cadvisor(collector.ICollector): + """A collector of container metrics based on cAdvisor + + It starts cadvisor and collects metrics. + """ + + def __init__(self, results_dir, test_name): + """ + Initialize collection of statistics + """ + self._logger = logging.getLogger(__name__) + self.resultsdir = results_dir + self.testname = test_name + self._pid = 0 + self._results = OrderedDict() + self._log = os.path.join(results_dir, + settings.getValue('LOG_FILE_CADVISOR') + + '_' + test_name + '.log') + self._logfile = 0 + + + def start(self): + """ + Starts collection of statistics by cAdvisor and stores them + into- + 1. The file in directory with test results + 2. InfluxDB result container + """ + + # CMD options for cAdvisor + cmd = ['sudo', '/opt/cadvisor/cadvisor', + '-storage_driver='+settings.getValue('CADVISOR_STORAGE_DRIVER'), + '-storage_driver_host='+settings.getValue('CADVISOR_STORAGE_HOST'), + '-storage_driver_db='+settings.getValue('CADVISOR_DRIVER_DB'), + '-housekeeping_interval=0.5s', + '-storage_driver_buffer_duration=1s' + ] + + self._logfile = open(self._log, 'a') + + self._pid = subprocess.Popen(map(os.path.expanduser, cmd), stdout=self._logfile, bufsize=0) + self._logger.info('Starting cAdvisor') + + + + def stop(self): + """ + Stops collection of metrics by cAdvisor and stores statistic + summary for each monitored container into self._results dictionary + """ + try: + subprocess.check_output(["pidof", "cadvisor"]) + tasks.run_task(['sudo', 'pkill', '--signal', '2', 'cadvisor'], + self._logger, 'Stopping cAdvisor', True) + except subprocess.CalledProcessError: + self._logger.error('Failed to stop cAdvisor, maybe process does not exist') + + + self._logfile.close() + self._logger.info('cAdvisor log available at %s', self._log) + + containers = settings.getValue('CADVISOR_CONTAINERS') + self._results = cadvisor_log_result(self._log, containers) + + + def get_results(self): + """Returns collected statistics. + """ + return self._results + + def print_results(self): + """Logs collected statistics. + """ + for cnt in self._results: + logging.info("Container: %s", cnt) + for (key, value) in self._results[cnt].items(): + + postfix = '' + + if key == 'cpu_cumulative_usage': + key = 'CPU_usage' + value = round(float(value) / 1000000000, 4) + postfix = '%' + + if key in ['memory_usage', 'memory_working_set']: + value = round(float(value) / 1024 / 1024, 4) + postfix = 'MB' + + if key in ['rx_bytes', 'tx_bytes']: + value = round(float(value) / 1024 / 1024, 4) + postfix = 'mBps' + + logging.info(" Statistic: %s Value: %s %s", + str(key), str(value), postfix) + + +def cadvisor_log_result(filename, containers): + """ + Processes cAdvisor logfile and returns average results + + :param filename: Name of cadvisor logfile + :param containers: List of container names + + :returns: Result as average stats of Containers + """ + result = OrderedDict() + previous = OrderedDict() + logfile = open(filename, 'r') + with logfile: + # for every line + for _, line in enumerate(logfile): + # skip lines having root '/' metrics + if line[0:7] == 'cName=/': + continue + + # parse line into OrderedDict + tmp_res = parse_line(line) + + cnt = tmp_res['cName'] + + # skip if cnt is not in container list + if cnt not in containers: + continue + + # add metrics to result + if cnt not in result: + result[cnt] = tmp_res + previous[cnt] = tmp_res + result[cnt]['count'] = 1 + else: + for field in tmp_res: + + if field in ['rx_errors', 'tx_errors', 'memory_usage', 'memory_working_set']: + val = float(tmp_res[field]) + elif field in ['cpu_cumulative_usage', 'rx_bytes', 'tx_bytes']: + val = float(tmp_res[field]) - float(previous[cnt][field]) + else: + # discard remaining fields + try: + result[cnt].pop(field) + except KeyError: + continue + continue + + result[cnt][field] = float(result[cnt][field]) + val + + result[cnt]['count'] += 1 + previous[cnt] = tmp_res + + # calculate average results for containers + result = calculate_average(result) + return result + + +def calculate_average(results): + """ + Calculates average for container stats + """ + for cnt in results: + for field in results[cnt]: + if field != 'count': + val = float(results[cnt][field])/results[cnt]['count'] + results[cnt][field] = '{0:.2f}'.format(val) + + results[cnt].pop('count') + #sort results + results[cnt] = OrderedDict(sorted(results[cnt].items())) + + return results + + +def parse_line(line): + """ + Reads single line from cAdvisor logfile + + :param line: single line as str + + :returns: OrderedDict of line read + """ + tmp_res = OrderedDict() + # split line into array of "key=value" metrics + metrics = line.split() + for metric in metrics: + key, value = metric.split('=') + tmp_res[key] = value + + return tmp_res diff --git a/tools/collectors/collectd/collectd.py b/tools/collectors/collectd/collectd.py index 90df6b04..5e996d3a 100644 --- a/tools/collectors/collectd/collectd.py +++ b/tools/collectors/collectd/collectd.py @@ -20,6 +20,7 @@ Plot the values of the stored samples once the test is completed import copy import csv +import glob import logging import multiprocessing import os @@ -30,6 +31,7 @@ import matplotlib.pyplot as plt import numpy as np import tools.collectors.collectd.collectd_bucky as cb from tools.collectors.collector import collector +from tools import tasks from conf import settings # The y-lables. Keys in this dictionary are used as y-labels. @@ -47,6 +49,7 @@ def get_label(sample): for label in YLABELS: if any(r in sample for r in YLABELS[label]): return label + return None def plot_graphs(dict_of_arrays): @@ -194,6 +197,7 @@ class Receiver(multiprocessing.Process): val = self.pd_dict[sample[1]] val.append((sample[2], sample[3])) self.pd_dict[sample[1]] = val + logging.debug("COLLECTD %s", ' '.join(str(p) for p in sample)) def stop(self): """ @@ -216,13 +220,27 @@ class Collectd(collector.ICollector): """ Initialize collection of statistics """ - self._log = os.path.join(results_dir, - settings.getValue('LOG_FILE_COLLECTD') + - '_' + test_name + '.log') + self.logger = logging.getLogger(__name__) + self.resultsdir = results_dir + self.testname = test_name self.results = {} self.sample_dict = multiprocessing.Manager().dict() self.control = multiprocessing.Value('b', False) self.receiver = Receiver(self.sample_dict, self.control) + self.cleanup_metrics() + # Assumption: collected is installed at /opt/collectd + # And collected is configured to write to csv at /tmp/csv + self.pid = tasks.run_background_task( + ['sudo', '/opt/collectd/sbin/collectd'], + self.logger, 'Staring Collectd') + + def cleanup_metrics(self): + """ + Cleaup the old or archived metrics + """ + for name in glob.glob(os.path.join('/tmp/csv/', '*')): + tasks.run_task(['sudo', 'rm', '-rf', name], self.logger, + 'Cleaning up Metrics', True) def start(self): """ @@ -235,6 +253,11 @@ class Collectd(collector.ICollector): """ Stop receiving samples """ + tasks.terminate_task_subtree(self.pid, logger=self.logger) + # At times collectd fails to fully terminate. + # Killing process by name too helps. + tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'], + self.logger, 'Stopping Collectd', True) self.control.value = True self.receiver.stop() self.receiver.server.join(5) @@ -244,6 +267,12 @@ class Collectd(collector.ICollector): if self.receiver.is_alive(): self.receiver.terminate() self.results = copy.deepcopy(self.sample_dict) + # Backup the collectd-metrics for this test into a zipfile + filename = ('/tmp/collectd-' + settings.getValue('LOG_TIMESTAMP') + + '.tar.gz') + tasks.run_task(['sudo', 'tar', '-czvf', filename, '/tmp/csv/'], + self.logger, 'Zipping File', True) + self.cleanup_metrics() def get_results(self): """ @@ -259,7 +288,7 @@ class Collectd(collector.ICollector): plot_graphs(self.results) proc_stats = get_results_to_print(self.results) for process in proc_stats: - logging.info("Process: " + '_'.join(process.split('_')[:-1])) + logging.info("Process: %s", '_'.join(process.split('_')[:-1])) for(key, value) in proc_stats[process].items(): logging.info(" Statistic: " + str(key) + ", Value: " + str(value)) diff --git a/tools/collectors/collectd/collectd_bucky.py b/tools/collectors/collectd/collectd_bucky.py index bac24ed7..f6061c55 100644 --- a/tools/collectors/collectd/collectd_bucky.py +++ b/tools/collectors/collectd/collectd_bucky.py @@ -498,6 +498,7 @@ class CollectDCrypto(object): return self.parse_signed(part_len, data) if sec_level == 2: return self.parse_encrypted(part_len, data) + return None def parse_signed(self, part_len, data): """ @@ -574,12 +575,12 @@ class CollectDConverter(object): try: name_parts = handler(sample) if name_parts is None: - return # treat None as "ignore sample" + return None # treat None as "ignore sample" name = '.'.join(name_parts) except (AttributeError, IndexError, MemoryError, RuntimeError): LOG.exception("Exception in sample handler %s (%s):", sample["plugin"], handler) - return + return None host = sample.get("host", "") return ( host, @@ -655,7 +656,7 @@ class CollectDHandler(object): Check the value range """ if val is None: - return + return None try: vmin, vmax = self.parser.types.type_ranges[stype][vname] except KeyError: @@ -664,11 +665,11 @@ class CollectDHandler(object): if vmin is not None and val < vmin: LOG.debug("Invalid value %s (<%s) for %s", val, vmin, vname) LOG.debug("Last sample: %s", self.last_sample) - return + return None if vmax is not None and val > vmax: LOG.debug("Invalid value %s (>%s) for %s", val, vmax, vname) LOG.debug("Last sample: %s", self.last_sample) - return + return None return val def calculate(self, host, name, vtype, val, time): @@ -684,7 +685,7 @@ class CollectDHandler(object): if vtype not in handlers: LOG.error("Invalid value type %s for %s", vtype, name) LOG.info("Last sample: %s", self.last_sample) - return + return None return handlers[vtype](host, name, val, time) def _calc_counter(self, host, name, val, time): @@ -694,13 +695,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None pval, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.error("Invalid COUNTER update for: %s:%s", key[0], key[1]) LOG.info("Last sample: %s", self.last_sample) - return + return None if val < pval: # this is supposed to handle counter wrap around # see https://collectd.org/wiki/index.php/Data_source @@ -719,13 +720,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None pval, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.debug("Invalid DERIVE update for: %s:%s", key[0], key[1]) LOG.debug("Last sample: %s", self.last_sample) - return + return None return float(abs(val - pval)) / (time - ptime) def _calc_absolute(self, host, name, val, time): @@ -735,13 +736,13 @@ class CollectDHandler(object): key = (host, name) if key not in self.prev_samples: self.prev_samples[key] = (val, time) - return + return None _, ptime = self.prev_samples[key] self.prev_samples[key] = (val, time) if time <= ptime: LOG.error("Invalid ABSOLUTE update for: %s:%s", key[0], key[1]) LOG.info("Last sample: %s", self.last_sample) - return + return None return float(val) / (time - ptime) diff --git a/tools/collectors/multicmd/__init__.py b/tools/collectors/multicmd/__init__.py new file mode 100755 index 00000000..2ae2340f --- /dev/null +++ b/tools/collectors/multicmd/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2019 Spirent Communications. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Wrapper for multi-commands as a collector +""" diff --git a/tools/collectors/multicmd/multicmd.py b/tools/collectors/multicmd/multicmd.py new file mode 100644 index 00000000..275a0693 --- /dev/null +++ b/tools/collectors/multicmd/multicmd.py @@ -0,0 +1,138 @@ +# Copyright 2019 Spirent Communications. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Collects information using various command line tools. +""" + +#from tools.collectors.collector import collector +import glob +import logging +import os +from collections import OrderedDict +from tools import tasks +from tools.collectors.collector import collector +from conf import settings + +class MultiCmd(collector.ICollector): + """ Multiple command-line controllers + collectd, prox, crond, filebeat + """ + def __init__(self, results_dir, test_name): + """ + initialize collectrs + """ + self.prox_home = settings.getValue('MC_PROX_HOME') + self.collectd_cmd = settings.getValue('MC_COLLECTD_CMD') + self.collectd_csv = settings.getValue('MC_COLLECTD_CSV') + self.prox_out = settings.getValue('MC_PROX_OUT') + self.prox_cmd = settings.getValue('MC_PROX_CMD') + self.cron_out = settings.getValue('MC_CRON_OUT') + self.logger = logging.getLogger(__name__) + self.results_dir = results_dir + self.collectd_pid = 0 + self.prox_pid = 0 + self.cleanup_collectd_metrics() + self.logger.debug('%s', 'Multicmd data for '+ str(test_name)) + # There should not be a file by name stop in prox_home folder + # Else Prox will start and stop immediately. This is a Hack to + # control prox-runrapid, which by default runs for specified duration. + filename = os.path.join(self.prox_home, 'stop') + if os.path.exists(filename): + tasks.run_task(['sudo', 'rm', filename], + self.logger, 'deleting stop') + self.results = OrderedDict() + + def cleanup_collectd_metrics(self): + """ + Cleaup the old or archived metrics + """ + for name in glob.glob(os.path.join(self.collectd_csv, '*')): + tasks.run_task(['sudo', 'rm', '-rf', name], self.logger, + 'Cleaning up Metrics', True) + + def start(self): + # Command-1: Start Collectd + self.collectd_pid = tasks.run_background_task( + ['sudo', self.collectd_cmd], + self.logger, 'Staring Collectd') + + # Command-2: Start PROX + working_dir = os.getcwd() + if os.path.exists(self.prox_home): + os.chdir(self.prox_home) + self.prox_pid = tasks.run_background_task(['sudo', self.prox_cmd, + '--test', 'irq', + '--env', 'irq'], + self.logger, + 'Start PROX') + os.chdir(working_dir) + # Command-3: Start CROND + tasks.run_task(['sudo', 'systemctl', 'start', 'crond'], + self.logger, 'Staring CROND', True) + + # command-4: BEATS + tasks.run_task(['sudo', 'systemctl', 'start', 'filebeat'], + self.logger, 'Starting BEATS', True) + + def stop(self): + """ + Stop All commands + """ + # Command-1: COLLECTD + tasks.terminate_task_subtree(self.collectd_pid, logger=self.logger) + tasks.run_task(['sudo', 'pkill', '--signal', '2', 'collectd'], + self.logger, 'Stopping Collectd', True) + + # Backup the collectd-metrics for this test into a results folder + # results_dir = os.path.join(settings.getValue('RESULTS_PATH'), '/') + tasks.run_task(['sudo', 'cp', '-r', self.collectd_csv, + self.results_dir], self.logger, + 'Copying Collectd Results File', True) + self.cleanup_collectd_metrics() + + # Command-2: PROX + filename = os.path.join(self.prox_home, 'stop') + if os.path.exists(self.prox_home): + tasks.run_task(['sudo', 'touch', filename], + self.logger, 'Stopping PROX', True) + + outfile = os.path.join(self.prox_home, self.prox_out) + if os.path.exists(outfile): + tasks.run_task(['sudo', 'mv', outfile, self.results_dir], + self.logger, 'Moving PROX-OUT file', True) + + # Command-3: CROND + tasks.run_task(['sudo', 'systemctl', 'stop', 'crond'], + self.logger, 'Stopping CROND', True) + if os.path.exists(self.cron_out): + tasks.run_task(['sudo', 'mv', self.cron_out, self.results_dir], + self.logger, 'Move Cron Logs', True) + + # Command-4: BEATS + tasks.run_task(['sudo', 'systemctl', 'stop', 'filebeat'], + self.logger, 'Stopping BEATS', True) + + def get_results(self): + """ + Return results + """ + return self.results + + def print_results(self): + """ + Print results + """ + logging.info("Multicmd Output is not collected by VSPERF") + logging.info("Please refer to corresponding command's output") diff --git a/tools/collectors/sysmetrics/pidstat.py b/tools/collectors/sysmetrics/pidstat.py index 99341ccf..277fdb11 100644 --- a/tools/collectors/sysmetrics/pidstat.py +++ b/tools/collectors/sysmetrics/pidstat.py @@ -70,13 +70,13 @@ class Pidstat(collector.ICollector): into the file in directory with test results """ monitor = settings.getValue('PIDSTAT_MONITOR') - self._logger.info('Statistics are requested for: ' + ', '.join(monitor)) + self._logger.info('Statistics are requested for: %s', ', '.join(monitor)) pids = systeminfo.get_pids(monitor) if pids: with open(self._log, 'w') as logfile: cmd = ['sudo', 'LC_ALL=' + settings.getValue('DEFAULT_CMD_LOCALE'), 'pidstat', settings.getValue('PIDSTAT_OPTIONS'), - '-p', ','.join(pids), + '-t', '-p', ','.join(pids), str(settings.getValue('PIDSTAT_SAMPLE_INTERVAL'))] self._logger.debug('%s', ' '.join(cmd)) self._pid = subprocess.Popen(cmd, stdout=logfile, bufsize=0).pid @@ -116,16 +116,48 @@ class Pidstat(collector.ICollector): # combine stored header fields with actual values tmp_res = OrderedDict(zip(tmp_header, line[8:].split())) - # use process's name and its pid as unique key - key = tmp_res.pop('Command') + '_' + tmp_res['PID'] - # store values for given command into results dict - if key in self._results: - self._results[key].update(tmp_res) - else: - self._results[key] = tmp_res + cmd = tmp_res.pop('Command') + # remove unused fields (given by option '-t') + tmp_res.pop('UID') + tmp_res.pop('TID') + if '|_' not in cmd: # main process + # use process's name and its pid as unique key + tmp_pid = tmp_res.pop('TGID') + tmp_key = "%s_%s" % (cmd, tmp_pid) + # do not trust cpu usage of pid + # see VSPERF-569 for more details + if 'CPU' not in tmp_header: + self.update_results(tmp_key, tmp_res, False) + else: # thread + # accumulate cpu usage of all threads + if 'CPU' in tmp_header: + tmp_res.pop('TGID') + self.update_results(tmp_key, tmp_res, True) line = logfile.readline() + def update_results(self, key, result, accumulate=False): + """ + Update final results dictionary. If ``accumulate`` param is set to + ``True``, try to accumulate existing values. + """ + # store values for given command into results dict + if key not in self._results: + self._results[key] = result + elif accumulate: + for field in result: + if field not in self._results[key]: + self._results[key][field] = result[field] + else: + try: + val = float(self._results[key][field]) + float(result[field]) + self._results[key][field] = '{0:.2f}'.format(val) + except ValueError: + # cannot cast to float, let's update with the previous value + self._results[key][field] = result[field] + else: + self._results[key].update(result) + def get_results(self): """Returns collected statistics. """ @@ -135,7 +167,7 @@ class Pidstat(collector.ICollector): """Logs collected statistics. """ for process in self._results: - logging.info("Process: " + '_'.join(process.split('_')[:-1])) + logging.info("Process: %s", '_'.join(process.split('_')[:-1])) for(key, value) in self._results[process].items(): logging.info(" Statistic: " + str(key) + ", Value: " + str(value)) |