diff options
26 files changed, 890 insertions, 83 deletions
diff --git a/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml new file mode 100644 index 000000000..184ed6881 --- /dev/null +++ b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml @@ -0,0 +1,96 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +{% set provider = provider or none %} +{% set physical_networks = physical_networks or ['physnet1', 'physnet2'] %} +{% set segmentation_id = segmentation_id or none %} + +schema: yardstick:task:0.1 +scenarios: +- type: NSPerf + traffic_profile: ../../traffic_profiles/ipv4_throughput.yaml + topology: vfw-tg-topology.yaml + nodes: + tg__0: trafficgen_1.yardstick + vnf__0: vnf.yardstick + options: + hugepages_gb: 8 + framesize: + uplink: {64B: 100} + downlink: {64B: 100} + flow: + src_ip: [{'tg__0': 'xe0'}] + dst_ip: [{'tg__0': 'xe1'}] + count: 1 + traffic_type: 4 + rfc2544: + allowed_drop_rate: 0.0001 - 0.0001 + vnf__0: + rules: acl_1rule.yaml + vnf_config: {lb_config: 'SW', lb_count: 1, worker_config: '1C/1T', worker_threads: 1} + runner: + type: IterationIPC + iterations: 10 + timeout: 60 +context: + # put node context first, so we don't HEAT deploy if node has errors + name: yardstick + image: yardstick-samplevnfs + flavor: + vcpus: 10 + ram: 12288 + disk: 6 + extra_specs: + hw:cpu_sockets: 1 + hw:cpu_cores: 10 + hw:cpu_threads: 1 + user: ubuntu + placement_groups: + pgrp1: + policy: "availability" + servers: + vnf: + floating_ip: true + placement: "pgrp1" + trafficgen_1: + floating_ip: true + placement: "pgrp1" + networks: + mgmt: + cidr: '10.0.1.0/24' + xe0: + cidr: '10.0.2.0/24' + gateway_ip: 'null' + {% if provider %} + provider: {{ provider }} + physical_network: {{ physical_networks[0] }} + {% if segmentation_id %} + segmentation_id: {{ segmentation_id }} + {% endif %} + {% endif %} + port_security_enabled: False + enable_dhcp: 'false' + xe1: + cidr: '10.0.3.0/24' + gateway_ip: 'null' + {% if provider %} + provider: {{ provider }} + physical_network: {{ physical_networks[1] }} + {% if segmentation_id %} + segmentation_id: {{ segmentation_id }} + {% endif %} + {% endif %} + port_security_enabled: False + enable_dhcp: 'false' diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index fbdf6c281..af2557441 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -12,27 +12,26 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# +# This is a modified copy of ``rally/rally/benchmark/runners/base.py`` -# yardstick comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import - +import importlib import logging import multiprocessing import subprocess import time import traceback -from subprocess import CalledProcessError - -import importlib -from six.moves.queue import Empty +from six import moves -import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer +from yardstick.common import utils from yardstick.dispatcher.base import Base as DispatcherBase + log = logging.getLogger(__name__) @@ -41,7 +40,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except CalledProcessError: + except subprocess.CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -245,7 +244,7 @@ class Runner(object): log.debug("output_queue size %s", self.output_queue.qsize()) try: result.update(self.output_queue.get(True, 1)) - except Empty: + except moves.queue.Empty: pass return result @@ -259,7 +258,7 @@ class Runner(object): log.debug("result_queue size %s", self.result_queue.qsize()) try: one_record = self.result_queue.get(True, 1) - except Empty: + except moves.queue.Empty: pass else: if output_in_influxdb: @@ -272,3 +271,22 @@ class Runner(object): dispatchers = DispatcherBase.get(self.config['output_config']) dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) + + +class RunnerProducer(producer.MessagingProducer): + """Class implementing the message producer for runners""" + + def __init__(self, _id): + super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id) + + def start_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_START_ITERATION, + payloads.RunnerPayload(version=version, data=data)) + + def stop_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_STOP_ITERATION, + payloads.RunnerPayload(version=version, data=data)) diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py new file mode 100644 index 000000000..a0335fdc7 --- /dev/null +++ b/yardstick/benchmark/runners/iteration_ipc.py @@ -0,0 +1,205 @@ +# Copyright 2018: Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""A runner that runs a configurable number of times before it returns. Each + iteration has a configurable timeout. The loop control depends on the + feedback received from the running VNFs. The context PIDs from the VNFs + to listen the messages from are given in the scenario "setup" method. +""" + +import logging +import multiprocessing +import time +import traceback + +import os + +from yardstick.benchmark.runners import base as base_runner +from yardstick.common import exceptions +from yardstick.common import messaging +from yardstick.common import utils +from yardstick.common.messaging import consumer +from yardstick.common.messaging import payloads + + +LOG = logging.getLogger(__name__) + +QUEUE_PUT_TIMEOUT = 10 +ITERATION_TIMEOUT = 180 + + +class RunnerIterationIPCEndpoint(consumer.NotificationHandler): + """Endpoint class for ``RunnerIterationIPCConsumer``""" + + def tg_method_started(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_ids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_STARTED, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}, + QUEUE_PUT_TIMEOUT) + + def tg_method_finished(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_ids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_FINISHED, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}) + + def tg_method_iteration(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_ids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_ITERATION, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}) + + +class RunnerIterationIPCConsumer(consumer.MessagingConsumer): + """MQ consumer for "IterationIPC" runner""" + + def __init__(self, _id, ctx_ids): + self._id = _id + self._queue = multiprocessing.Queue() + endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)] + super(RunnerIterationIPCConsumer, self).__init__( + messaging.TOPIC_TG, ctx_ids, endpoints) + self._kpi_per_id = {ctx: [] for ctx in ctx_ids} + self.iteration_index = None + + def is_all_kpis_received_in_iteration(self): + """Check if all producers registered have sent the ITERATION msg + + During the present iteration, all producers (traffic generators) must + start and finish the traffic injection, and at the end of the traffic + injection a TG_METHOD_ITERATION must be sent. This function will check + all KPIs in the present iteration are received. E.g.: + self.iteration_index = 2 + + self._kpi_per_id = { + 'ctx1': [kpi0, kpi1, kpi2], + 'ctx2': [kpi0, kpi1]} --> return False + + self._kpi_per_id = { + 'ctx1': [kpi0, kpi1, kpi2], + 'ctx2': [kpi0, kpi1, kpi2]} --> return True + """ + while not self._queue.empty(): + msg = self._queue.get(True, 1) + if msg['action'] == messaging.TG_METHOD_ITERATION: + id_iter_list = self._kpi_per_id[msg['id']] + id_iter_list.append(msg['payload'].kpi) + + return all(len(id_iter_list) == self.iteration_index + for id_iter_list in self._kpi_per_id.values()) + + +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): # pragma: no cover + runner_cfg = scenario_cfg['runner'] + + timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT) + iterations = runner_cfg.get('iterations', 1) + run_step = runner_cfg.get('run_step', 'setup,run,teardown') + LOG.info('Worker START. Iterations %d times, class %s', iterations, cls) + + runner_cfg['runner_id'] = os.getpid() + + benchmark = cls(scenario_cfg, context_cfg) + method = getattr(benchmark, method_name) + + if 'setup' not in run_step: + raise exceptions.RunnerIterationIPCSetupActionNeeded() + benchmark.setup() + producer_ctxs = benchmark.get_mq_ids() + if not producer_ctxs: + raise exceptions.RunnerIterationIPCNoCtxs() + + mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs) + mq_consumer.start_rpc_server() + mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id']) + + iteration_index = 1 + while 'run' in run_step: + LOG.debug('runner=%(runner)s seq=%(sequence)s START', + {'runner': runner_cfg['runner_id'], + 'sequence': iteration_index}) + data = {} + result = None + errors = '' + mq_consumer.iteration_index = iteration_index + mq_producer.start_iteration() + + try: + utils.wait_until_true( + mq_consumer.is_all_kpis_received_in_iteration, + timeout=timeout, sleep=2) + result = method(data) + except Exception: # pylint: disable=broad-except + errors = traceback.format_exc() + LOG.exception(errors) + + mq_producer.stop_iteration() + + if result: + output_queue.put(result, True, QUEUE_PUT_TIMEOUT) + benchmark_output = {'timestamp': time.time(), + 'sequence': iteration_index, + 'data': data, + 'errors': errors} + queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) + + LOG.debug('runner=%(runner)s seq=%(sequence)s END', + {'runner': runner_cfg['runner_id'], + 'sequence': iteration_index}) + + iteration_index += 1 + if iteration_index > iterations or aborted.is_set(): + LOG.info('"IterationIPC" worker END') + break + + if 'teardown' in run_step: + try: + benchmark.teardown() + except Exception: + LOG.exception('Exception during teardown process') + mq_consumer.stop_rpc_server() + raise SystemExit(1) + + LOG.debug('Data queue size = %s', queue.qsize()) + LOG.debug('Output queue size = %s', output_queue.qsize()) + mq_consumer.stop_rpc_server() + + +class IterationIPCRunner(base_runner.Runner): + """Run a scenario for a configurable number of times. + + Each iteration has a configurable timeout. The loop control depends on the + feedback received from the running VNFs. The context PIDs from the VNFs to + listen the messages from are given in the scenario "setup" method. + """ + __execution_type__ = 'IterationIPC' + + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = '{}-{}-{}'.format( + self.__execution_type__, scenario_cfg.get('type'), os.getpid()) + self.process = multiprocessing.Process( + name=name, + target=_worker_process, + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) + self.process.start() diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py index 30ac1bea9..90a87ac29 100644 --- a/yardstick/benchmark/scenarios/base.py +++ b/yardstick/benchmark/scenarios/base.py @@ -119,3 +119,7 @@ class Scenario(object): except TypeError: dic[k] = v return dic + + def get_mq_ids(self): # pragma: no cover + """Return stored MQ producer IDs, if defined""" + pass diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index eb62d6222..3bb168c70 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -50,7 +50,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): __scenario_type__ = "NSPerf" - def __init__(self, scenario_cfg, context_cfg): # Yardstick API + def __init__(self, scenario_cfg, context_cfg): # pragma: no cover super(NetworkServiceTestCase, self).__init__() self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg @@ -61,6 +61,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): self.traffic_profile = None self.node_netdevs = {} self.bin_path = get_nsb_option('bin_path', '') + self._mq_ids = [] def _get_ip_flow_range(self, ip_start_range): @@ -168,18 +169,18 @@ class NetworkServiceTestCase(scenario_base.Scenario): topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data) self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0] - def _find_vnf_name_from_id(self, vnf_id): + def _find_vnf_name_from_id(self, vnf_id): # pragma: no cover return next((vnfd["vnfd-id-ref"] for vnfd in self.topology["constituent-vnfd"] if vnf_id == vnfd["member-vnf-index"]), None) - def _find_vnfd_from_vnf_idx(self, vnf_id): + def _find_vnfd_from_vnf_idx(self, vnf_id): # pragma: no cover return next((vnfd for vnfd in self.topology["constituent-vnfd"] if vnf_id == vnfd["member-vnf-index"]), None) @staticmethod - def find_node_if(nodes, name, if_name, vld_id): + def find_node_if(nodes, name, if_name, vld_id): # pragma: no cover try: # check for xe0, xe1 intf = nodes[name]["interfaces"][if_name] @@ -272,14 +273,14 @@ class NetworkServiceTestCase(scenario_base.Scenario): node0_if["peer_intf"] = node1_copy node1_if["peer_intf"] = node0_copy - def _update_context_with_topology(self): + def _update_context_with_topology(self): # pragma: no cover for vnfd in self.topology["constituent-vnfd"]: vnf_idx = vnfd["member-vnf-index"] vnf_name = self._find_vnf_name_from_id(vnf_idx) vnfd = self._find_vnfd_from_vnf_idx(vnf_idx) self.context_cfg["nodes"][vnf_name].update(vnfd) - def _generate_pod_yaml(self): + def _generate_pod_yaml(self): # pragma: no cover context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id'])) # convert OrderedDict to a list # pod.yaml nodes is a list @@ -293,7 +294,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): explicit_start=True) @staticmethod - def _serialize_node(node): + def _serialize_node(node): # pragma: no cover new_node = copy.deepcopy(node) # name field is required # remove context suffix @@ -315,7 +316,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): self._update_context_with_topology() @classmethod - def get_vnf_impl(cls, vnf_model_id): + def get_vnf_impl(cls, vnf_model_id): # pragma: no cover """ Find the implementing class from vnf_model["vnf"]["name"] field :param vnf_model_id: parsed vnfd model ID field @@ -343,7 +344,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): raise exceptions.IncorrectConfig(error_msg=message) @staticmethod - def create_interfaces_from_node(vnfd, node): + def create_interfaces_from_node(vnfd, node): # pragma: no cover ext_intfs = vnfd["vdu"][0]["external-interface"] = [] # have to sort so xe0 goes first for intf_name, intf in sorted(node['interfaces'].items()): @@ -412,10 +413,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): return vnfs def setup(self): - """ Setup infrastructure, provission VNFs & start traffic - - :return: - """ + """Setup infrastructure, provission VNFs & start traffic""" # 1. Verify if infrastructure mapping can meet topology self.map_topology_to_infrastructure() # 1a. Load VNF models @@ -457,6 +455,11 @@ class NetworkServiceTestCase(scenario_base.Scenario): for traffic_gen in traffic_runners: LOG.info("Starting traffic on %s", traffic_gen.name) traffic_gen.run_traffic(self.traffic_profile) + self._mq_ids.append(traffic_gen.get_mq_producer_id()) + + def get_mq_ids(self): # pragma: no cover + """Return stored MQ producer IDs""" + return self._mq_ids def run(self, result): # yardstick API """ Yardstick calls run() at intervals defined in the yaml and @@ -495,10 +498,10 @@ class NetworkServiceTestCase(scenario_base.Scenario): LOG.exception("") raise RuntimeError("Error in teardown") - def pre_run_wait_time(self, time_seconds): + def pre_run_wait_time(self, time_seconds): # pragma: no cover """Time waited before executing the run method""" time.sleep(time_seconds) - def post_run_wait_time(self, time_seconds): + def post_run_wait_time(self, time_seconds): # pragma: no cover """Time waited after executing the run method""" pass diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py index 2d160bef1..50def0647 100644 --- a/yardstick/common/exceptions.py +++ b/yardstick/common/exceptions.py @@ -193,6 +193,15 @@ class TaskRenderError(YardstickException): message = 'Failed to render template:\n%(input_task)s' +class RunnerIterationIPCSetupActionNeeded(YardstickException): + message = ('IterationIPC needs the "setup" action to retrieve the VNF ' + 'handling processes PIDs to receive the messages sent') + + +class RunnerIterationIPCNoCtxs(YardstickException): + message = 'Benchmark "setup" action did not return any VNF process PID' + + class TimerTimeout(YardstickException): message = 'Timer timeout expired, %(timeout)s seconds' diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py index f0f012ec3..bd700d9b1 100644 --- a/yardstick/common/messaging/__init__.py +++ b/yardstick/common/messaging/__init__.py @@ -1,14 +1,3 @@ -# Copyright (c) 2018 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. @@ -28,9 +17,17 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER + RPC_SERVER_EXECUTOR = 'threading' # Topics. -RUNNER = 'runner' +TOPIC_TG = 'topic_traffic_generator' +TOPIC_RUNNER = 'topic_runner' # Methods. -# RUNNER methods: -RUNNER_INFO = 'runner_info' -RUNNER_LOOP = 'runner_loop' +# Traffic generator consumers methods. Names must match the methods implemented +# in the consumer endpoint class. +TG_METHOD_STARTED = 'tg_method_started' +TG_METHOD_FINISHED = 'tg_method_finished' +TG_METHOD_ITERATION = 'tg_method_iteration' + +# Runner consumers methods. Names must match the methods implemented in the +# consumer endpoint class. +RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration" +RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration" diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py index 24ec6f184..c99d7ed27 100644 --- a/yardstick/common/messaging/consumer.py +++ b/yardstick/common/messaging/consumer.py @@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__) class NotificationHandler(object): """Abstract class to define a endpoint object for a MessagingConsumer""" - def __init__(self, _id, ctx_pids, queue): + def __init__(self, _id, ctx_ids, queue): self._id = _id - self._ctx_pids = ctx_pids + self._ctx_ids = ctx_ids self._queue = queue @@ -43,11 +43,11 @@ class MessagingConsumer(object): the messages published by a `MessagingNotifier`. """ - def __init__(self, topic, pids, endpoints, fanout=True): + def __init__(self, topic, ids, endpoints, fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pids: (list of int) list of PIDs of the processes implementing + :param ids: (list of int) list of IDs of the processes implementing the MQ Notifier which will be in the message context :param endpoints: (list of class) list of classes implementing the methods (see `MessagingNotifier.send_message) used by @@ -58,7 +58,7 @@ class MessagingConsumer(object): :returns: `MessagingConsumer` class object """ - self._pids = pids + self._ids = ids self._endpoints = endpoints self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py index d29d79808..8ede1e58e 100644 --- a/yardstick/common/messaging/payloads.py +++ b/yardstick/common/messaging/payloads.py @@ -51,3 +51,23 @@ class Payload(object): def dict_to_obj(cls, _dict): """Returns a Payload object built from the dictionary elements""" return cls(**_dict) + + +class TrafficGeneratorPayload(Payload): + """Base traffic generator payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'iteration', # (int) iteration index during the traffic injection, + # starting from 1. + 'kpi' # (dict) collection of KPIs collected from the traffic + # injection. The content will depend on the generator and the + # traffic type. + } + + +class RunnerPayload(Payload): + """Base runner payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'data' # (dict) generic container of data to be used if needed. + } diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py index b6adc0c17..aadab649d 100644 --- a/yardstick/common/messaging/producer.py +++ b/yardstick/common/messaging/producer.py @@ -34,18 +34,18 @@ class MessagingProducer(object): messages in a message queue. """ - def __init__(self, topic, pid=os.getpid(), fanout=True): + def __init__(self, topic, _id=os.getpid(), fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pid: (int) PID of the process implementing this MQ Notifier + :param id: (int) ID of the process implementing this MQ Notifier :param fanout: (bool) MQ clients may request that a copy of the message be delivered to all servers listening on a topic by setting fanout to ``True``, rather than just one of them :returns: `MessagingNotifier` class object """ self._topic = topic - self._pid = pid + self._id = _id self._fanout = fanout self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) @@ -65,6 +65,11 @@ class MessagingProducer(object): consumer endpoints :param payload: (subclass `Payload`) payload content """ - self._notifier.cast({'pid': self._pid}, + self._notifier.cast({'id': self._id}, method, **payload.obj_to_dict()) + + @property + def id(self): + """Return MQ producer ID""" + return self._id diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 9ceac3167..fb41a4e4a 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -18,6 +18,9 @@ import abc import logging import six +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -138,6 +141,39 @@ class VnfdHelper(dict): yield port_name, port_num +class TrafficGeneratorProducer(producer.MessagingProducer): + """Class implementing the message producer for traffic generators + + This message producer must be instantiated in the process created + "run_traffic" process. + """ + def __init__(self, _id): + super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, + _id=_id) + + def tg_method_started(self, version=1): + """Send a message to inform the traffic generation has started""" + self.send_message( + messaging.TG_METHOD_STARTED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_finished(self, version=1): + """Send a message to inform the traffic generation has finished""" + self.send_message( + messaging.TG_METHOD_FINISHED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_iteration(self, iteration, version=1, kpi=None): + """Send a message, with KPI, once an iteration has finished""" + kpi = {} if kpi is None else kpi + self.send_message( + messaging.TG_METHOD_ITERATION, + payloads.TrafficGeneratorPayload(version=version, + iteration=iteration, kpi=kpi)) + + @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): """Class providing file-like API for generic VNF implementation @@ -216,6 +252,7 @@ class GenericTrafficGen(GenericVNF): super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False + self._mq_producer = None @abc.abstractmethod def run_traffic(self, traffic_profile): @@ -286,3 +323,16 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass + + @staticmethod + def _setup_mq_producer(id): + """Setup the TG MQ producer to send messages between processes + + :return: (derived class from ``MessagingProducer``) MQ producer object + """ + return TrafficGeneratorProducer(id) + + def get_mq_producer_id(self): + """Return the MQ producer ID if initialized""" + if self._mq_producer: + return self._mq_producer.get_id() diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 6d28f4750..3241719e8 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -969,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 1ee71aa25..bc65380d3 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -14,14 +14,15 @@ import logging from multiprocessing import Queue, Value, Process - import os import posixpath import re -import six +import uuid import subprocess import time +import six + from trex_stl_lib.trex_stl_client import LoggerApi from trex_stl_lib.trex_stl_client import STLClient from trex_stl_lib.trex_stl_exceptions import STLError @@ -408,12 +409,13 @@ class ClientResourceHelper(ResourceHelper): time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path + mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() @@ -421,8 +423,11 @@ class ClientResourceHelper(ResourceHelper): self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) + iteration_index = 0 while self._terminated.value == 0: + iteration_index += 1 self._run_traffic_once(traffic_profile) + mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -433,6 +438,8 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise + mq_producer.tg_method_finished() + def terminate(self): self._terminated.value = 1 # stop client @@ -911,12 +918,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile): + def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self.resource_helper.run_traffic(traffic_profile) + self._mq_producer = self._setup_mq_producer(mq_id) + self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -926,10 +934,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, os.getpid()) - self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile,)) + self._traffic_process = Process( + name=name, target=self._traffic_runner, + args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -938,8 +948,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return self._traffic_process.is_alive() - def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server( diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a989543f5..4050dc6e2 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper): self._queue = Queue() self._parser = PingParser(self._queue) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): # drop the connection in order to force a new one self.ssh_helper.drop_connection() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index a1f9fbeb4..875ae93b9 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -102,7 +102,7 @@ class IxiaResourceHelper(ClientResourceHelper): self.client.assign_ports() self.client.create_traffic_model() - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): if self._terminated.value: return diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py index 99874343b..f3e31e718 100644 --- a/yardstick/tests/functional/common/messaging/test_messaging.py +++ b/yardstick/tests/functional/common/messaging/test_messaging.py @@ -32,25 +32,25 @@ class DummyPayload(payloads.Payload): class DummyEndpoint(consumer.NotificationHandler): def info(self, ctxt, **kwargs): - if ctxt['pid'] in self._ctx_pids: - self._queue.put('ID {}, data: {}, pid: {}'.format( - self._id, kwargs['data'], ctxt['pid'])) + if ctxt['id'] in self._ctx_ids: + self._queue.put('Nr {}, data: {}, id: {}'.format( + self._id, kwargs['data'], ctxt['id'])) class DummyConsumer(consumer.MessagingConsumer): - def __init__(self, _id, ctx_pids, queue): + def __init__(self, _id, ctx_ids, queue): self._id = _id - endpoints = [DummyEndpoint(_id, ctx_pids, queue)] - super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints) + endpoints = [DummyEndpoint(_id, ctx_ids, queue)] + super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints) class DummyProducer(producer.MessagingProducer): pass -def _run_consumer(_id, ctx_pids, queue): - _consumer = DummyConsumer(_id, ctx_pids, queue) +def _run_consumer(_id, ctx_ids, queue): + _consumer = DummyConsumer(_id, ctx_ids, queue) _consumer.start_rpc_server() _consumer.wait() @@ -67,8 +67,8 @@ class MessagingTestCase(base.BaseFunctionalTestCase): num_consumers = 10 ctx_1 = 100001 ctx_2 = 100002 - producers = [DummyProducer(TOPIC, pid=ctx_1), - DummyProducer(TOPIC, pid=ctx_2)] + producers = [DummyProducer(TOPIC, _id=ctx_1), + DummyProducer(TOPIC, _id=ctx_2)] processes = [] for i in range(num_consumers): @@ -91,7 +91,7 @@ class MessagingTestCase(base.BaseFunctionalTestCase): output.append(output_queue.get(True, 1)) self.assertEqual(num_consumers * 4, len(output)) - msg_template = 'ID {}, data: {}, pid: {}' + msg_template = 'Nr {}, data: {}, id: {}' for i in range(num_consumers): for ctx in [ctx_1, ctx_2]: for message in ['message 0', 'message 1']: diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py index 559c991f3..49ba1efe4 100644 --- a/yardstick/tests/unit/benchmark/runner/test_base.py +++ b/yardstick/tests/unit/benchmark/runner/test_base.py @@ -8,12 +8,17 @@ ############################################################################## import time +import uuid import mock +from oslo_config import cfg +import oslo_messaging import subprocess from yardstick.benchmark.runners import base as runner_base from yardstick.benchmark.runners import iteration +from yardstick.common import messaging +from yardstick.common.messaging import payloads from yardstick.tests.unit import base as ut_base @@ -94,3 +99,54 @@ class RunnerTestCase(ut_base.BaseUnitTestCase): with self.assertRaises(NotImplementedError): runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) + + +class RunnerProducerTestCase(ut_base.BaseUnitTestCase): + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(cfg, 'CONF') + def test__init(self, mock_config, mock_transport, mock_rpcclient, + mock_target): + _id = uuid.uuid1().int + runner_producer = runner_base.RunnerProducer(_id) + mock_transport.assert_called_once_with( + mock_config, url='rabbit://yardstick:yardstick@localhost:5672/') + mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER, + fanout=True, + server=messaging.SERVER) + mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') + self.assertEqual(_id, runner_producer._id) + self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload') + def test_start_iteration(self, mock_runner_payload, *args): + runner_producer = runner_base.RunnerProducer(uuid.uuid1().int) + with mock.patch.object(runner_producer, + 'send_message') as mock_message: + runner_producer.start_iteration(version=10) + + mock_message.assert_called_once_with( + messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload') + mock_runner_payload.assert_called_once_with(version=10, data={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload') + def test_stop_iteration(self, mock_runner_payload, *args): + runner_producer = runner_base.RunnerProducer(uuid.uuid1().int) + with mock.patch.object(runner_producer, + 'send_message') as mock_message: + runner_producer.stop_iteration(version=15) + + mock_message.assert_called_once_with( + messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload') + mock_runner_payload.assert_called_once_with(version=15, data={}) diff --git a/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py new file mode 100644 index 000000000..10d14a8a0 --- /dev/null +++ b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py @@ -0,0 +1,136 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import time +import os +import uuid + +import mock + +from yardstick.benchmark.runners import iteration_ipc +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.tests.unit import base as ut_base + + +class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int] + self._queue = multiprocessing.Queue() + self.runner = iteration_ipc.RunnerIterationIPCEndpoint( + self._id, self._ctx_ids, self._queue) + self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}} + self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj( + self._kwargs).obj_to_dict() + + def test_tg_method_started(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_started(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + def test_tg_method_finished(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_finished(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + def test_tg_method_iteration(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_iteration(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + +class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int] + self.consumer = iteration_ipc.RunnerIterationIPCConsumer( + self._id, self._ctx_ids) + self.consumer._queue = mock.Mock() + + def test__init(self): + self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []}, + self.consumer._kpi_per_id) + + def test_is_all_kpis_received_in_iteration(self): + payload = payloads.TrafficGeneratorPayload( + version=1, iteration=1, kpi={}) + msg1 = {'action': messaging.TG_METHOD_ITERATION, + 'id': self._ctx_ids[0], 'payload': payload} + msg2 = {'action': messaging.TG_METHOD_ITERATION, + 'id': self._ctx_ids[1], 'payload': payload} + self.consumer.iteration_index = 1 + + self.consumer._queue.empty.side_effect = [False, True] + self.consumer._queue.get.return_value = msg1 + self.assertFalse(self.consumer.is_all_kpis_received_in_iteration()) + + self.consumer._queue.empty.side_effect = [False, True] + self.consumer._queue.get.return_value = msg2 + self.assertTrue(self.consumer.is_all_kpis_received_in_iteration()) + + +class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase): + + @mock.patch.object(iteration_ipc, '_worker_process') + @mock.patch.object(os, 'getpid', return_value=12345678) + @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock()) + def test__run_benchmark(self, mock_process, mock_getpid, mock_worker): + method = 'method' + scenario_cfg = {'type': 'scenario_type'} + context_cfg = 'context_cfg' + name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678) + runner = iteration_ipc.IterationIPCRunner(mock.ANY) + mock_getpid.reset_mock() + + runner._run_benchmark('class', method, scenario_cfg, context_cfg) + mock_process.assert_called_once_with( + name=name, + target=mock_worker, + args=(runner.result_queue, 'class', method, scenario_cfg, + context_cfg, runner.aborted, runner.output_queue)) + mock_getpid.assert_called_once() diff --git a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py index bb1a7aaca..77a54c0b8 100644 --- a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py +++ b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py @@ -553,6 +553,7 @@ class TestNetworkServiceTestCase(unittest.TestCase): tgen.verify_traffic = lambda x: verified_dict tgen.terminate = mock.Mock(return_value=True) tgen.name = "tgen__1" + tgen.run_traffic.return_value = 'tg_id' vnf = mock.Mock(autospec=GenericVNF) vnf.runs_traffic = False vnf.terminate = mock.Mock(return_value=True) @@ -565,7 +566,6 @@ class TestNetworkServiceTestCase(unittest.TestCase): self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs) self.s._fill_traffic_profile = \ mock.Mock(return_value=TRAFFIC_PROFILE) - self.assertIsNone(self.s.setup()) def test_setup_exception(self): with mock.patch("yardstick.ssh.SSH") as ssh: @@ -656,6 +656,9 @@ class TestNetworkServiceTestCase(unittest.TestCase): ) self.assertEqual(self.s.topology, 'fake_nsd') + def test_get_mq_ids(self): + self.assertEqual(self.s._mq_ids, self.s.get_mq_ids()) + def test_teardown(self): vnf = mock.Mock(autospec=GenericVNF) vnf.terminate = mock.Mock(return_value=True) diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py index 00ec220c9..37b1f1926 100644 --- a/yardstick/tests/unit/common/messaging/test_payloads.py +++ b/yardstick/tests/unit/common/messaging/test_payloads.py @@ -44,3 +44,39 @@ class PayloadTestCase(ut_base.BaseUnitTestCase): _dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'} payload = _DummyPayload.dict_to_obj(_dict) self.assertEqual(set(_dict.keys()), payload._fields) + + +class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase): + + def test_init(self): + tg_payload = payloads.TrafficGeneratorPayload( + version=1, iteration=10, kpi={'key1': 'value1'}) + self.assertEqual(1, tg_payload.version) + self.assertEqual(10, tg_payload.iteration) + self.assertEqual({'key1': 'value1'}, tg_payload.kpi) + self.assertEqual(3, len(tg_payload._fields)) + + def test__init_missing_required_fields(self): + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(version=1, iteration=10) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(iteration=10, kpi={}) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(iteration=10) + + +class RunnerPayloadTestCase(ut_base.BaseUnitTestCase): + + def test_init(self): + runner_payload = payloads.RunnerPayload(version=5, + data={'key1': 'value1'}) + self.assertEqual(5, runner_payload.version) + self.assertEqual({'key1': 'value1'}, runner_payload.data) + + def test__init_missing_required_fields(self): + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload(version=1) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload(data=None) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload() diff --git a/yardstick/tests/unit/common/messaging/test_producer.py b/yardstick/tests/unit/common/messaging/test_producer.py index 0289689dc..22286e5c3 100644 --- a/yardstick/tests/unit/common/messaging/test_producer.py +++ b/yardstick/tests/unit/common/messaging/test_producer.py @@ -44,3 +44,10 @@ class MessagingProducerTestCase(ut_base.BaseUnitTestCase): topic='test_topic', fanout=True, server=messaging.SERVER) mock_RPCClient.assert_called_once_with('test_rpc_transport', 'test_Target') + + def test_id(self): + with mock.patch.object(oslo_messaging, 'RPCClient'), \ + mock.patch.object(oslo_messaging, 'get_rpc_transport'), \ + mock.patch.object(oslo_messaging, 'Target'): + msg_producer = _MessagingProducer('topic', 'id_to_check') + self.assertEqual('id_to_check', msg_producer.id) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py index ebedcb451..43e5ac839 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py @@ -15,10 +15,15 @@ import multiprocessing import os +import uuid import mock +from oslo_config import cfg +import oslo_messaging import unittest +from yardstick.common import messaging +from yardstick.common.messaging import payloads from yardstick.network_services.vnf_generic.vnf import base from yardstick.ssh import SSH @@ -140,6 +145,24 @@ VNFD = { } +class _DummyGenericTrafficGen(base.GenericTrafficGen): # pragma: no cover + + def run_traffic(self, *args): + pass + + def terminate(self): + pass + + def collect_kpi(self): + pass + + def instantiate(self, *args): + pass + + def scale(self, flavor=''): + pass + + class FileAbsPath(object): def __init__(self, module_file): super(FileAbsPath, self).__init__() @@ -221,7 +244,7 @@ class TestGenericVNF(unittest.TestCase): self.assertEqual(msg, str(exc.exception)) -class TestGenericTrafficGen(unittest.TestCase): +class GenericTrafficGenTestCase(unittest.TestCase): def test_definition(self): """Make sure that the abstract class cannot be instantiated""" @@ -234,3 +257,81 @@ class TestGenericTrafficGen(unittest.TestCase): "abstract methods collect_kpi, instantiate, run_traffic, " "scale, terminate") self.assertEqual(msg, str(exc.exception)) + + def test_get_mq_producer_id(self): + vnfd = {'benchmark': {'kpi': mock.ANY}, + 'vdu': [{'external-interface': 'ext_int'}] + } + tg = _DummyGenericTrafficGen('name', vnfd) + tg._mq_producer = mock.Mock() + tg._mq_producer.get_id.return_value = 'fake_id' + self.assertEqual('fake_id', tg.get_mq_producer_id()) + + +class TrafficGeneratorProducerTestCase(unittest.TestCase): + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(cfg, 'CONF') + def test__init(self, mock_config, mock_transport, mock_rpcclient, + mock_target): + _id = uuid.uuid1().int + tg_producer = base.TrafficGeneratorProducer(_id) + mock_transport.assert_called_once_with( + mock_config, url='rabbit://yardstick:yardstick@localhost:5672/') + mock_target.assert_called_once_with(topic=messaging.TOPIC_TG, + fanout=True, + server=messaging.SERVER) + mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') + self.assertEqual(_id, tg_producer._id) + self.assertEqual(messaging.TOPIC_TG, tg_producer._topic) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_started(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_started(version=10) + + mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=10, iteration=0, + kpi={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_finished(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_finished(version=20) + + mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=20, iteration=0, + kpi={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_iteration(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'}) + + mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=30, iteration=100, + kpi={'k': 'v'}) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py index 331e80d00..48ae3b505 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py @@ -1090,6 +1090,57 @@ class TestClientResourceHelper(unittest.TestCase): self.assertIs(client_resource_helper._connect(client), client) + @mock.patch.object(ClientResourceHelper, '_build_ports') + @mock.patch.object(ClientResourceHelper, '_run_traffic_once') + def test_run_traffic(self, mock_run_traffic_once, mock_build_ports): + client_resource_helper = ClientResourceHelper(mock.Mock()) + client = mock.Mock() + traffic_profile = mock.Mock() + mq_producer = mock.Mock() + with mock.patch.object(client_resource_helper, '_connect') \ + as mock_connect, \ + mock.patch.object(client_resource_helper, '_terminated') \ + as mock_terminated: + mock_connect.return_value = client + type(mock_terminated).value = mock.PropertyMock( + side_effect=[0, 1, lambda x: x]) + client_resource_helper.run_traffic(traffic_profile, mq_producer) + + mock_build_ports.assert_called_once() + traffic_profile.register_generator.assert_called_once() + mq_producer.tg_method_started.assert_called_once() + mq_producer.tg_method_finished.assert_called_once() + mq_producer.tg_method_iteration.assert_called_once_with(1) + mock_run_traffic_once.assert_called_once_with(traffic_profile) + + @mock.patch.object(ClientResourceHelper, '_build_ports') + @mock.patch.object(ClientResourceHelper, '_run_traffic_once', + side_effect=Exception) + def test_run_traffic_exception(self, mock_run_traffic_once, + mock_build_ports): + client_resource_helper = ClientResourceHelper(mock.Mock()) + client = mock.Mock() + traffic_profile = mock.Mock() + mq_producer = mock.Mock() + with mock.patch.object(client_resource_helper, '_connect') \ + as mock_connect, \ + mock.patch.object(client_resource_helper, '_terminated') \ + as mock_terminated: + mock_connect.return_value = client + type(mock_terminated).value = mock.PropertyMock(return_value=0) + mq_producer.reset_mock() + # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked + with self.assertRaises(Exception): + client_resource_helper.run_traffic(traffic_profile, + mq_producer) + + mock_build_ports.assert_called_once() + traffic_profile.register_generator.assert_called_once() + mock_run_traffic_once.assert_called_once_with(traffic_profile) + mq_producer.tg_method_started.assert_called_once() + mq_producer.tg_method_finished.assert_not_called() + mq_producer.tg_method_iteration.assert_not_called() + class TestRfc2544ResourceHelper(unittest.TestCase): diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py index 050aa4aa0..3e2f598d2 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py @@ -406,7 +406,8 @@ class TestProxTrafficGen(unittest.TestCase): sut.setup_helper.prox_config_dict = {} sut._connect_client = mock.Mock(autospec=STLClient) sut._connect_client.get_stats = mock.Mock(return_value="0") - sut._traffic_runner(mock_traffic_profile) + sut._setup_mq_producer = mock.Mock(return_value='mq_producer') + sut._traffic_runner(mock_traffic_profile, mock.ANY) @mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket') @mock.patch(SSH_HELPER) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py index 42ac40b50..4ade157a3 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py @@ -379,7 +379,8 @@ class TestIXIATrafficGen(unittest.TestCase): mock.mock_open(), create=True) @mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception') def _traffic_runner(*args): - result = sut._traffic_runner(mock_traffic_profile) + sut._setup_mq_producer = mock.Mock(return_value='mq_producer') + result = sut._traffic_runner(mock_traffic_profile, mock.ANY) self.assertIsNone(result) _traffic_runner() diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py index 350ba8448..700e910f9 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py @@ -387,8 +387,9 @@ class TestTrexTrafficGen(unittest.TestCase): # must generate cfg before we can run traffic so Trex port mapping is # created self.sut.resource_helper.generate_cfg() + self.sut._setup_mq_producer = mock.Mock() with mock.patch.object(self.sut.resource_helper, 'run_traffic'): - self.sut._traffic_runner(mock_traffic_profile) + self.sut._traffic_runner(mock_traffic_profile, mock.ANY) def test__generate_trex_cfg(self): vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] @@ -453,9 +454,8 @@ class TestTrexTrafficGen(unittest.TestCase): self.sut.ssh_helper.run = mock.Mock() self.sut._traffic_runner = mock.Mock(return_value=0) self.sut.resource_helper.client_started.value = 1 - result = self.sut.run_traffic(mock_traffic_profile) + self.sut.run_traffic(mock_traffic_profile) self.sut._traffic_process.terminate() - self.assertIsNotNone(result) def test_terminate(self): vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] |