diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf')
21 files changed, 680 insertions, 284 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py index 11a602472..69d29bf76 100644 --- a/yardstick/network_services/vnf_generic/vnf/acl_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/acl_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -246,9 +246,12 @@ class AclApproxVnf(SampleVNF): 'packets_dropped': 2, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = AclApproxSetupEnvSetupEnvHelper - super(AclApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(AclApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py index 115fddcf0..d1d9667db 100644 --- a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ LOG = logging.getLogger(__name__) class AgnosticVnf(base.GenericVNF): """ AgnosticVnf implementation. """ - def __init__(self, name, vnfd, task_id): - super(AgnosticVnf, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(AgnosticVnf, self).__init__(name, vnfd) def instantiate(self, scenario_cfg, context_cfg): pass diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 0fb310075..8064ae927 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import abc import logging import six -from yardstick.common import messaging -from yardstick.common.messaging import consumer -from yardstick.common.messaging import payloads -from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -141,70 +137,6 @@ class VnfdHelper(dict): yield port_name, port_num -class TrafficGeneratorProducer(producer.MessagingProducer): - """Class implementing the message producer for traffic generators - - This message producer must be instantiated in the process created - "run_traffic" process. - """ - def __init__(self, _id): - super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, - _id=_id) - - def tg_method_started(self, version=1): - """Send a message to inform the traffic generation has started""" - self.send_message( - messaging.TG_METHOD_STARTED, - payloads.TrafficGeneratorPayload(version=version, iteration=0, - kpi={})) - - def tg_method_finished(self, version=1): - """Send a message to inform the traffic generation has finished""" - self.send_message( - messaging.TG_METHOD_FINISHED, - payloads.TrafficGeneratorPayload(version=version, iteration=0, - kpi={})) - - def tg_method_iteration(self, iteration, version=1, kpi=None): - """Send a message, with KPI, once an iteration has finished""" - kpi = {} if kpi is None else kpi - self.send_message( - messaging.TG_METHOD_ITERATION, - payloads.TrafficGeneratorPayload(version=version, - iteration=iteration, kpi=kpi)) - - -@six.add_metaclass(abc.ABCMeta) -class GenericVNFEndpoint(consumer.NotificationHandler): - """Endpoint class for ``GenericVNFConsumer``""" - - @abc.abstractmethod - def runner_method_start_iteration(self, ctxt, **kwargs): - """Endpoint when RUNNER_METHOD_START_ITERATION is received - - :param ctxt: (dict) {'id': <Producer ID>} - :param kwargs: (dict) ``payloads.RunnerPayload`` context - """ - - @abc.abstractmethod - def runner_method_stop_iteration(self, ctxt, **kwargs): - """Endpoint when RUNNER_METHOD_STOP_ITERATION is received - - :param ctxt: (dict) {'id': <Producer ID>} - :param kwargs: (dict) ``payloads.RunnerPayload`` context - """ - - -class GenericVNFConsumer(consumer.MessagingConsumer): - """MQ consumer for ``GenericVNF`` derived classes""" - - def __init__(self, ctx_ids, endpoints): - if not isinstance(endpoints, list): - endpoints = [endpoints] - super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER, - ctx_ids, endpoints) - - @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): """Class providing file-like API for generic VNF implementation @@ -217,9 +149,8 @@ class GenericVNF(object): UPLINK = PortPairs.UPLINK DOWNLINK = PortPairs.DOWNLINK - def __init__(self, name, vnfd, task_id): + def __init__(self, name, vnfd): self.name = name - self._task_id = task_id self.vnfd_helper = VnfdHelper(vnfd) # List of statistics we can obtain from this VNF # - ETSI MANO 6.3.1.1 monitoring_parameter @@ -280,11 +211,10 @@ class GenericVNF(object): class GenericTrafficGen(GenericVNF): """Class providing file-like API for generic traffic generator""" - def __init__(self, name, vnfd, task_id): - super(GenericTrafficGen, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False - self._mq_producer = None @abc.abstractmethod def run_traffic(self, traffic_profile): @@ -355,16 +285,3 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass - - @staticmethod - def _setup_mq_producer(id): - """Setup the TG MQ producer to send messages between processes - - :return: (derived class from ``MessagingProducer``) MQ producer object - """ - return TrafficGeneratorProducer(id) - - def get_mq_producer_id(self): - """Return the MQ producer ID if initialized""" - if self._mq_producer: - return self._mq_producer.id diff --git a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py index 14f1e2e97..ee4a581b1 100644 --- a/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/cgnapt_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -85,12 +85,12 @@ class CgnaptApproxVnf(SampleVNF): "packets_dropped": 4, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = CgnaptApproxSetupEnvHelper - super(CgnaptApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(CgnaptApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _vnf_up_post(self): super(CgnaptApproxVnf, self)._vnf_up_post() @@ -120,3 +120,7 @@ class CgnaptApproxVnf(SampleVNF): self.vnf_execute(cmd) time.sleep(WAIT_FOR_STATIC_NAPT) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py index 66d16d07f..8112963e9 100644 --- a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ LOG = logging.getLogger(__name__) class EPCVnf(base.GenericVNF): - def __init__(self, name, vnfd, task_id): - super(EPCVnf, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd): + super(EPCVnf, self).__init__(name, vnfd) def instantiate(self, scenario_cfg, context_cfg): """Prepare VNF for operation and start the VNF process/VM diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index cd3035ef8..a3d0c19cd 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1146,7 +1146,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile, *args): + def run_traffic(self, traffic_profile): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/prox_irq.py b/yardstick/network_services/vnf_generic/vnf/prox_irq.py index dda26b0fe..614066e46 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_irq.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_irq.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,13 +28,13 @@ LOG = logging.getLogger(__name__) class ProxIrq(SampleVNFTrafficGen): - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): vnfd_cpy = copy.deepcopy(vnfd) - super(ProxIrq, self).__init__(name, vnfd_cpy, task_id) + super(ProxIrq, self).__init__(name, vnfd_cpy) self._vnf_wrapper = ProxApproxVnf( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + name, vnfd, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper @@ -83,9 +83,9 @@ class ProxIrqVNF(ProxIrq, SampleVNFTrafficGen): APP_NAME = 'ProxIrqVNF' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - ProxIrq.__init__(self, name, vnfd, task_id, setup_env_helper_type, + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, resource_helper_type) self.start_test_time = None @@ -150,9 +150,9 @@ class ProxIrqGen(ProxIrq, SampleVNFTrafficGen): APP_NAME = 'ProxIrqGen' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): - ProxIrq.__init__(self, name, vnfd, task_id, setup_env_helper_type, + ProxIrq.__init__(self, name, vnfd, setup_env_helper_type, resource_helper_type) self.start_test_time = None self.end_test_time = None diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index c3b50369b..c9abc757e 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,8 +35,7 @@ class ProxApproxVnf(SampleVNF): VNF_PROMPT = "PROX started" LUA_PARAMETER_NAME = "sut" - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = ProxDpdkVnfSetupEnvHelper @@ -47,8 +46,8 @@ class ProxApproxVnf(SampleVNF): self.prev_packets_sent = 0 self.prev_tsc = 0 self.tsc_hz = 0 - super(ProxApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(ProxApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _vnf_up_post(self): self.resource_helper.up_post() diff --git a/yardstick/network_services/vnf_generic/vnf/router_vnf.py b/yardstick/network_services/vnf_generic/vnf/router_vnf.py index e99de9cb3..f1486bdb4 100644 --- a/yardstick/network_services/vnf_generic/vnf/router_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/router_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,8 +34,7 @@ class RouterVNF(SampleVNF): WAIT_TIME = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = DpdkVnfSetupEnvHelper @@ -43,8 +42,7 @@ class RouterVNF(SampleVNF): vnfd['mgmt-interface'].pop("pkey", "") vnfd['mgmt-interface']['password'] = 'password' - super(RouterVNF, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(RouterVNF, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) def instantiate(self, scenario_cfg, context_cfg): self.scenario_helper.scenario_cfg = scenario_cfg diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 8833b88f2..2a477bb8d 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2018 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,10 @@ import logging import decimal -from multiprocessing import Queue, Value, Process +from multiprocessing import Queue, Value, Process, JoinableQueue import os import posixpath import re -import uuid import subprocess import time @@ -232,6 +231,9 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): exit_status = self.dpdk_bind_helper.check_dpdk_driver() if exit_status == 0: return + else: + LOG.critical("DPDK Driver not installed") + return def _setup_resources(self): # what is this magic? how do we know which socket is for which port? @@ -397,26 +399,26 @@ class ClientResourceHelper(ResourceHelper): time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile, mq_producer): + def run_traffic(self, traffic_profile): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path - mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() + if self.client is None: + LOG.critical("Failure to Connect ... unable to continue") + return + self.client.reset(ports=self.all_ports) self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) - iteration_index = 0 while self._terminated.value == 0: - iteration_index += 1 if self._run_traffic_once(traffic_profile): self._terminated.value = 1 - mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -427,8 +429,6 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise - mq_producer.tg_method_finished() - def terminate(self): self._terminated.value = 1 # stop client @@ -461,22 +461,35 @@ class ClientResourceHelper(ResourceHelper): server=self.vnfd_helper.mgmt_interface["ip"], verbose_level=LoggerApi.VERBOSE_QUIET) - # try to connect with 5s intervals, 30s max + # try to connect with 5s intervals for idx in range(6): try: client.connect() - break + for idx2 in range(6): + if client.is_connected(): + return client + LOG.info("Waiting to confirm connection %s .. Attempt %s", + idx, idx2) + time.sleep(1) + client.disconnect(stop_traffic=True, release_ports=True) except STLError: LOG.info("Unable to connect to Trex Server.. Attempt %s", idx) time.sleep(5) - return client + if client.is_connected(): + return client + else: + LOG.critical("Connection failure ..TRex username: %s server: %s", + self.vnfd_helper.mgmt_interface["user"], + self.vnfd_helper.mgmt_interface["ip"]) + return None class Rfc2544ResourceHelper(object): DEFAULT_CORRELATED_TRAFFIC = False DEFAULT_LATENCY = False DEFAULT_TOLERANCE = '0.0001 - 0.0001' + DEFAULT_RESOLUTION = '0.1' def __init__(self, scenario_helper): super(Rfc2544ResourceHelper, self).__init__() @@ -488,6 +501,7 @@ class Rfc2544ResourceHelper(object): self._tolerance_low = None self._tolerance_high = None self._tolerance_precision = None + self._resolution = None @property def rfc2544(self): @@ -527,6 +541,13 @@ class Rfc2544ResourceHelper(object): self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY) return self._latency + @property + def resolution(self): + if self._resolution is None: + self._resolution = float(self.get_rfc2544('resolution', + self.DEFAULT_RESOLUTION)) + return self._resolution + def get_rfc2544(self, name, default=None): return self.rfc2544.get(name, default) @@ -619,7 +640,6 @@ class ScenarioHelper(object): test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT) return test_duration if test_duration > test_timeout else test_timeout - class SampleVNF(GenericVNF): """ Class providing file-like API for generic VNF implementation """ @@ -629,9 +649,8 @@ class SampleVNF(GenericVNF): APP_NAME = "SampleVNF" # we run the VNF interactively, so the ssh command will timeout after this long - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): - super(SampleVNF, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNF, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -764,6 +783,53 @@ class SampleVNF(GenericVNF): # by other VNF output self.q_in.put('\r\n') + def wait_for_initialize(self): + buf = [] + vnf_prompt_found = False + prompt_command = '\r\n' + script_name = 'non_existent_script_name' + done_string = 'Cannot open file "{}"'.format(script_name) + time.sleep(self.WAIT_TIME) # Give some time for config to load + while True: + if not self._vnf_process.is_alive(): + raise RuntimeError("%s VNF process died." % self.APP_NAME) + while self.q_out.qsize() > 0: + buf.append(self.q_out.get()) + message = ''.join(buf) + + if self.VNF_PROMPT in message and not vnf_prompt_found: + # Once we got VNF promt, it doesn't mean that the VNF is + # up and running/initialized completely. But we can run + # addition (any) VNF command and wait for it to complete + # as it will be finished ONLY at the end of the VNF + # initialization. So, this approach can be used to + # indentify that VNF is completely initialized. + LOG.info("Got %s VNF prompt.", self.APP_NAME) + prompt_command = "run {}\r\n".format(script_name) + self.q_in.put(prompt_command) + # Cut the buffer since we are not interesting to find + # the VNF prompt anymore + prompt_pos = message.find(self.VNF_PROMPT) + buf = [message[prompt_pos + len(self.VNF_PROMPT):]] + vnf_prompt_found = True + continue + + if done_string in message: + LOG.info("%s VNF is up and running.", self.APP_NAME) + self._vnf_up_post() + self.queue_wrapper.clear() + return self._vnf_process.exitcode + + if "PANIC" in message: + raise RuntimeError("Error starting %s VNF." % + self.APP_NAME) + + LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME) + time.sleep(self.WAIT_TIME_FOR_SCRIPT) + # Send command again to display the expected prompt in case the + # expected text was corrupted by other VNF output + self.q_in.put(prompt_command) + def start_collect(self): self.resource_helper.start_collect() @@ -862,9 +928,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): APP_NAME = 'Sample' RUN_WAIT = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): - super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id) + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): + super(SampleVNFTrafficGen, self).__init__(name, vnfd) self.bin_path = get_nsb_option('bin_path', '') self.scenario_helper = ScenarioHelper(self.name) @@ -886,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen): self.traffic_finished = False self._tg_process = None self._traffic_process = None + self._tasks_queue = JoinableQueue() + self._result_queue = Queue() + + def _test_runner(self, traffic_profile, tasks, results): + self.resource_helper.run_test(traffic_profile, tasks, results) + + def _init_traffic_process(self, traffic_profile): + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, + os.getpid()) + self._traffic_process = Process(name=name, target=self._test_runner, + args=( + traffic_profile, self._tasks_queue, + self._result_queue)) + + self._traffic_process.start() + while self.resource_helper.client_started.value == 0: + time.sleep(1) + if not self._traffic_process.is_alive(): + break + + def run_traffic_once(self, traffic_profile): + if self.resource_helper.client_started.value == 0: + self._init_traffic_process(traffic_profile) + + # continue test - run next iteration + LOG.info("Run next iteration ...") + self._tasks_queue.put('RUN_TRAFFIC') + + def wait_on_traffic(self): + self._tasks_queue.join() + result = self._result_queue.get() + return result def _start_server(self): # we can't share ssh paramiko objects to force new connection @@ -923,13 +1021,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile, mq_id): + def _traffic_runner(self, traffic_profile): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self._mq_producer = self._setup_mq_producer(mq_id) - self.resource_helper.run_traffic(traffic_profile, self._mq_producer) + self.resource_helper.run_traffic(traffic_profile) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -939,12 +1036,10 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, - traffic_profile.__class__.__name__, + name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process( - name=name, target=self._traffic_runner, - args=(traffic_profile, uuid.uuid1().int)) + self._traffic_process = Process(name=name, target=self._traffic_runner, + args=(traffic_profile,)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -953,6 +1048,8 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break + return self._traffic_process.is_alive() + def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server( diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py index d25402740..38b00a4b2 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ixload.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ixload.py @@ -126,13 +126,12 @@ class IxLoadResourceHelper(sample_vnf.ClientResourceHelper): class IxLoadTrafficGen(sample_vnf.SampleVNFTrafficGen): - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxLoadResourceHelper - super(IxLoadTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + super(IxLoadTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._result = {} def update_gateways(self, links): @@ -143,7 +142,12 @@ class IxLoadTrafficGen(sample_vnf.SampleVNFTrafficGen): "external-interface"] if intf["virtual-interface"]["vld_id"] == name) - links[name]["ip"]["gateway"] = gateway + try: + links[name]["ip"]["gateway"] = gateway + except KeyError: + LOG.error("Invalid traffic profile: No IP section defined for %s", name) + raise + except StopIteration: LOG.debug("Cant find gateway for link %s", name) links[name]["ip"]["gateway"] = "0.0.0.0" diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py index 2fba89b22..285374a92 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,11 +35,11 @@ LOG = logging.getLogger(__name__) class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): APP_NAME = 'LandslideTG' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = LandslideResourceHelper - super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + super(LandslideTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a3b5afa39..5c8819119 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper): self._queue = Queue() self._parser = PingParser(self._queue) - def run_traffic(self, traffic_profile, *args): + def run_traffic(self, traffic_profile): # drop the connection in order to force a new one self.ssh_helper.drop_connection() @@ -103,14 +103,14 @@ class PingTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ping' RUN_WAIT = 4 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = PingSetupEnvHelper if resource_helper_type is None: resource_helper_type = PingResourceHelper - super(PingTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._result = {} def _check_status(self): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py index 9d452213f..5da2178af 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_pktgen.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,9 +13,7 @@ # limitations under the License. import logging -import multiprocessing import time -import uuid from yardstick.common import constants from yardstick.common import exceptions @@ -26,8 +24,7 @@ from yardstick.network_services.vnf_generic.vnf import base as vnf_base LOG = logging.getLogger(__name__) -class PktgenTrafficGen(vnf_base.GenericTrafficGen, - vnf_base.GenericVNFEndpoint): +class PktgenTrafficGen(vnf_base.GenericTrafficGen): """DPDK Pktgen traffic generator Website: http://pktgen-dpdk.readthedocs.io/en/latest/index.html @@ -35,15 +32,8 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, TIMEOUT = 30 - def __init__(self, name, vnfd, task_id): - vnf_base.GenericTrafficGen.__init__(self, name, vnfd, task_id) - self.queue = multiprocessing.Queue() - self._id = uuid.uuid1().int - self._mq_producer = self._setup_mq_producer(self._id) - vnf_base.GenericVNFEndpoint.__init__(self, self._id, [task_id], - self.queue) - self._consumer = vnf_base.GenericVNFConsumer([task_id], self) - self._consumer.start_rpc_server() + def __init__(self, name, vnfd): + vnf_base.GenericTrafficGen.__init__(self, name, vnfd) self._traffic_profile = None self._node_ip = vnfd['mgmt-interface'].get('ip') self._lua_node_port = self._get_lua_node_port( @@ -71,7 +61,7 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, def wait_for_instantiate(self): # pragma: no cover pass - def runner_method_start_iteration(self, ctxt, **kwargs): + def runner_method_start_iteration(self): # pragma: no cover LOG.debug('Start method') # NOTE(ralonsoh): 'rate' should be modified between iterations. The @@ -81,11 +71,6 @@ class PktgenTrafficGen(vnf_base.GenericTrafficGen, self._traffic_profile.rate(self._rate) time.sleep(4) self._traffic_profile.stop() - self._mq_producer.tg_method_iteration(1, 1, {}) - - def runner_method_stop_iteration(self, ctxt, **kwargs): # pragma: no cover - # pragma: no cover - LOG.debug('Stop method') @staticmethod def _get_lua_node_port(service_ports): diff --git a/yardstick/network_services/vnf_generic/vnf/tg_prox.py b/yardstick/network_services/vnf_generic/vnf/tg_prox.py index d12c42ec8..65b7bac10 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_prox.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_prox.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017 Intel Corporation +# Copyright (c) 2017-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,13 +29,13 @@ class ProxTrafficGen(SampleVNFTrafficGen): LUA_PARAMETER_NAME = "gen" WAIT_TIME = 1 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): vnfd_cpy = copy.deepcopy(vnfd) - super(ProxTrafficGen, self).__init__(name, vnfd_cpy, task_id) + super(ProxTrafficGen, self).__init__(name, vnfd_cpy) self._vnf_wrapper = ProxApproxVnf( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + name, vnfd, setup_env_helper_type, resource_helper_type) self.bin_path = get_nsb_option('bin_path', '') self.name = self._vnf_wrapper.name self.ssh_helper = self._vnf_wrapper.ssh_helper diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index 1d37f8f6f..5d69fc8c8 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,8 +15,11 @@ import ipaddress import logging import six +import collections +from six import moves from yardstick.common import utils +from yardstick.common import exceptions from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper @@ -27,10 +30,12 @@ LOG = logging.getLogger(__name__) WAIT_AFTER_CFG_LOAD = 10 WAIT_FOR_TRAFFIC = 30 -WAIT_PROTOCOLS_STARTED = 360 +WAIT_PROTOCOLS_STARTED = 420 class IxiaBasicScenario(object): + """Ixia Basic scenario for flow from port to port""" + def __init__(self, client, context_cfg, ixia_cfg): self.client = client @@ -43,21 +48,142 @@ class IxiaBasicScenario(object): def apply_config(self): pass + def run_protocols(self): + pass + + def stop_protocols(self): + pass + def create_traffic_model(self, traffic_profile=None): # pylint: disable=unused-argument vports = self.client.get_vports() self._uplink_vports = vports[::2] self._downlink_vports = vports[1::2] self.client.create_traffic_model(self._uplink_vports, - self._downlink_vports) + self._downlink_vports, traffic_profile) - def run_protocols(self): + def _get_stats(self): + return self.client.get_statistics() + + def generate_samples(self, resource_helper, ports, duration): + stats = self._get_stats() + + samples = {} + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] + min_latency = stats['Store-Forward_Min_latency_ns'][port_num] + max_latency = stats['Store-Forward_Max_latency_ns'][port_num] + samples[port_name] = { + 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]), + 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]), + 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]), + 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]), + 'RxThroughputBps': float(stats['Bytes_Rx'][port_num]) / duration, + 'TxThroughputBps': float(stats['Bytes_Tx'][port_num]) / duration, + 'in_packets': int(stats['Valid_Frames_Rx'][port_num]), + 'out_packets': int(stats['Frames_Tx'][port_num]), + 'in_bytes': int(stats['Bytes_Rx'][port_num]), + 'out_bytes': int(stats['Bytes_Tx'][port_num]), + 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, + 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, + 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0), + 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0), + 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0) + } + except IndexError: + pass + + return samples + + def update_tracking_options(self): pass - def stop_protocols(self): + def get_tc_rfc2544_options(self): pass +class IxiaL3Scenario(IxiaBasicScenario): + """Ixia scenario for L3 flow between static ip's""" + + def _add_static_ips(self): + vports = self.client.get_vports() + uplink_intf_vport = [(self.client.get_static_interface(vport), vport) + for vport in vports[::2]] + downlink_intf_vport = [(self.client.get_static_interface(vport), vport) + for vport in vports[1::2]] + + for index in range(len(uplink_intf_vport)): + intf, vport = uplink_intf_vport[index] + try: + iprange = self.ixia_cfg['flow'].get('src_ip')[index] + start_ip = utils.get_ip_range_start(iprange) + count = utils.get_ip_range_count(iprange) + self.client.add_static_ipv4(intf, vport, start_ip, count, '32') + except IndexError: + raise exceptions.IncorrectFlowOption( + option="src_ip", link="uplink_{}".format(index)) + + intf, vport = downlink_intf_vport[index] + try: + iprange = self.ixia_cfg['flow'].get('dst_ip')[index] + start_ip = utils.get_ip_range_start(iprange) + count = utils.get_ip_range_count(iprange) + self.client.add_static_ipv4(intf, vport, start_ip, count, '32') + except IndexError: + raise exceptions.IncorrectFlowOption( + option="dst_ip", link="downlink_{}".format(index)) + + def _add_interfaces(self): + vports = self.client.get_vports() + uplink_vports = (vport for vport in vports[::2]) + downlink_vports = (vport for vport in vports[1::2]) + + ix_node = next(node for _, node in self.context_cfg['nodes'].items() + if node['role'] == 'IxNet') + + for intf in ix_node['interfaces'].values(): + ip = intf.get('local_ip') + mac = intf.get('local_mac') + gateway = None + try: + gateway = next(route.get('gateway') + for route in ix_node.get('routing_table') + if route.get('if') == intf.get('ifname')) + except StopIteration: + LOG.debug("Gateway not provided") + + if 'uplink' in intf.get('vld_id'): + self.client.add_interface(next(uplink_vports), + ip, mac, gateway) + else: + self.client.add_interface(next(downlink_vports), + ip, mac, gateway) + + def apply_config(self): + self._add_interfaces() + self._add_static_ips() + + def create_traffic_model(self, traffic_profile=None): + # pylint: disable=unused-argument + vports = self.client.get_vports() + self._uplink_vports = vports[::2] + self._downlink_vports = vports[1::2] + + uplink_endpoints = [port + '/protocols/static' + for port in self._uplink_vports] + downlink_endpoints = [port + '/protocols/static' + for port in self._downlink_vports] + + self.client.create_ipv4_traffic_model(uplink_endpoints, + downlink_endpoints) + + class IxiaPppoeClientScenario(object): def __init__(self, client, context_cfg, ixia_cfg): @@ -87,8 +213,12 @@ class IxiaPppoeClientScenario(object): traffic_profile.full_profile) endpoints_obj_pairs = \ self._get_endpoints_src_dst_obj_pairs(endpoints_id_pairs) - uplink_endpoints = endpoints_obj_pairs[::2] - downlink_endpoints = endpoints_obj_pairs[1::2] + if endpoints_obj_pairs: + uplink_endpoints = endpoints_obj_pairs[::2] + downlink_endpoints = endpoints_obj_pairs[1::2] + else: + uplink_endpoints = self._access_topologies + downlink_endpoints = self._core_topologies self.client.create_ipv4_traffic_model(uplink_endpoints, downlink_endpoints) @@ -181,18 +311,14 @@ class IxiaPppoeClientScenario(object): device groups pairs between which flow groups will be created: 1. In case uplink/downlink flows in traffic profile doesn't have - specified 'port' key, flows will be created between each device - group on access port and device group on corresponding core port. + specified 'port' key, flows will be created between topologies + on corresponding access and core port. E.g.: - Device groups created on access port xe0: dg1, dg2, dg3 - Device groups created on core port xe1: dg4 + Access topology on xe0: topology1 + Core topology on xe1: topology2 Flows will be created between: - dg1 -> dg4 - dg4 -> dg1 - dg2 -> dg4 - dg4 -> dg2 - dg3 -> dg4 - dg4 -> dg3 + topology1 -> topology2 + topology2 -> topology1 2. In case uplink/downlink flows in traffic profile have specified 'port' key, flows will be created between device groups on this @@ -253,13 +379,6 @@ class IxiaPppoeClientScenario(object): [endpoint_obj_pairs.extend([up, down]) for up, down in zip(uplink_dev_groups, downlink_dev_groups)] - if not endpoint_obj_pairs: - for up, down in zip(uplink_ports, downlink_ports): - uplink_dev_groups = port_to_dev_group_mapping[up] - downlink_dev_groups = \ - port_to_dev_group_mapping[down] * len(uplink_dev_groups) - [endpoint_obj_pairs.extend(list(i)) - for i in zip(uplink_dev_groups, downlink_dev_groups)] return endpoint_obj_pairs def _fill_ixia_config(self): @@ -353,6 +472,172 @@ class IxiaPppoeClientScenario(object): bgp_type=ipv4["bgp"].get("bgp_type")) self.protocols.append(bgp_peer_obj) + def update_tracking_options(self): + priority_map = { + 'raw': 'ipv4Raw0', + 'tos': {'precedence': 'ipv4Precedence0'}, + 'dscp': {'defaultPHB': 'ipv4DefaultPhb0', + 'selectorPHB': 'ipv4ClassSelectorPhb0', + 'assuredPHB': 'ipv4AssuredForwardingPhb0', + 'expeditedPHB': 'ipv4ExpeditedForwardingPhb0'} + } + + prio_trackby_key = 'ipv4Precedence0' + + try: + priority = list(self._ixia_cfg['priority'])[0] + if priority == 'raw': + prio_trackby_key = priority_map[priority] + elif priority in ['tos', 'dscp']: + priority_type = list(self._ixia_cfg['priority'][priority])[0] + prio_trackby_key = priority_map[priority][priority_type] + except KeyError: + pass + + tracking_options = ['flowGroup0', 'vlanVlanId0', prio_trackby_key] + self.client.set_flow_tracking(tracking_options) + + def get_tc_rfc2544_options(self): + return self._ixia_cfg.get('rfc2544') + + def _get_stats(self): + return self.client.get_pppoe_scenario_statistics() + + @staticmethod + def get_flow_id_data(stats, flow_id, key): + result = [float(flow.get(key)) for flow in stats if flow['id'] == flow_id] + return sum(result) / len(result) + + def get_priority_flows_stats(self, samples, duration): + results = {} + priorities = set([flow['IP_Priority'] for flow in samples]) + for priority in priorities: + tx_frames = sum( + [int(flow['Tx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + rx_frames = sum( + [int(flow['Rx_Frames']) for flow in samples + if flow['IP_Priority'] == priority]) + prio_flows_num = len([flow for flow in samples + if flow['IP_Priority'] == priority]) + avg_latency_ns = sum( + [int(flow['Store-Forward_Avg_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) / prio_flows_num + min_latency_ns = sum( + [int(flow['Store-Forward_Min_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) / prio_flows_num + max_latency_ns = sum( + [int(flow['Store-Forward_Max_latency_ns']) for flow in samples + if flow['IP_Priority'] == priority]) / prio_flows_num + tx_throughput = float(tx_frames) / duration + rx_throughput = float(rx_frames) / duration + results[priority] = { + 'in_packets': rx_frames, + 'out_packets': tx_frames, + 'RxThroughput': round(rx_throughput, 3), + 'TxThroughput': round(tx_throughput, 3), + 'avg_latency_ns': utils.safe_cast(avg_latency_ns, int, 0), + 'min_latency_ns': utils.safe_cast(min_latency_ns, int, 0), + 'max_latency_ns': utils.safe_cast(max_latency_ns, int, 0) + } + return results + + def generate_samples(self, resource_helper, ports, duration): + + stats = self._get_stats() + samples = {} + ports_stats = stats['port_statistics'] + flows_stats = stats['flow_statistic'] + pppoe_subs_per_port = stats['pppox_client_per_port'] + + # Get sorted list of ixia ports names + ixia_port_names = sorted([data['port_name'] for data in ports_stats]) + + # Set 'port_id' key for ports stats items + for item in ports_stats: + port_id = item.pop('port_name').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Set 'id' key for flows stats items + for item in flows_stats: + flow_id = item.pop('Flow_Group').split('-')[1].strip() + item['id'] = int(flow_id) + + # Set 'port_id' key for pppoe subs per port stats + for item in pppoe_subs_per_port: + port_id = item.pop('subs_port').split('-')[-1].strip() + item['port_id'] = int(port_id) + + # Map traffic flows to ports + port_flow_map = collections.defaultdict(set) + for item in flows_stats: + tx_port = item.pop('Tx_Port') + tx_port_index = ixia_port_names.index(tx_port) + port_flow_map[tx_port_index].update([item['id']]) + + # Sort ports stats + ports_stats = sorted(ports_stats, key=lambda k: k['port_id']) + + # Get priority flows stats + prio_flows_stats = self.get_priority_flows_stats(flows_stats, duration) + samples['priority_stats'] = prio_flows_stats + + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: + try: + # reverse lookup port name from port_num so the stats dict is descriptive + intf = resource_helper.vnfd_helper.find_interface_by_port(port_num) + port_name = intf['name'] + port_id = ports_stats[port_num]['port_id'] + port_subs_stats = \ + [port_data for port_data in pppoe_subs_per_port + if port_data.get('port_id') == port_id] + + avg_latency = \ + sum([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Avg_latency_ns')) + for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num]) + min_latency = \ + sum([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Min_latency_ns')) + for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num]) + max_latency = \ + sum([float(self.get_flow_id_data( + flows_stats, flow, 'Store-Forward_Max_latency_ns')) + for flow in port_flow_map[port_num]]) / len(port_flow_map[port_num]) + + samples[port_name] = { + 'rx_throughput_kps': float(ports_stats[port_num]['Rx_Rate_Kbps']), + 'tx_throughput_kps': float(ports_stats[port_num]['Tx_Rate_Kbps']), + 'rx_throughput_mbps': float(ports_stats[port_num]['Rx_Rate_Mbps']), + 'tx_throughput_mbps': float(ports_stats[port_num]['Tx_Rate_Mbps']), + 'RxThroughputBps': float(ports_stats[port_num]['Bytes_Rx']) / duration, + 'TxThroughputBps': float(ports_stats[port_num]['Bytes_Tx']) / duration, + 'in_packets': int(ports_stats[port_num]['Valid_Frames_Rx']), + 'out_packets': int(ports_stats[port_num]['Frames_Tx']), + 'in_bytes': int(ports_stats[port_num]['Bytes_Rx']), + 'out_bytes': int(ports_stats[port_num]['Bytes_Tx']), + 'RxThroughput': float(ports_stats[port_num]['Valid_Frames_Rx']) / duration, + 'TxThroughput': float(ports_stats[port_num]['Frames_Tx']) / duration, + 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0), + 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0), + 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0) + } + + if port_subs_stats: + samples[port_name].update( + {'sessions_up': int(port_subs_stats[0]['Sessions_Up']), + 'sessions_down': int(port_subs_stats[0]['Sessions_Down']), + 'sessions_not_started': int(port_subs_stats[0]['Sessions_Not_Started']), + 'sessions_total': int(port_subs_stats[0]['Sessions_Total'])} + ) + + except IndexError: + pass + + return samples + class IxiaRfc2544Helper(Rfc2544ResourceHelper): @@ -370,6 +655,7 @@ class IxiaResourceHelper(ClientResourceHelper): self._ixia_scenarios = { "IxiaBasic": IxiaBasicScenario, + "IxiaL3": IxiaL3Scenario, "IxiaPppoeClient": IxiaPppoeClientScenario, } @@ -388,9 +674,6 @@ class IxiaResourceHelper(ClientResourceHelper): def _connect(self, client=None): self.client.connect(self.vnfd_helper) - def get_stats(self, *args, **kwargs): - return self.client.get_statistics() - def setup(self): super(IxiaResourceHelper, self).setup() self._init_ix_scenario() @@ -400,36 +683,7 @@ class IxiaResourceHelper(ClientResourceHelper): self._terminated.value = 1 def generate_samples(self, ports, duration): - stats = self.get_stats() - - samples = {} - # this is not DPDK port num, but this is whatever number we gave - # when we selected ports and programmed the profile - for port_num in ports: - try: - # reverse lookup port name from port_num so the stats dict is descriptive - intf = self.vnfd_helper.find_interface_by_port(port_num) - port_name = intf['name'] - avg_latency = stats['Store-Forward_Avg_latency_ns'][port_num] - min_latency = stats['Store-Forward_Min_latency_ns'][port_num] - max_latency = stats['Store-Forward_Max_latency_ns'][port_num] - samples[port_name] = { - 'rx_throughput_kps': float(stats['Rx_Rate_Kbps'][port_num]), - 'tx_throughput_kps': float(stats['Tx_Rate_Kbps'][port_num]), - 'rx_throughput_mbps': float(stats['Rx_Rate_Mbps'][port_num]), - 'tx_throughput_mbps': float(stats['Tx_Rate_Mbps'][port_num]), - 'in_packets': int(stats['Valid_Frames_Rx'][port_num]), - 'out_packets': int(stats['Frames_Tx'][port_num]), - 'RxThroughput': float(stats['Valid_Frames_Rx'][port_num]) / duration, - 'TxThroughput': float(stats['Frames_Tx'][port_num]) / duration, - 'Store-Forward_Avg_latency_ns': utils.safe_cast(avg_latency, int, 0), - 'Store-Forward_Min_latency_ns': utils.safe_cast(min_latency, int, 0), - 'Store-Forward_Max_latency_ns': utils.safe_cast(max_latency, int, 0) - } - except IndexError: - pass - - return samples + return self._ix_scenario.generate_samples(self, ports, duration) def _init_ix_scenario(self): ixia_config = self.scenario_helper.scenario_cfg.get('ixia_config', 'IxiaBasic') @@ -450,13 +704,17 @@ class IxiaResourceHelper(ClientResourceHelper): self._ix_scenario.apply_config() self._ix_scenario.create_traffic_model(traffic_profile) - def run_traffic(self, traffic_profile, *args): + def update_tracking_options(self): + self._ix_scenario.update_tracking_options() + + def run_traffic(self, traffic_profile): if self._terminated.value: return min_tol = self.rfc_helper.tolerance_low max_tol = self.rfc_helper.tolerance_high precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution default = "00:00:00:00:00:00" self._build_ports() @@ -477,17 +735,19 @@ class IxiaResourceHelper(ClientResourceHelper): try: while not self._terminated.value: - first_run = traffic_profile.execute_traffic( - self, self.client, mac) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) self.client_started.value = 1 # pylint: disable=unnecessary-lambda utils.wait_until_true(lambda: self.client.is_traffic_stopped(), timeout=traffic_profile.config.duration * 2) + rfc2544_opts = self._ix_scenario.get_tc_rfc2544_options() samples = self.generate_samples(traffic_profile.ports, traffic_profile.config.duration) completed, samples = traffic_profile.get_drop_percentage( - samples, min_tol, max_tol, precision, first_run=first_run) + samples, min_tol, max_tol, precision, resolution, + first_run=first_run, tc_rfc2544_opts=rfc2544_opts) self._queue.put(samples) if completed: @@ -497,23 +757,93 @@ class IxiaResourceHelper(ClientResourceHelper): LOG.exception('Run Traffic terminated') self._ix_scenario.stop_protocols() + self.client_started.value = 0 self._terminated.value = 1 - def collect_kpi(self): - self.rfc_helper.iteration.value += 1 - return super(IxiaResourceHelper, self).collect_kpi() + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.info("Ixia resource_helper run_test") + if self._terminated.value: + return + + min_tol = self.rfc_helper.tolerance_low + max_tol = self.rfc_helper.tolerance_high + precision = self.rfc_helper.tolerance_precision + resolution = self.rfc_helper.resolution + default = "00:00:00:00:00:00" + + self._build_ports() + traffic_profile.update_traffic_profile(self) + self._initialize_client(traffic_profile) + + mac = {} + for port_name in self.vnfd_helper.port_pairs.all_ports: + intf = self.vnfd_helper.find_interface(name=port_name) + virt_intf = intf["virtual-interface"] + # we only know static traffic id by reading the json + # this is used by _get_ixia_trafficrofile + port_num = self.vnfd_helper.port_num(intf) + mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) + mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) + + self._ix_scenario.run_protocols() + + try: + completed = False + self.rfc_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.info("Wait for task ...") + + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + continue + else: + if task != 'RUN_TRAFFIC': + continue + + self.rfc_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc_helper.iteration.value) + first_run = traffic_profile.execute_traffic(self, self.client, + mac) + # pylint: disable=unnecessary-lambda + utils.wait_until_true(lambda: self.client.is_traffic_stopped(), + timeout=traffic_profile.config.duration * 2) + samples = self.generate_samples(traffic_profile.ports, + traffic_profile.config.duration) + + completed, samples = traffic_profile.get_drop_percentage( + samples, min_tol, max_tol, precision, resolution, + first_run=first_run) + samples['Iteration'] = self.rfc_helper.iteration.value + self._queue.put(samples) + + if completed: + LOG.debug("IxiaResourceHelper::run_test - test completed") + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() + + except Exception: # pylint: disable=broad-except + LOG.exception('Run Traffic terminated') + + self._ix_scenario.stop_protocols() + self.client_started.value = 0 + LOG.debug("IxiaResourceHelper::run_test done") class IxiaTrafficGen(SampleVNFTrafficGen): APP_NAME = 'Ixia' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = IxiaResourceHelper - super(IxiaTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(IxiaTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) self._ixia_traffic_gen = None self.ixia_file_name = '' self.vnf_port_pairs = [] diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py index 7ecb12478..a9c0222ac 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,12 +15,14 @@ import logging import time +from six import moves from yardstick.common import utils from yardstick.network_services.vnf_generic.vnf import sample_vnf from yardstick.network_services.vnf_generic.vnf import tg_trex +from trex_stl_lib.trex_stl_exceptions import STLError -LOGGING = logging.getLogger(__name__) +LOG = logging.getLogger(__name__) class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): @@ -48,7 +50,8 @@ class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): completed, output = traffic_profile.get_drop_percentage( samples, self.rfc2544_helper.tolerance_low, self.rfc2544_helper.tolerance_high, - self.rfc2544_helper.correlated_traffic) + self.rfc2544_helper.correlated_traffic, + self.rfc2544_helper.resolution) self._queue.put(output) return completed @@ -58,6 +61,56 @@ class TrexRfcResourceHelper(tg_trex.TrexResourceHelper): def clear_client_stats(self, ports): self.client.clear_stats(ports=ports) + def run_test(self, traffic_profile, tasks_queue, results_queue, *args): # pragma: no cover + LOG.debug("Trex resource_helper run_test") + if self._terminated.value: + return + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() + try: + self._build_ports() + self.client = self._connect() + self.client.reset(ports=self.all_ports) + self.client.remove_all_streams(self.all_ports) # remove all streams + traffic_profile.register_generator(self) + + completed = False + self.rfc2544_helper.iteration.value = 0 + self.client_started.value = 1 + while completed is False and not self._terminated.value: + LOG.debug("Wait for task ...") + try: + task = tasks_queue.get(True, 5) + except moves.queue.Empty: + LOG.debug("Wait for task timeout, continue waiting...") + continue + else: + if task != 'RUN_TRAFFIC': + continue + self.rfc2544_helper.iteration.value += 1 + LOG.info("Got %s task, start iteration %d", task, + self.rfc2544_helper.iteration.value) + completed = self._run_traffic_once(traffic_profile) + if completed: + LOG.debug("%s::run_test - test completed", + self.__class__.__name__) + results_queue.put('COMPLETE') + else: + results_queue.put('CONTINUE') + tasks_queue.task_done() + + self.client.stop(self.all_ports) + self.client.disconnect() + self._terminated.value = 0 + except STLError: + if self._terminated.value: + LOG.debug("traffic generator is stopped") + return # return if trex/tg server is stopped. + raise + + self.client_started.value = 0 + LOG.debug("%s::run_test done", self.__class__.__name__) class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): """ @@ -65,9 +118,9 @@ class TrexTrafficGenRFC(tg_trex.TrexTrafficGen): traffic for rfc2544 testcase. """ - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexRfcResourceHelper - super(TrexTrafficGenRFC, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py index 4296da84c..0cb66a714 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -179,6 +179,8 @@ class TrexResourceHelper(ClientResourceHelper): 'tx_throughput_bps': float(port_stats.get('tx_bps', 0.0)), 'in_packets': int(port_stats.get('ipackets', 0)), 'out_packets': int(port_stats.get('opackets', 0)), + 'in_bytes': int(port_stats.get('ibytes', 0)), + 'out_bytes': int(port_stats.get('obytes', 0)), 'timestamp': timestamp } @@ -200,14 +202,15 @@ class TrexTrafficGen(SampleVNFTrafficGen): APP_NAME = 'TRex' - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = TrexResourceHelper + if setup_env_helper_type is None: setup_env_helper_type = TrexDpdkVnfSetupEnvHelper - super(TrexTrafficGen, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _check_status(self): return self.resource_helper.check_status() diff --git a/yardstick/network_services/vnf_generic/vnf/udp_replay.py b/yardstick/network_services/vnf_generic/vnf/udp_replay.py index e3fde1a79..3f9994486 100644 --- a/yardstick/network_services/vnf_generic/vnf/udp_replay.py +++ b/yardstick/network_services/vnf_generic/vnf/udp_replay.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -60,14 +60,15 @@ class UdpReplayApproxVnf(SampleVNF): PIPELINE_COMMAND = REPLAY_PIPELINE_COMMAND - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if resource_helper_type is None: resource_helper_type = UdpReplayResourceHelper + if setup_env_helper_type is None: setup_env_helper_type = UdpReplaySetupEnvHelper - super(UdpReplayApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(UdpReplayApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, + resource_helper_type) def _build_pipeline_kwargs(self): ports = self.vnfd_helper.port_pairs.all_ports diff --git a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py index a1523dee3..743f2d4bb 100644 --- a/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vfw_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -52,9 +52,12 @@ class FWApproxVnf(SampleVNF): 'packets_dropped': 3, } - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = FWApproxSetupEnvHelper - super(FWApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(FWApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) + + def wait_for_instantiate(self): + """Wait for VNF to initialize""" + self.wait_for_initialize() diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index dd3221386..322ecd016 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -158,12 +158,11 @@ class VpeApproxVnf(SampleVNF): COLLECT_KPI = VPE_COLLECT_KPI WAIT_TIME = 20 - def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, - resource_helper_type=None): + def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None): if setup_env_helper_type is None: setup_env_helper_type = VpeApproxSetupEnvHelper - super(VpeApproxVnf, self).__init__( - name, vnfd, task_id, setup_env_helper_type, resource_helper_type) + + super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type) def get_stats(self, *args, **kwargs): raise NotImplementedError |