diff options
Diffstat (limited to 'yardstick')
33 files changed, 533 insertions, 176 deletions
diff --git a/yardstick/__init__.py b/yardstick/__init__.py index b673e7c4a..f95b0a906 100644 --- a/yardstick/__init__.py +++ b/yardstick/__init__.py @@ -10,11 +10,22 @@ from __future__ import absolute_import import logging import os +import errno +# this module must only import other modules that do +# not require loggers to be created, so this cannot +# include yardstick.common.utils from yardstick.common import constants -from yardstick.common import utils as yardstick_utils -yardstick_utils.makedirs(constants.LOG_DIR) +try: + # do not use yardstick.common.utils.makedirs + # since yardstick.common.utils creates a logger + # and so it cannot be imported before this code + os.makedirs(constants.LOG_DIR) +except OSError as e: + if e.errno != errno.EEXIST: + raise + LOG_FILE = os.path.join(constants.LOG_DIR, 'yardstick.log') LOG_FORMATTER = '%(asctime)s [%(levelname)s] %(name)s %(filename)s:%(lineno)d %(message)s' @@ -34,6 +45,7 @@ def _init_logging(): _LOG_STREAM_HDLR.setLevel(logging.DEBUG) else: _LOG_STREAM_HDLR.setLevel(logging.INFO) + # don't append to log file, clobber _LOG_FILE_HDLR.setFormatter(_LOG_FORMATTER) _LOG_FILE_HDLR.setLevel(logging.DEBUG) diff --git a/yardstick/benchmark/contexts/heat.py b/yardstick/benchmark/contexts/heat.py index 9a7b3817f..ff3e5f801 100644 --- a/yardstick/benchmark/contexts/heat.py +++ b/yardstick/benchmark/contexts/heat.py @@ -361,6 +361,8 @@ class HeatContext(Context): 'subnet', 'gateway_ip')] return { + # add default port name + "name": port, "private_ip": private_ip, "subnet_id": outputs[h_join(stack_name, "subnet_id")], "subnet_cidr": output_subnet_cidr, diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 3ff064ae1..6aaaed888 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -191,7 +191,9 @@ class ArithmeticRunner(base.Runner): __execution_type__ = 'Arithmetic' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 3ecf67736..13718d793 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -17,13 +17,14 @@ # rally/rally/benchmark/runners/base.py from __future__ import absolute_import -import importlib + import logging import multiprocessing import subprocess import time import traceback +import importlib from six.moves.queue import Empty @@ -36,7 +37,6 @@ log = logging.getLogger(__name__) def _execute_shell_command(command): """execute shell script with error handling""" exitcode = 0 - output = [] try: output = subprocess.check_output(command, shell=True) except Exception: diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 75942766d..fbf72a74c 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -31,6 +31,9 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) +QUEUE_PUT_TIMEOUT = 10 + + def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg, aborted, output_queue): @@ -79,7 +82,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception("") else: if result: - output_queue.put(result) + # add timeout for put so we don't block test + # if we do timeout we don't care about dropping individual KPIs + output_queue.put(result, True, QUEUE_PUT_TIMEOUT) time.sleep(interval) @@ -90,7 +95,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - queue.put(benchmark_output) + queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) @@ -133,7 +138,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Duration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index 01e76c6f4..63bfc823a 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -19,11 +19,12 @@ """A runner that searches for the max throughput with binary search """ -import os -import multiprocessing import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -65,8 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, max_throuput_found = False sequence = 0 - last_min_data = {} - last_min_data['packets_per_second'] = 0 + last_min_data = {'packets_per_second': 0} while True: sequence += 1 @@ -125,7 +125,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, queue.put(record) max_throuput_found = True - if (errors) or aborted.is_set() or max_throuput_found: + if errors or aborted.is_set() or max_throuput_found: LOG.info("worker END") break @@ -155,7 +155,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, class IterationRunner(base.Runner): - '''Run a scenario to find the max throughput + """Run a scenario to find the max throughput If the scenario ends before the time has elapsed, it will be started again. @@ -168,11 +168,13 @@ If the scenario ends before the time has elapsed, it will be started again. type: int unit: pps default: 1000 pps - ''' + """ __execution_type__ = 'Dynamictp' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted)) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 4a7439588..cb0424377 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -20,17 +20,22 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) +QUEUE_PUT_TIMEOUT = 10 + + def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg, aborted, output_queue): @@ -85,13 +90,14 @@ def _worker_process(queue, cls, method_name, scenario_cfg, scenario_cfg['options']['rate'] -= delta sequence = 1 continue - except Exception as e: + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: - LOG.debug("output_queue.put %s", result) - output_queue.put(result, True, 1) + # add timeout for put so we don't block test + # if we do timeout we don't care about dropping individual KPIs + output_queue.put(result, True, QUEUE_PUT_TIMEOUT) time.sleep(interval) @@ -102,8 +108,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - LOG.debug("queue.put, %s", benchmark_output) - queue.put(benchmark_output, True, 1) + queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], @@ -148,7 +153,9 @@ If the scenario ends before the time has elapsed, it will be started again. __execution_type__ = 'Iteration' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/runners/search.py b/yardstick/benchmark/runners/search.py index 5948763a7..8037329b5 100644 --- a/yardstick/benchmark/runners/search.py +++ b/yardstick/benchmark/runners/search.py @@ -20,15 +20,16 @@ """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time - -from collections import Mapping +import traceback from contextlib import contextmanager from itertools import takewhile + +import os +from collections import Mapping from six.moves import zip from yardstick.benchmark.runners import base @@ -173,7 +174,9 @@ If the scenario ends before the time has elapsed, it will be started again. break def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=self._worker_run, args=(cls, method, scenario_cfg, context_cfg)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index f08ca5dde..d6e3f7109 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -21,11 +21,13 @@ The input value in the sequence is specified in a list in the input file. """ from __future__ import absolute_import -import os -import multiprocessing + import logging -import traceback +import multiprocessing import time +import traceback + +import os from yardstick.benchmark.runners import base @@ -140,7 +142,9 @@ class SequenceRunner(base.Runner): __execution_type__ = 'Sequence' def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) self.process = multiprocessing.Process( + name=name, target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, context_cfg, self.aborted, self.output_queue)) diff --git a/yardstick/benchmark/scenarios/availability/actionplayers.py b/yardstick/benchmark/scenarios/availability/actionplayers.py index c5e199ba6..d5a531e8a 100644 --- a/yardstick/benchmark/scenarios/availability/actionplayers.py +++ b/yardstick/benchmark/scenarios/availability/actionplayers.py @@ -20,8 +20,10 @@ class ActionPlayer(object): class AttackerPlayer(ActionPlayer): - def __init__(self, attacker): + def __init__(self, attacker, intermediate_variables): self.underlyingAttacker = attacker + self.underlyingAttacker.intermediate_variables \ + = intermediate_variables def action(self): self.underlyingAttacker.inject_fault() @@ -40,8 +42,10 @@ class OperationPlayer(ActionPlayer): class MonitorPlayer(ActionPlayer): - def __init__(self, monitor): + def __init__(self, monitor, intermediate_variables): self.underlyingmonitor = monitor + self.underlyingmonitor.intermediate_variables \ + = intermediate_variables def action(self): self.underlyingmonitor.start_monitor() @@ -49,8 +53,10 @@ class MonitorPlayer(ActionPlayer): class ResultCheckerPlayer(ActionPlayer): - def __init__(self, resultChecker): + def __init__(self, resultChecker, intermediate_variables): self.underlyingresultChecker = resultChecker + self.underlyingresultChecker.intermediate_variables \ + = intermediate_variables def action(self): self.underlyingresultChecker.verify() diff --git a/yardstick/benchmark/scenarios/availability/attacker/attacker_general.py b/yardstick/benchmark/scenarios/availability/attacker/attacker_general.py index 48863af93..11b02a222 100644 --- a/yardstick/benchmark/scenarios/availability/attacker/attacker_general.py +++ b/yardstick/benchmark/scenarios/availability/attacker/attacker_general.py @@ -13,6 +13,8 @@ import yardstick.ssh as ssh from yardstick.benchmark.scenarios.availability import util from yardstick.benchmark.scenarios.availability.attacker.baseattacker import \ BaseAttacker +from yardstick.benchmark.scenarios.availability.util \ + import read_stdout_item, build_shell_command LOG = logging.getLogger(__name__) @@ -33,13 +35,7 @@ class GeneralAttacker(BaseAttacker): self.attack_key = self._config['attack_key'] if "action_parameter" in self._config: - actionParameter = self._config['action_parameter'] - str = util.buildshellparams(actionParameter) - LOG.debug("inject parameter is: %s", actionParameter) - LOG.debug("inject parameter values are: %s", - list(actionParameter.values())) - l = list(actionParameter.values()) - self.action_param = str.format(*l) + self.actionParameter_config = self._config['action_parameter'] if "rollback_parameter" in self._config: rollbackParameter = self._config['rollback_parameter'] @@ -59,8 +55,12 @@ class GeneralAttacker(BaseAttacker): def inject_fault(self): LOG.debug("%s starting inject!", self.key) LOG.debug("the inject_script path:%s", self.inject_script) - if "action_parameter" in self._config: + self.action_param = \ + build_shell_command( + self.actionParameter_config, + bool(self.connection), + self.intermediate_variables) LOG.debug("the shell command is: %s", self.action_param) with open(self.inject_script, "r") as stdin_file: exit_status, stdout, stderr = self.connection.execute( @@ -75,6 +75,12 @@ class GeneralAttacker(BaseAttacker): LOG.debug("the inject_fault's exit status is: %s", exit_status) if exit_status == 0: LOG.debug("success,the inject_fault's output is: %s", stdout) + if "return_parameter" in self._config: + returnParameter = self._config['return_parameter'] + for key, item in returnParameter.items(): + value = read_stdout_item(stdout, key) + LOG.debug("intermediate variables %s: %s", item, value) + self.intermediate_variables[item] = value else: LOG.error( "the inject_fault's error, stdout:%s, stderr:%s", diff --git a/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py b/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py index 61698da43..d03d04420 100644 --- a/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py +++ b/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py @@ -62,6 +62,7 @@ class BaseAttacker(object): self._context = context self.data = {} self.setup_done = False + self.intermediate_variables = {} @staticmethod def get_attacker_cls(attacker_cfg): diff --git a/yardstick/benchmark/scenarios/availability/director.py b/yardstick/benchmark/scenarios/availability/director.py index f152af090..71690c135 100644 --- a/yardstick/benchmark/scenarios/availability/director.py +++ b/yardstick/benchmark/scenarios/availability/director.py @@ -71,12 +71,12 @@ class Director(object): LOG.debug( "the type of current action is %s, the key is %s", type, key) if type == ActionType.ATTACKER: - return actionplayers.AttackerPlayer(self.attackerMgr[key]) + return actionplayers.AttackerPlayer(self.attackerMgr[key], intermediate_variables) if type == ActionType.MONITOR: - return actionplayers.MonitorPlayer(self.monitorMgr[key]) + return actionplayers.MonitorPlayer(self.monitorMgr[key], intermediate_variables) if type == ActionType.RESULTCHECKER: return actionplayers.ResultCheckerPlayer( - self.resultCheckerMgr[key]) + self.resultCheckerMgr[key], intermediate_variables) if type == ActionType.OPERATION: return actionplayers.OperationPlayer(self.operationMgr[key], intermediate_variables) diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py index a6c1a28bd..50a63f53d 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py +++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py @@ -94,6 +94,7 @@ class BaseMonitor(multiprocessing.Process): self.monitor_data = data self.setup_done = False self.tag = "" + self.intermediate_variables = {} @staticmethod def get_monitor_cls(monitor_type): diff --git a/yardstick/benchmark/scenarios/availability/monitor/monitor_general.py b/yardstick/benchmark/scenarios/availability/monitor/monitor_general.py index c16765fe0..b058ae2b1 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/monitor_general.py +++ b/yardstick/benchmark/scenarios/availability/monitor/monitor_general.py @@ -11,8 +11,8 @@ import logging import yardstick.ssh as ssh from yardstick.benchmark.scenarios.availability.monitor import basemonitor -from yardstick.benchmark.scenarios.availability.util import buildshellparams - +from yardstick.benchmark.scenarios.availability.util \ + import build_shell_command, execute_shell_command LOG = logging.getLogger(__name__) @@ -23,37 +23,41 @@ class GeneralMonitor(basemonitor.BaseMonitor): __monitor_type__ = "general-monitor" def setup(self): - host = self._context[self._config["host"]] + host = self._context.get(self._config.get('host', None), None) + self.connection = None + if host: + self.connection = ssh.SSH.from_node( + host, defaults={"user": "root"}) + self.connection.wait(timeout=600) + LOG.debug("ssh host success!") self.key = self._config["key"] self.monitor_key = self._config["monitor_key"] self.monitor_type = self._config["monitor_type"] - if "parameter" in self._config: - parameter = self._config['parameter'] - str = buildshellparams(parameter) - l = list(item for item in parameter.values()) - self.cmd_param = str.format(*l) - + self.parameter_config = self._config['parameter'] self.monitor_cfg = basemonitor.BaseMonitor.monitor_cfgs.get( self.monitor_key) self.monitor_script = self.get_script_fullpath( self.monitor_cfg['monitor_script']) - self.connection = ssh.SSH.from_node(host, defaults={"user": "root"}) - self.connection.wait(timeout=600) LOG.debug("ssh host success!") def monitor_func(self): if "parameter" in self._config: - with open(self.monitor_script, "r") as stdin_file: - exit_status, stdout, stderr = self.connection.execute( - "sudo {}".format(self.cmd_param), - stdin=stdin_file) + self.cmd_param = \ + build_shell_command( + self.parameter_config, + bool(self.connection), + self.intermediate_variables) + cmd_remote = "sudo {}".format(self.cmd_param) + cmd_local = "/bin/bash {0} {1}".format(self.monitor_script, self.cmd_param) else: + cmd_remote = "sudo /bin/sh -s " + cmd_local = "/bin/bash {0}".format(self.monitor_script) + if self.connection: with open(self.monitor_script, "r") as stdin_file: - exit_status, stdout, stderr = self.connection.execute( - "sudo /bin/bash -s ", - stdin=stdin_file) - + exit_status, stdout, stderr = self.connection.execute(cmd_remote, stdin=stdin_file) + else: + exit_status, stdout = execute_shell_command(cmd_local) if exit_status: return False return True diff --git a/yardstick/benchmark/scenarios/availability/result_checker/baseresultchecker.py b/yardstick/benchmark/scenarios/availability/result_checker/baseresultchecker.py index 05b660105..d1750ab65 100644 --- a/yardstick/benchmark/scenarios/availability/result_checker/baseresultchecker.py +++ b/yardstick/benchmark/scenarios/availability/result_checker/baseresultchecker.py @@ -66,6 +66,7 @@ class BaseResultChecker(object): self._config = config self._context = context self.setup_done = False + self.intermediate_variables = {} @staticmethod def get_resultchecker_cls(type): diff --git a/yardstick/benchmark/scenarios/availability/result_checker/result_checker_general.py b/yardstick/benchmark/scenarios/availability/result_checker/result_checker_general.py index 454338175..0802aa452 100644 --- a/yardstick/benchmark/scenarios/availability/result_checker/result_checker_general.py +++ b/yardstick/benchmark/scenarios/availability/result_checker/result_checker_general.py @@ -15,7 +15,7 @@ from yardstick.benchmark.scenarios.availability.result_checker \ from yardstick.benchmark.scenarios.availability import Condition import yardstick.ssh as ssh from yardstick.benchmark.scenarios.availability.util \ - import buildshellparams, execute_shell_command + import execute_shell_command, build_shell_command LOG = logging.getLogger(__name__) @@ -40,22 +40,20 @@ class GeneralResultChecker(BaseResultChecker): self.condition = self._config['condition'] self.expectedResult = self._config['expectedValue'] self.actualResult = object() - self.key = self._config['key'] if "parameter" in self._config: - parameter = self._config['parameter'] - str = buildshellparams( - parameter, True if self.connection else False) - l = list(item for item in parameter.values()) - self.shell_cmd = str.format(*l) - - self.resultchecker_cfgs = BaseResultChecker.resultchecker_cfgs.get( - self.resultchecker_key) + self.parameter_config = self._config['parameter'] + self.resultchecker_cfgs = BaseResultChecker.resultchecker_cfgs.get(self.resultchecker_key) self.verify_script = self.get_script_fullpath( self.resultchecker_cfgs['verify_script']) def verify(self): if "parameter" in self._config: + self.shell_cmd = \ + build_shell_command( + self.parameter_config, + bool(self.connection), + self.intermediate_variables) if self.connection: with open(self.verify_script, "r") as stdin_file: exit_status, stdout, stderr = self.connection.execute( diff --git a/yardstick/benchmark/scenarios/availability/util.py b/yardstick/benchmark/scenarios/availability/util.py index d288fcbc1..adc20b844 100644 --- a/yardstick/benchmark/scenarios/availability/util.py +++ b/yardstick/benchmark/scenarios/availability/util.py @@ -33,7 +33,7 @@ def execute_shell_command(command): LOG.error(traceback.format_exc()) return exitcode, output -PREFIX = '$' +PREFIX = '@' def build_shell_command(param_config, remote=True, intermediate_variables=None): diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index d9cc0eac1..0fab45480 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -19,6 +19,8 @@ import logging import errno import ipaddress + +import copy import os import sys import re @@ -30,6 +32,7 @@ from collections import defaultdict from yardstick.benchmark.scenarios import base from yardstick.common.constants import LOG_DIR +from yardstick.common.process import terminate_children from yardstick.common.utils import import_modules_from_package, itersubclasses from yardstick.common.yaml_loader import yaml_load from yardstick.network_services.collector.subscriber import Collector @@ -375,13 +378,7 @@ class NetworkServiceTestCase(base.Scenario): context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id'])) # convert OrderedDict to a list # pod.yaml nodes is a list - nodes = [] - for node in self.context_cfg["nodes"].values(): - # name field is required - # remove context suffix - node['name'] = node['name'].split('.')[0] - nodes.append(node) - nodes = self._convert_pkeys_to_string(nodes) + nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()] pod_dict = { "nodes": nodes, "networks": self.context_cfg["networks"] @@ -391,15 +388,16 @@ class NetworkServiceTestCase(base.Scenario): explicit_start=True) @staticmethod - def _convert_pkeys_to_string(nodes): - # make copy because we are mutating - nodes = nodes[:] - for i, node in enumerate(nodes): - try: - nodes[i] = dict(node, pkey=ssh.convert_key_to_str(node["pkey"])) - except KeyError: - pass - return nodes + def _serialize_node(node): + new_node = copy.deepcopy(node) + # name field is required + # remove context suffix + new_node["name"] = node['name'].split('.')[0] + try: + new_node["pkey"] = ssh.convert_key_to_str(node["pkey"]) + except KeyError: + pass + return new_node TOPOLOGY_REQUIRED_KEYS = frozenset({ "vpci", "local_ip", "netmask", "local_mac", "driver"}) @@ -596,7 +594,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ vnf.instantiate(self.scenario_cfg, self.context_cfg) LOG.info("Waiting for %s to instantiate", vnf.name) vnf.wait_for_instantiate() - except RuntimeError: + except: + LOG.exception("") for vnf in self.vnfs: vnf.terminate() raise @@ -635,7 +634,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ :return """ - self.collector.stop() - for vnf in self.vnfs: - LOG.info("Stopping %s", vnf.name) - vnf.terminate() + try: + try: + self.collector.stop() + for vnf in self.vnfs: + LOG.info("Stopping %s", vnf.name) + vnf.terminate() + LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) + finally: + terminate_children() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise RuntimeError("Error in teardown") diff --git a/yardstick/common/constants.py b/yardstick/common/constants.py index b416f42b9..32ed746df 100644 --- a/yardstick/common/constants.py +++ b/yardstick/common/constants.py @@ -8,11 +8,16 @@ ############################################################################## from __future__ import absolute_import import os +import errno + from functools import reduce import pkg_resources -from yardstick.common.utils import parse_yaml +# this module must only import other modules that do +# not require loggers to be created, so this cannot +# include yardstick.common.utils +from yardstick.common.yaml_loader import yaml_load dirname = os.path.dirname abspath = os.path.abspath @@ -29,7 +34,19 @@ def get_param(key, default=''): # don't re-parse yaml for each lookup if not CONF: - CONF.update(parse_yaml(conf_file)) + # do not use yardstick.common.utils.parse_yaml + # since yardstick.common.utils creates a logger + # and so it cannot be imported before this code + try: + with open(conf_file) as f: + value = yaml_load(f) + except IOError: + pass + except OSError as e: + if e.errno != errno.EEXIST: + raise + else: + CONF.update(value) try: return reduce(lambda a, b: a[b], key.split('.'), CONF) except KeyError: diff --git a/yardstick/common/process.py b/yardstick/common/process.py new file mode 100644 index 000000000..812ddea94 --- /dev/null +++ b/yardstick/common/process.py @@ -0,0 +1,47 @@ +# Copyright (c) 2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import multiprocessing + +import os + +LOG = logging.getLogger(__name__) + + +def check_if_process_failed(proc, timeout=1): + if proc is not None: + proc.join(timeout) + # Only abort if the process aborted + if proc.exitcode is not None and proc.exitcode > 0: + raise RuntimeError("{} exited with status {}".format(proc.name, proc.exitcode)) + + +def terminate_children(timeout=3): + current_proccess = multiprocessing.current_process() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + return + for child in active_children: + LOG.debug("%s %s %s, child: %s %s", current_proccess.name, current_proccess.pid, + os.getpid(), child, child.pid) + LOG.debug("joining %s", child) + child.join(timeout) + child.terminate() + active_children = multiprocessing.active_children() + if not active_children: + LOG.debug("no children to terminate") + for child in active_children: + LOG.debug("%s %s %s, after terminate child: %s %s", current_proccess.name, + current_proccess.pid, os.getpid(), child, child.pid) diff --git a/yardstick/common/utils.py b/yardstick/common/utils.py index 68c9ed63f..6ac99a5a9 100644 --- a/yardstick/common/utils.py +++ b/yardstick/common/utils.py @@ -37,7 +37,6 @@ from oslo_utils import importutils from oslo_serialization import jsonutils import yardstick -from yardstick.common.yaml_loader import yaml_load logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -94,19 +93,6 @@ def import_modules_from_package(package): logger.exception("unable to import %s", module_name) -def parse_yaml(file_path): - try: - with open(file_path) as f: - value = yaml_load(f) - except IOError: - return {} - except OSError as e: - if e.errno != errno.EEXIST: - raise - else: - return value - - def makedirs(d): try: os.makedirs(d) diff --git a/yardstick/network_services/collector/subscriber.py b/yardstick/network_services/collector/subscriber.py index db52e0b45..d560e1d42 100644 --- a/yardstick/network_services/collector/subscriber.py +++ b/yardstick/network_services/collector/subscriber.py @@ -74,11 +74,12 @@ class Collector(object): # Result example: # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }} LOG.debug("collect KPI for %s", node_name) - if not resource.check_if_sa_running("collectd")[0]: + if resource.check_if_sa_running("collectd")[0] != 0: continue try: results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()} + LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core']) except Exception: LOG.exception("") return results diff --git a/yardstick/network_services/helpers/dpdknicbind_helper.py b/yardstick/network_services/helpers/dpdkbindnic_helper.py index 605d08d38..c07613147 100644 --- a/yardstick/network_services/helpers/dpdknicbind_helper.py +++ b/yardstick/network_services/helpers/dpdkbindnic_helper.py @@ -11,9 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import re import itertools +import six + NETWORK_KERNEL = 'network_kernel' NETWORK_DPDK = 'network_dpdk' NETWORK_OTHER = 'network_other' @@ -22,6 +26,9 @@ CRYPTO_DPDK = 'crypto_dpdk' CRYPTO_OTHER = 'crypto_other' +LOG = logging.getLogger(__name__) + + class DpdkBindHelperException(Exception): pass @@ -123,23 +130,31 @@ class DpdkBindHelper(object): @property def interface_driver_map(self): return {interface['vpci']: interface['driver'] - for interface in itertools.chain(*self.dpdk_status.values())} + for interface in itertools.chain.from_iterable(self.dpdk_status.values())} def read_status(self): return self.parse_dpdk_status_output(self._dpdk_execute(self._status_cmd)[1]) - def bind(self, pci, driver, force=True): + def bind(self, pci_addresses, driver, force=True): + # accept single PCI or list of PCI + if isinstance(pci_addresses, six.string_types): + pci_addresses = [pci_addresses] cmd = self.DPDK_BIND_CMD.format(dpdk_nic_bind=self._dpdk_nic_bind, driver=driver, - vpci=' '.join(list(pci)), + vpci=' '.join(list(pci_addresses)), force='--force' if force else '') + LOG.debug(cmd) self._dpdk_execute(cmd) # update the inner status dict self.read_status() def save_used_drivers(self): - self.used_drivers = self.interface_driver_map + # invert the map, so we can bind by driver type + self.used_drivers = {} + # sort for stabililty + for vpci, driver in sorted(self.interface_driver_map.items()): + self.used_drivers.setdefault(driver, []).append(vpci) def rebind_drivers(self, force=True): - for vpci, driver in self.used_drivers.items(): - self.bind(vpci, driver, force) + for driver, vpcis in self.used_drivers.items(): + self.bind(vpcis, driver, force) diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index 7e8334c73..e3d0e3bca 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -19,6 +19,7 @@ from __future__ import print_function import logging from itertools import chain +import errno import jinja2 import os import os.path @@ -72,7 +73,6 @@ class ResourceProfile(object): self.timeout = timeout self.enable = True - self.cores = validate_non_string_sequence(cores, default=[]) self._queue = multiprocessing.Queue() self.amqp_client = None self.port_names = validate_non_string_sequence(port_names, default=[]) @@ -83,8 +83,16 @@ class ResourceProfile(object): def check_if_sa_running(self, process): """ verify if system agent is running """ - status, pid, _ = self.connection.execute("pgrep -f %s" % process) - return status == 0, pid + try: + err, pid, _ = self.connection.execute("pgrep -f %s" % process) + # strip whitespace + return err, pid.strip() + except OSError as e: + if e.errno in {errno.ECONNRESET}: + # if we can't connect to check, then we won't be able to connect to stop it + LOG.exception("can't connect to host to check collectd status") + return 1, None + raise def run_collectd_amqp(self): """ run amqp consumer to collect the NFVi data """ @@ -137,7 +145,7 @@ class ResourceProfile(object): def parse_intel_pmu_stats(cls, key, value): return {''.join(str(v) for v in key): value.split(":")[1]} - def parse_collectd_result(self, metrics, core_list): + def parse_collectd_result(self, metrics): """ convert collectd data into json""" result = { "cpu": {}, @@ -161,8 +169,7 @@ class ResourceProfile(object): if "cpu" in res_key0 or "intel_rdt" in res_key0: cpu_key, name, metric, testcase = \ self.get_cpu_data(res_key0, res_key1, value) - if cpu_key in core_list: - result["cpu"].setdefault(cpu_key, {}).update({name: metric}) + result["cpu"].setdefault(cpu_key, {}).update({name: metric}) elif "memory" in res_key0: result["memory"].update({res_key1: value.split(":")[0]}) @@ -189,8 +196,9 @@ class ResourceProfile(object): def amqp_process_for_nfvi_kpi(self): """ amqp collect and return nfvi kpis """ if self.amqp_client is None and self.enable: - self.amqp_client = \ - multiprocessing.Process(target=self.run_collectd_amqp) + self.amqp_client = multiprocessing.Process( + name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()), + target=self.run_collectd_amqp) self.amqp_client.start() def amqp_collect_nfvi_kpi(self): @@ -201,7 +209,7 @@ class ResourceProfile(object): metric = {} while not self._queue.empty(): metric.update(self._queue.get()) - msg = self.parse_collectd_result(metric, self.cores) + msg = self.parse_collectd_result(metric) return msg def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs): @@ -265,8 +273,8 @@ class ResourceProfile(object): connection.execute("sudo rabbitmqctl authenticate_user admin admin") connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'") - LOG.debug("Start collectd service.....") - connection.execute("sudo %s" % collectd_path) + LOG.debug("Start collectd service..... %s second timeout", self.timeout) + connection.execute("sudo %s" % collectd_path, timeout=self.timeout) LOG.debug("Done") def initiate_systemagent(self, bin_path): @@ -292,13 +300,18 @@ class ResourceProfile(object): LOG.debug("Stop resource monitor...") if self.amqp_client is not None: + # we proper and try to join first + self.amqp_client.join(3) self.amqp_client.terminate() + LOG.debug("Check if %s is running", agent) status, pid = self.check_if_sa_running(agent) - if status == 0: + LOG.debug("status %s pid %s", status, pid) + if status != 0: return - self.connection.execute('sudo kill -9 %s' % pid) - self.connection.execute('sudo pkill -9 %s' % agent) + if pid: + self.connection.execute('sudo kill -9 "%s"' % pid) + self.connection.execute('sudo pkill -9 "%s"' % agent) self.connection.execute('sudo service rabbitmq-server stop') self.connection.execute("sudo rabbitmqctl stop_app") diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 67634a79c..778119568 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -64,6 +64,8 @@ class VnfdHelper(dict): def __init__(self, *args, **kwargs): super(VnfdHelper, self).__init__(*args, **kwargs) self.port_pairs = PortPairs(self['vdu'][0]['external-interface']) + # port num is not present until binding so we have to memoize + self._port_num_map = {} @property def mgmt_interface(self): @@ -91,12 +93,14 @@ class VnfdHelper(dict): virtual_intf = interface["virtual-interface"] if virtual_intf[key] == value: return interface + raise KeyError() def find_interface(self, **kwargs): key, value = next(iter(kwargs.items())) for interface in self.interfaces: if interface[key] == value: return interface + raise KeyError() # hide dpdk_port_num key so we can abstract def find_interface_by_port(self, port): @@ -105,6 +109,7 @@ class VnfdHelper(dict): # we have to convert to int to compare if int(virtual_intf['dpdk_port_num']) == port: return interface + raise KeyError() def port_num(self, port): # we need interface name -> DPDK port num (PMD ID) -> LINK ID @@ -118,7 +123,8 @@ class VnfdHelper(dict): intf = port else: intf = self.find_interface(name=port) - return int(intf["virtual-interface"]["dpdk_port_num"]) + return self._port_num_map.setdefault(intf["name"], + int(intf["virtual-interface"]["dpdk_port_num"])) def port_nums(self, intfs): return [self.port_num(i) for i in intfs] @@ -170,6 +176,13 @@ class GenericVNF(VNFObject): """ raise NotImplementedError() + def wait_for_instantiate(self): + """ Wait for VNF to start + + :return: True/False + """ + raise NotImplementedError() + def terminate(self): """ Kill all VNF processes diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index ba4d44c41..63c6467f4 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -364,6 +364,7 @@ class ProxSocketHelper(object): """ send data to the remote instance """ LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n')) try: + # TODO: sendall will block, we need a timeout self._sock.sendall(to_send.encode('utf-8')) except: pass @@ -593,6 +594,8 @@ _LOCAL_OBJECT = object() class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): # the actual app is lowercase APP_NAME = 'prox' + # not used for Prox but added for consistency + VNF_TYPE = "PROX" LUA_PARAMETER_NAME = "" LUA_PARAMETER_PEER = { @@ -609,6 +612,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): self._prox_config_data = None self.additional_files = {} self.config_queue = Queue() + # allow_exit_without_flush + self.config_queue.cancel_join_thread() self._global_section = None @property @@ -1262,23 +1267,26 @@ class ProxBngProfileHelper(ProxProfileHelper): continue for item_key, item_value in section: - if item_key == "name" and item_value.startswith("cpe"): + if item_key != 'name': + continue + + if item_value.startswith("cpe"): core_tuple = CoreSocketTuple(section_name) - core_tag = core_tuple.find_in_topology(self.cpu_topology) - cpe_cores.append(core_tag) + cpe_core = core_tuple.find_in_topology(self.cpu_topology) + cpe_cores.append(cpe_core) - elif item_key == "name" and item_value.startswith("inet"): + elif item_value.startswith("inet"): core_tuple = CoreSocketTuple(section_name) inet_core = core_tuple.find_in_topology(self.cpu_topology) inet_cores.append(inet_core) - elif item_key == "name" and item_value.startswith("arp"): + elif item_value.startswith("arp"): core_tuple = CoreSocketTuple(section_name) arp_core = core_tuple.find_in_topology(self.cpu_topology) arp_cores.append(arp_core) # We check the tasks/core separately - if item_key == "name" and item_value.startswith("arp_task"): + if item_value.startswith("arp_task"): core_tuple = CoreSocketTuple(section_name) arp_task_core = core_tuple.find_in_topology(self.cpu_topology) arp_tasks_core.append(arp_task_core) diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index 2ac6ea412..3bfca19aa 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -16,9 +16,10 @@ import errno import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper -from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF +from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT LOG = logging.getLogger(__name__) @@ -51,10 +52,18 @@ class ProxApproxVnf(SampleVNF): try: return self.resource_helper.execute(cmd, *args, **kwargs) except OSError as e: - if not ignore_errors or e.errno not in {errno.EPIPE, errno.ESHUTDOWN}: + if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}: + if ignore_errors: + LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd) + else: + raise + else: raise def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) + if self.resource_helper is None: result = { "packets_in": 0, @@ -64,12 +73,13 @@ class ProxApproxVnf(SampleVNF): } return result - intf_count = len(self.vnfd_helper.interfaces) - if intf_count not in {1, 2, 4}: + # use all_ports so we only use ports matched in topology + port_count = len(self.vnfd_helper.port_pairs.all_ports) + if port_count not in {1, 2, 4}: raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - port_stats = self.vnf_execute('port_stats', range(intf_count)) + port_stats = self.vnf_execute('port_stats', range(port_count)) try: rx_total = port_stats[6] tx_total = port_stats[7] @@ -94,13 +104,20 @@ class ProxApproxVnf(SampleVNF): self.setup_helper.tear_down() def terminate(self): + # stop collectd first or we get pika errors? + self.resource_helper.stop_collect() # try to quit with socket commands - self.vnf_execute("stop_all") - self.vnf_execute("quit") - # hopefully quit succeeds and socket closes, so ignore force_quit socket errors - self.vnf_execute("force_quit", _ignore_errors=True) - if self._vnf_process: - self._vnf_process.terminate() + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.setup_helper.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.setup_helper.APP_NAME) + if self._vnf_process.is_alive(): + self.vnf_execute("stop_all") + self.vnf_execute("quit") + # hopefully quit succeeds and socket closes, so ignore force_quit socket errors + self.vnf_execute("force_quit", _ignore_errors=True) self.setup_helper.kill_vnf() self._tear_down() - self.resource_helper.stop_collect() + if self._vnf_process is not None: + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 5cf234514..92f78c2bc 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -29,10 +29,11 @@ from six.moves import cStringIO from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.cpu import CpuSysCores from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig -from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper +from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper from yardstick.network_services.nfvi.resource import ResourceProfile from yardstick.network_services.vnf_generic.vnf.base import GenericVNF from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper @@ -51,6 +52,8 @@ LOG = logging.getLogger(__name__) REMOTE_TMP = "/tmp" +DEFAULT_VNF_TIMEOUT = 3600 +PROCESS_JOIN_TIMEOUT = 3 class VnfSshHelper(AutoConnectSSH): @@ -290,8 +293,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): return resource def kill_vnf(self): + # pkill is not matching, debug with pgrep + self.ssh_helper.execute("sudo pgrep -lax %s" % self.APP_NAME) + self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME) # have to use exact match - self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME) + # try using killall to match + self.ssh_helper.execute("sudo killall %s" % self.APP_NAME) def _setup_dpdk(self): """ setup dpdk environment needed for vnf to run """ @@ -330,8 +337,10 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): port_names = (intf["name"] for intf in ports) collectd_options = self.get_collectd_options() plugins = collectd_options.get("plugins", {}) + # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores, - plugins=plugins, interval=collectd_options.get("interval")) + plugins=plugins, interval=collectd_options.get("interval"), + timeout=self.scenario_helper.timeout) def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces @@ -386,7 +395,7 @@ class ResourceHelper(object): def _collect_resource_kpi(self): result = {} status = self.resource.check_if_sa_running("collectd")[0] - if status: + if status == 0: result = self.resource.amqp_collect_nfvi_kpi() result = {"core": result} @@ -423,7 +432,6 @@ class ClientResourceHelper(ResourceHelper): self._queue = Queue() self._result = {} self._terminated = Value('i', 0) - self._vpci_ascending = None def _build_ports(self): self.networks = self.vnfd_helper.port_pairs.networks @@ -607,13 +615,7 @@ class SampleVNFDeployHelper(object): self.ssh_helper = ssh_helper self.vnfd_helper = vnfd_helper - DISABLE_DEPLOY = True - def deploy_vnfs(self, app_name): - # temp disable for now - if self.DISABLE_DEPLOY: - return - vnf_bin = self.ssh_helper.join_bin_path(app_name) exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0] if not exit_status: @@ -673,12 +675,18 @@ class ScenarioHelper(object): def topology(self): return self.scenario_cfg['topology'] + @property + def timeout(self): + return self.options.get('timeout', DEFAULT_VNF_TIMEOUT) + class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ VNF_PROMPT = "pipeline>" WAIT_TIME = 1 + APP_NAME = "SampleVNF" + # we run the VNF interactively, so the ssh command will timeout after this long def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNF, self).__init__(name, vnfd) @@ -761,7 +769,8 @@ class SampleVNF(GenericVNF): def _start_vnf(self): self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT) - self._vnf_process = Process(target=self._run) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._vnf_process = Process(name=name, target=self._run) self._vnf_process.start() def _vnf_up_post(self): @@ -773,7 +782,9 @@ class SampleVNF(GenericVNF): self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name]) # self.nfvi_context = None - self.deploy_helper.deploy_vnfs(self.APP_NAME) + # vnf deploy is unsupported, use ansible playbooks + if self.scenario_helper.options.get("vnf_deploy", False): + self.deploy_helper.deploy_vnfs(self.APP_NAME) self.resource_helper.setup() self._start_vnf() @@ -811,6 +822,7 @@ class SampleVNF(GenericVNF): 'stdout': self.queue_wrapper, 'keep_stdin_open': True, 'pty': True, + 'timeout': self.scenario_helper.timeout, } def _build_config(self): @@ -843,11 +855,15 @@ class SampleVNF(GenericVNF): def terminate(self): self.vnf_execute("quit") - if self._vnf_process: - self._vnf_process.terminate() self.setup_helper.kill_vnf() self._tear_down() self.resource_helper.stop_collect() + if self._vnf_process is not None: + # be proper and join first before we kill + LOG.debug("joining before terminate %s", self._vnf_process.name) + self._vnf_process.join(PROCESS_JOIN_TIMEOUT) + self._vnf_process.terminate() + # no terminate children here because we share processes with tg def get_stats(self, *args, **kwargs): """ @@ -861,6 +877,8 @@ class SampleVNF(GenericVNF): return out def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) stats = self.get_stats() m = re.search(self.COLLECT_KPI, stats, re.MULTILINE) if m: @@ -885,7 +903,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') - self.name = "tgen__1" # name in topology file self.scenario_helper = ScenarioHelper(self.name) self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True) @@ -917,7 +934,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.resource_helper.setup() LOG.info("Starting %s server...", self.APP_NAME) - self._tg_process = Process(target=self._start_server) + name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid()) + self._tg_process = Process(name=name, target=self._start_server) self._tg_process.start() def wait_for_instantiate(self): @@ -953,7 +971,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - self._traffic_process = Process(target=self._traffic_runner, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._traffic_runner, args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start @@ -984,6 +1004,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): pass def collect_kpi(self): + # check if the tg processes have exited + for proc in (self._tg_process, self._traffic_process): + check_if_process_failed(proc) result = self.resource_helper.collect_kpi() LOG.debug("%s collect KPIs %s", self.APP_NAME, result) return result @@ -994,5 +1017,15 @@ class SampleVNFTrafficGen(GenericTrafficGen): :return: True/False """ self.traffic_finished = True + # we must kill client before we kill the server, or the client will raise exception if self._traffic_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._traffic_process.name) + self._traffic_process.join(PROCESS_JOIN_TIMEOUT) self._traffic_process.terminate() + if self._tg_process is not None: + # be proper and try to join before terminating + LOG.debug("joining before terminate %s", self._tg_process.name) + self._tg_process.join(PROCESS_JOIN_TIMEOUT) + self._tg_process.terminate() + # no terminate children here because we share processes with vnf diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 1fe790f08..fe435f63e 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -21,7 +21,7 @@ import os import yaml -from yardstick.common.utils import mac_address_to_hex_list +from yardstick.common.utils import mac_address_to_hex_list, try_int from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper @@ -76,7 +76,6 @@ class TrexResourceHelper(ClientResourceHelper): cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True) self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str) - self._vpci_ascending = sorted(vpci_list) def check_status(self): status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT) @@ -109,15 +108,28 @@ class TrexResourceHelper(ClientResourceHelper): self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1") + # We MUST default to 1 because TRex won't work on single-queue devices with + # more than one core per port + # We really should be trying to find the number of queues in the driver, + # but there doesn't seem to be a way to do this + # TRex Error: the number of cores should be 1 when the driver + # support only one tx queue and one rx queue. Please use -c 1 + threads_per_port = try_int(self.scenario_helper.options.get("queues_per_port"), 1) + trex_path = self.ssh_helper.join_bin_path("trex", "scripts") path = get_nsb_option("trex_path", trex_path) - # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE - cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE) + cmd = "./t-rex-64 --no-scapy-server -i -c {} --cfg '{}'".format(threads_per_port, + self.CONF_FILE) - # if there are errors we want to see them + if self.scenario_helper.options.get("trex_server_debug"): + # if there are errors we want to see them + redir = "" + else: + redir = ">/dev/null" # we have to sudo cd because the path might be owned by root - trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd) + trex_cmd = """sudo bash -c "cd '{}' ; {}" {}""".format(path, cmd, redir) + LOG.debug(trex_cmd) self.ssh_helper.execute(trex_cmd) def terminate(self): diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index 6b7779782..a57f53bc7 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import logging +from yardstick.common.process import check_if_process_failed from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper @@ -107,6 +108,8 @@ class UdpReplayApproxVnf(SampleVNF): def collect_kpi(self): def get_sum(offset): return sum(int(i) for i in split_stats[offset::5]) + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) number_of_ports = len(self.vnfd_helper.port_pairs.all_ports) diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index 5f1c4d4d3..c02c0eb27 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -24,6 +24,7 @@ import posixpath from six.moves import configparser, zip +from yardstick.common.process import check_if_process_failed from yardstick.network_services.helpers.samplevnf_helper import PortPairs from yardstick.network_services.pipeline import PipelineRules from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper @@ -278,6 +279,8 @@ class VpeApproxVnf(SampleVNF): raise NotImplementedError def collect_kpi(self): + # we can't get KPIs if the VNF is down + check_if_process_failed(self._vnf_process) result = { 'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0, diff --git a/yardstick/resources/scripts/install/ovs_deploy.bash b/yardstick/resources/scripts/install/ovs_deploy.bash new file mode 100755 index 000000000..d94f30db1 --- /dev/null +++ b/yardstick/resources/scripts/install/ovs_deploy.bash @@ -0,0 +1,122 @@ +#!/bin/bash +# +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INSTALL_OVS_BIN="/usr/src" +cd $INSTALL_OVS_BIN + +if [[ $EUID -ne 0 ]]; then + echo "Must be root to run $0" + exit 1; +fi + +prerequisite() +{ + echo "Install required libraries to run collectd..." + pkg=(git flex bison build-essential pkg-config automake autotools-dev libltdl-dev cmake qemu-kvm libvirt-bin bridge-utils numactl libnuma-dev libpcap-dev) + for i in "${pkg[@]}"; do + dpkg-query -W --showformat='${Status}\n' "${i}"|grep "install ok installed" + if [ "$?" -eq "1" ]; then + apt-get update + apt-get -y install "${i}"; + fi + done + echo "Done" +} + +download_zip() +{ + url=$1 + download_type=$2 + if [ -n "${download_type}" ]; then + echo "Download ${download_type} zip" + fi + # rm goes into calling code + echo "${url}" + if [ ! -e "${url##*/}" ]; then + wget "${url}" + fi + tar xvf "${url##*/}" +} + +dpdk_build() +{ + pushd . + if [[ $DPDK_VERSION != "" ]]; then + export DPDK_DIR=$INSTALL_OVS_BIN/dpdk-stable-$DPDK_VERSION + export RTE_TARGET=x86_64-native-linuxapp-gcc + export DPDK_BUILD=$DPDK_DIR/$RTE_TARGET + rm -rf "$DPDK_DIR" + DPDK_DOWNLOAD="http://fast.dpdk.org/rel/dpdk-$DPDK_VERSION.tar.xz" + download_zip "${DPDK_DOWNLOAD}" "DPDK" + cd dpdk-stable-"$DPDK_VERSION" + make install -j T=$RTE_TARGET + fi + popd +} + +ovs() +{ + pushd . + if [[ $OVS_VERSION != "" ]]; then + rm -rf openswitch-"$OVS_VERSION" + OVS_DOWNLOAD="http://openvswitch.org/releases/openvswitch-$OVS_VERSION.tar.gz" + download_zip "${OVS_DOWNLOAD}" "OVS" + cd openvswitch-"$OVS_VERSION" + export OVS_DIR=/usr/src/openvswitch-$OVS_VERSION + ./boot.sh + if [[ $DPDK_VERSION != "" ]]; then + ./configure --with-dpdk="$DPDK_BUILD" + else + ./configure + fi + make install -j + fi + popd +} + +main() +{ + dpdk_build + ovs +} + +for i in "$@" +do + case $i in + -o=*|--ovs=*) + OVS_VERSION="${i#*=}" + ;; + -d=*|--dpdk=*) + DPDK_VERSION="${i#*=}" + ;; + -p=*|--proxy=*) + export http_proxy="${i#*=}" + export https_proxy="${i#*=}" + ;; + -h|--help) + echo "CommandLine options:" + echo "====================" + echo "1. ovs_dpdk install mode:" + echo "./ovs_install.sh --ovs=<2.7.0> --dpdk=<supported dpdk versoin for given ovs> -p=<proxy>" + echo + exit + ;; + *) + ;; + esac +done + +main |