-rw-r--r--  ansible/install_trex_standalone.yml | 51
-rw-r--r--  ansible/roles/install_samplevnf/vars/main.yml | 2
-rw-r--r--  ansible/ubuntu_server_baremetal_deploy_samplevnfs.yml | 2
-rw-r--r--  ansible/ubuntu_server_cloudimg_modify_samplevnfs.yml | 2
-rw-r--r--  samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml | 96
-rw-r--r--  tests/opnfv/test_cases/opnfv_yardstick_tc087.yaml | 6
-rw-r--r--  tests/opnfv/test_cases/opnfv_yardstick_tc092.yaml | 6
-rw-r--r--  tests/opnfv/test_cases/opnfv_yardstick_tc093.yaml | 6
-rw-r--r--  yardstick/benchmark/contexts/kubernetes.py | 24
-rwxr-xr-x  yardstick/benchmark/runners/base.py | 44
-rw-r--r--  yardstick/benchmark/runners/duration.py | 1
-rw-r--r--  yardstick/benchmark/runners/iteration_ipc.py | 205
-rw-r--r--  yardstick/benchmark/scenarios/base.py | 4
-rw-r--r--  yardstick/benchmark/scenarios/networking/vnf_generic.py | 33
-rw-r--r--  yardstick/common/constants.py | 4
-rw-r--r--  yardstick/common/exceptions.py | 42
-rw-r--r--  yardstick/common/kubernetes_utils.py | 86
-rw-r--r--  yardstick/common/messaging/__init__.py | 27
-rw-r--r--  yardstick/common/messaging/consumer.py | 10
-rw-r--r--  yardstick/common/messaging/payloads.py | 20
-rw-r--r--  yardstick/common/messaging/producer.py | 13
-rw-r--r--  yardstick/common/utils.py | 22
-rw-r--r--  yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py | 9
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/base.py | 50
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/prox_helpers.py | 2
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/sample_vnf.py | 28
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/tg_ping.py | 2
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py | 2
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/vpe_vnf.py | 4
-rw-r--r--  yardstick/orchestrator/kubernetes.py | 302
-rw-r--r--  yardstick/tests/functional/common/messaging/test_messaging.py | 22
-rw-r--r--  yardstick/tests/functional/common/test_utils.py | 38
-rw-r--r--  yardstick/tests/unit/benchmark/contexts/test_kubernetes.py | 4
-rw-r--r--  yardstick/tests/unit/benchmark/runner/test_base.py | 56
-rw-r--r--  yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py | 136
-rw-r--r--  yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py | 5
-rw-r--r--  yardstick/tests/unit/common/messaging/test_payloads.py | 36
-rw-r--r--  yardstick/tests/unit/common/messaging/test_producer.py | 7
-rw-r--r--  yardstick/tests/unit/common/test_kubernetes_utils.py | 224
-rw-r--r--  yardstick/tests/unit/common/test_utils.py | 27
-rw-r--r--  yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py | 10
-rw-r--r--  yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py | 103
-rw-r--r--  yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py | 51
-rw-r--r--  yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py | 3
-rw-r--r--  yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py | 3
-rw-r--r--  yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py | 6
-rw-r--r--  yardstick/tests/unit/orchestrator/test_kubernetes.py | 343
47 files changed, 1963 insertions, 216 deletions
diff --git a/ansible/install_trex_standalone.yml b/ansible/install_trex_standalone.yml
deleted file mode 100644
index 9cf64142b..000000000
--- a/ansible/install_trex_standalone.yml
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright (c) 2017 Intel Corporation.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
----
-- hosts: yardstick
- vars:
- ansible_python_interpreter: "/usr/bin/env python"
- # needed for virtualenv
- NSB_INSTALL_DIR: /root/nsb_install
- INSTALL_BIN_PATH: /opt/nsb_bin
- #TREX_DOWNLOAD: "https://trex-tgn.cisco.com/trex/release/v2.05.tar.gz"
- TREX_VERSION: v2.20
- TREX_DOWNLOAD: "https://trex-tgn.cisco.com/trex/release/{{ TREX_VERSION }}.tar.gz"
-
- tasks:
- - get_url:
- url: "{{ TREX_DOWNLOAD }}"
- dest: "{{ NSB_INSTALL_DIR }}"
- checksum: "sha256:b9620341e552d2ef71d5ffa39ef92f12a1186836c250390db77bd7228497b91c"
-
- - unarchive:
- src: "{{ NSB_INSTALL_DIR }}/{{ TREX_DOWNLOAD|basename }}"
- dest: "{{ NSB_INSTALL_DIR }}"
- copy: no
-
- - file: path="{{ INSTALL_BIN_PATH }}/trex" state=absent
- - file: path="{{ INSTALL_BIN_PATH }}/trex" state=directory
-
- - command: mv "{{ NSB_INSTALL_DIR }}/{{ TREX_DOWNLOAD|basename|regex_replace('\.tar.gz', '') }}" "{{ INSTALL_BIN_PATH }}/trex/scripts"
-
- - file: path="{{ INSTALL_BIN_PATH }}/trex/scripts/automation/trex_control_plane/stl/__init__.py" state=touch
-
- - command: cp "{{ INSTALL_BIN_PATH }}/trex/scripts/dpdk_nic_bind.py" "{{ INSTALL_BIN_PATH }}"
-
- - name: add scripts to PYTHONPATH
- lineinfile:
- dest: /etc/environment
- regexp: "^PYTHONPATH="
- line: "PYTHONPATH={{ INSTALL_BIN_PATH }}/trex/scripts/automation/trex_control_plane:{{ INSTALL_BIN_PATH }}/trex/scripts/automation/trex_control_plane/stl:{{ NSB_INSTALL_DIR }}/yardstick"
- state: present
- create: yes
diff --git a/ansible/roles/install_samplevnf/vars/main.yml b/ansible/roles/install_samplevnf/vars/main.yml
index c92a9b09f..e2a37377a 100644
--- a/ansible/roles/install_samplevnf/vars/main.yml
+++ b/ansible/roles/install_samplevnf/vars/main.yml
@@ -48,13 +48,11 @@ vnf_build_dirs:
ACL: vACL
FW: vFW
CGNAPT: vCGNAPT
- PE: vPE
UDP_Replay: UDP_Replay
PROX: DPPD-PROX
vnf_app_names:
ACL: vACL
FW: vFW
CGNAPT: vCGNAPT
- PE: vPE
UDP_Replay: UDP_Replay
PROX: prox
diff --git a/ansible/ubuntu_server_baremetal_deploy_samplevnfs.yml b/ansible/ubuntu_server_baremetal_deploy_samplevnfs.yml
index d858257b1..3a29a8a90 100644
--- a/ansible/ubuntu_server_baremetal_deploy_samplevnfs.yml
+++ b/ansible/ubuntu_server_baremetal_deploy_samplevnfs.yml
@@ -45,8 +45,6 @@
vnf_name: FW
- role: install_samplevnf
vnf_name: CGNAPT
- - role: install_samplevnf
- vnf_name: PE
# build shared DPDK for collectd only, required DPDK downloaded already
- install_dpdk_shared
- install_rabbitmq
diff --git a/ansible/ubuntu_server_cloudimg_modify_samplevnfs.yml b/ansible/ubuntu_server_cloudimg_modify_samplevnfs.yml
index aab5a741c..b27933bd1 100644
--- a/ansible/ubuntu_server_cloudimg_modify_samplevnfs.yml
+++ b/ansible/ubuntu_server_cloudimg_modify_samplevnfs.yml
@@ -55,8 +55,6 @@
vnf_name: FW
- role: install_samplevnf
vnf_name: CGNAPT
- - role: install_samplevnf
- vnf_name: PE
# build shared DPDK for collectd only, required DPDK downloaded already
- install_dpdk_shared
- install_rabbitmq
diff --git a/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml
new file mode 100644
index 000000000..184ed6881
--- /dev/null
+++ b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml
@@ -0,0 +1,96 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+{% set provider = provider or none %}
+{% set physical_networks = physical_networks or ['physnet1', 'physnet2'] %}
+{% set segmentation_id = segmentation_id or none %}
+
+schema: yardstick:task:0.1
+scenarios:
+- type: NSPerf
+ traffic_profile: ../../traffic_profiles/ipv4_throughput.yaml
+ topology: vfw-tg-topology.yaml
+ nodes:
+ tg__0: trafficgen_1.yardstick
+ vnf__0: vnf.yardstick
+ options:
+ hugepages_gb: 8
+ framesize:
+ uplink: {64B: 100}
+ downlink: {64B: 100}
+ flow:
+ src_ip: [{'tg__0': 'xe0'}]
+ dst_ip: [{'tg__0': 'xe1'}]
+ count: 1
+ traffic_type: 4
+ rfc2544:
+ allowed_drop_rate: 0.0001 - 0.0001
+ vnf__0:
+ rules: acl_1rule.yaml
+ vnf_config: {lb_config: 'SW', lb_count: 1, worker_config: '1C/1T', worker_threads: 1}
+ runner:
+ type: IterationIPC
+ iterations: 10
+ timeout: 60
+context:
+ # put node context first, so we don't HEAT deploy if node has errors
+ name: yardstick
+ image: yardstick-samplevnfs
+ flavor:
+ vcpus: 10
+ ram: 12288
+ disk: 6
+ extra_specs:
+ hw:cpu_sockets: 1
+ hw:cpu_cores: 10
+ hw:cpu_threads: 1
+ user: ubuntu
+ placement_groups:
+ pgrp1:
+ policy: "availability"
+ servers:
+ vnf:
+ floating_ip: true
+ placement: "pgrp1"
+ trafficgen_1:
+ floating_ip: true
+ placement: "pgrp1"
+ networks:
+ mgmt:
+ cidr: '10.0.1.0/24'
+ xe0:
+ cidr: '10.0.2.0/24'
+ gateway_ip: 'null'
+ {% if provider %}
+ provider: {{ provider }}
+ physical_network: {{ physical_networks[0] }}
+ {% if segmentation_id %}
+ segmentation_id: {{ segmentation_id }}
+ {% endif %}
+ {% endif %}
+ port_security_enabled: False
+ enable_dhcp: 'false'
+ xe1:
+ cidr: '10.0.3.0/24'
+ gateway_ip: 'null'
+ {% if provider %}
+ provider: {{ provider }}
+ physical_network: {{ physical_networks[1] }}
+ {% if segmentation_id %}
+ segmentation_id: {{ segmentation_id }}
+ {% endif %}
+ {% endif %}
+ port_security_enabled: False
+ enable_dhcp: 'false'
diff --git a/tests/opnfv/test_cases/opnfv_yardstick_tc087.yaml b/tests/opnfv/test_cases/opnfv_yardstick_tc087.yaml
index d7441836d..13125ade8 100644
--- a/tests/opnfv/test_cases/opnfv_yardstick_tc087.yaml
+++ b/tests/opnfv/test_cases/opnfv_yardstick_tc087.yaml
@@ -74,7 +74,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
@@ -172,7 +172,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
type: "GeneralHA"
@@ -239,7 +239,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
contexts:
diff --git a/tests/opnfv/test_cases/opnfv_yardstick_tc092.yaml b/tests/opnfv/test_cases/opnfv_yardstick_tc092.yaml
index 85ec510df..f2996bcc6 100644
--- a/tests/opnfv/test_cases/opnfv_yardstick_tc092.yaml
+++ b/tests/opnfv/test_cases/opnfv_yardstick_tc092.yaml
@@ -73,7 +73,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
type: "GeneralHA"
@@ -170,7 +170,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
type: "GeneralHA"
@@ -237,7 +237,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
contexts:
diff --git a/tests/opnfv/test_cases/opnfv_yardstick_tc093.yaml b/tests/opnfv/test_cases/opnfv_yardstick_tc093.yaml
index a034471aa..27e78a451 100644
--- a/tests/opnfv/test_cases/opnfv_yardstick_tc093.yaml
+++ b/tests/opnfv/test_cases/opnfv_yardstick_tc093.yaml
@@ -75,7 +75,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
@@ -208,7 +208,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
-
type: "GeneralHA"
@@ -274,7 +274,7 @@ scenarios:
type: Duration
duration: 1
sla:
- action: monitor
+ action: assert
contexts:
diff --git a/yardstick/benchmark/contexts/kubernetes.py b/yardstick/benchmark/contexts/kubernetes.py
index 18c41212d..52d17df4d 100644
--- a/yardstick/benchmark/contexts/kubernetes.py
+++ b/yardstick/benchmark/contexts/kubernetes.py
@@ -47,6 +47,8 @@ class KubernetesContext(Context):
LOG.info('Creating ssh key')
self._set_ssh_key()
+ self._create_crd()
+ self._create_networks()
LOG.info('Launch containers')
self._create_rcs()
self._create_services()
@@ -60,6 +62,8 @@ class KubernetesContext(Context):
self._delete_rcs()
self._delete_pods()
self._delete_services()
+ self._delete_networks()
+ self._delete_crd()
super(KubernetesContext, self).undeploy()
@@ -106,6 +110,26 @@ class KubernetesContext(Context):
def _delete_pod(self, pod):
k8s_utils.delete_pod(pod)
+ def _create_crd(self):
+ LOG.info('Create Custom Resource Definition elements')
+ for crd in self.template.crd:
+ crd.create()
+
+ def _delete_crd(self):
+ LOG.info('Delete Custom Resource Definition elements')
+ for crd in self.template.crd:
+ crd.delete()
+
+ def _create_networks(self): # pragma: no cover
+ LOG.info('Create Network elements')
+ for net in self.template.network_objs:
+ net.create()
+
+ def _delete_networks(self): # pragma: no cover
+ LOG.info('Delete Network elements')
+ for net in self.template.network_objs:
+ net.delete()
+
def _get_key_path(self):
task_id = self.name.split('-')[-1]
k = 'files/yardstick_key-{}'.format(task_id)
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index fbdf6c281..af2557441 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -12,27 +12,26 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-from subprocess import CalledProcessError
-
-import importlib
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except CalledProcessError:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
@@ -259,7 +258,7 @@ class Runner(object):
log.debug("result_queue size %s", self.result_queue.qsize())
try:
one_record = self.result_queue.get(True, 1)
- except Empty:
+ except moves.queue.Empty:
pass
else:
if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+ """Class implementing the message producer for runners"""
+
+ def __init__(self, _id):
+ super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+ def start_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_START_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
+
+ def stop_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_STOP_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
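A minimal usage sketch of the new RunnerProducer (not part of the patch; ``task_id`` is assumed to be the scenario's task UUID, which is how the "IterationIPC" runner added later in this change uses it):

    from yardstick.benchmark.runners import base as base_runner

    mq_producer = base_runner.RunnerProducer(task_id)  # task_id: scenario task UUID
    mq_producer.start_iteration()   # tell the traffic generators an iteration begins
    # ... wait for the per-iteration KPI messages from the traffic generators ...
    mq_producer.stop_iteration()    # close the iteration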
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index 60f1fa536..14fd8bb47 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -74,6 +74,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
except y_exc.SLAValidationError as error:
# SLA validation failed in scenario, determine what to do now
if sla_action == "assert":
+ benchmark.teardown()
raise
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", error.args)
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
new file mode 100644
index 000000000..a0335fdc7
--- /dev/null
+++ b/yardstick/benchmark/runners/iteration_ipc.py
@@ -0,0 +1,205 @@
+# Copyright 2018: Intel Corporation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""A runner that runs a configurable number of times before it returns. Each
+ iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The PIDs of the VNF contexts to
+ listen for messages from are given by the scenario "setup" method.
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from yardstick.benchmark.runners import base as base_runner
+from yardstick.common import exceptions
+from yardstick.common import messaging
+from yardstick.common import utils
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_PUT_TIMEOUT = 10
+ITERATION_TIMEOUT = 180
+
+
+class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``RunnerIterationIPCConsumer``"""
+
+ def tg_method_started(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_STARTED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)},
+ QUEUE_PUT_TIMEOUT)
+
+ def tg_method_finished(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_FINISHED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+ def tg_method_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_ITERATION,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+
+class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
+ """MQ consumer for "IterationIPC" runner"""
+
+ def __init__(self, _id, ctx_ids):
+ self._id = _id
+ self._queue = multiprocessing.Queue()
+ endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
+ super(RunnerIterationIPCConsumer, self).__init__(
+ messaging.TOPIC_TG, ctx_ids, endpoints)
+ self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
+ self.iteration_index = None
+
+ def is_all_kpis_received_in_iteration(self):
+ """Check if all producers registered have sent the ITERATION msg
+
+ During the present iteration, all producers (traffic generators) must
+ start and finish the traffic injection, and at the end of the traffic
+ injection a TG_METHOD_ITERATION must be sent. This function will check
+ all KPIs in the present iteration are received. E.g.:
+ self.iteration_index = 2
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1]} --> return False
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1, kpi2]} --> return True
+ """
+ while not self._queue.empty():
+ msg = self._queue.get(True, 1)
+ if msg['action'] == messaging.TG_METHOD_ITERATION:
+ id_iter_list = self._kpi_per_id[msg['id']]
+ id_iter_list.append(msg['payload'].kpi)
+
+ return all(len(id_iter_list) == self.iteration_index
+ for id_iter_list in self._kpi_per_id.values())
+
+
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted, output_queue): # pragma: no cover
+ runner_cfg = scenario_cfg['runner']
+
+ timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
+ iterations = runner_cfg.get('iterations', 1)
+ run_step = runner_cfg.get('run_step', 'setup,run,teardown')
+ LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ benchmark = cls(scenario_cfg, context_cfg)
+ method = getattr(benchmark, method_name)
+
+ if 'setup' not in run_step:
+ raise exceptions.RunnerIterationIPCSetupActionNeeded()
+ benchmark.setup()
+ producer_ctxs = benchmark.get_mq_ids()
+ if not producer_ctxs:
+ raise exceptions.RunnerIterationIPCNoCtxs()
+
+ mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
+ mq_consumer.start_rpc_server()
+ mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
+
+ iteration_index = 1
+ while 'run' in run_step:
+ LOG.debug('runner=%(runner)s seq=%(sequence)s START',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+ data = {}
+ result = None
+ errors = ''
+ mq_consumer.iteration_index = iteration_index
+ mq_producer.start_iteration()
+
+ try:
+ utils.wait_until_true(
+ mq_consumer.is_all_kpis_received_in_iteration,
+ timeout=timeout, sleep=2)
+ result = method(data)
+ except Exception: # pylint: disable=broad-except
+ errors = traceback.format_exc()
+ LOG.exception(errors)
+
+ mq_producer.stop_iteration()
+
+ if result:
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+ benchmark_output = {'timestamp': time.time(),
+ 'sequence': iteration_index,
+ 'data': data,
+ 'errors': errors}
+ queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug('runner=%(runner)s seq=%(sequence)s END',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+
+ iteration_index += 1
+ if iteration_index > iterations or aborted.is_set():
+ LOG.info('"IterationIPC" worker END')
+ break
+
+ if 'teardown' in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ LOG.exception('Exception during teardown process')
+ mq_consumer.stop_rpc_server()
+ raise SystemExit(1)
+
+ LOG.debug('Data queue size = %s', queue.qsize())
+ LOG.debug('Output queue size = %s', output_queue.qsize())
+ mq_consumer.stop_rpc_server()
+
+
+class IterationIPCRunner(base_runner.Runner):
+ """Run a scenario for a configurable number of times.
+
+ Each iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The PIDs of the VNF contexts to
+ listen for messages from are given by the scenario "setup" method.
+ """
+ __execution_type__ = 'IterationIPC'
+
+ def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = '{}-{}-{}'.format(
+ self.__execution_type__, scenario_cfg.get('type'), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()
diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py
index 30ac1bea9..90a87ac29 100644
--- a/yardstick/benchmark/scenarios/base.py
+++ b/yardstick/benchmark/scenarios/base.py
@@ -119,3 +119,7 @@ class Scenario(object):
except TypeError:
dic[k] = v
return dic
+
+ def get_mq_ids(self): # pragma: no cover
+ """Return stored MQ producer IDs, if defined"""
+ pass
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index eb62d6222..3bb168c70 100644
--- a/yardstick/benchmark/scenarios/networking/vnf_generic.py
+++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py
@@ -50,7 +50,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
__scenario_type__ = "NSPerf"
- def __init__(self, scenario_cfg, context_cfg): # Yardstick API
+ def __init__(self, scenario_cfg, context_cfg): # pragma: no cover
super(NetworkServiceTestCase, self).__init__()
self.scenario_cfg = scenario_cfg
self.context_cfg = context_cfg
@@ -61,6 +61,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
self.traffic_profile = None
self.node_netdevs = {}
self.bin_path = get_nsb_option('bin_path', '')
+ self._mq_ids = []
def _get_ip_flow_range(self, ip_start_range):
@@ -168,18 +169,18 @@ class NetworkServiceTestCase(scenario_base.Scenario):
topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data)
self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
- def _find_vnf_name_from_id(self, vnf_id):
+ def _find_vnf_name_from_id(self, vnf_id): # pragma: no cover
return next((vnfd["vnfd-id-ref"]
for vnfd in self.topology["constituent-vnfd"]
if vnf_id == vnfd["member-vnf-index"]), None)
- def _find_vnfd_from_vnf_idx(self, vnf_id):
+ def _find_vnfd_from_vnf_idx(self, vnf_id): # pragma: no cover
return next((vnfd
for vnfd in self.topology["constituent-vnfd"]
if vnf_id == vnfd["member-vnf-index"]), None)
@staticmethod
- def find_node_if(nodes, name, if_name, vld_id):
+ def find_node_if(nodes, name, if_name, vld_id): # pragma: no cover
try:
# check for xe0, xe1
intf = nodes[name]["interfaces"][if_name]
@@ -272,14 +273,14 @@ class NetworkServiceTestCase(scenario_base.Scenario):
node0_if["peer_intf"] = node1_copy
node1_if["peer_intf"] = node0_copy
- def _update_context_with_topology(self):
+ def _update_context_with_topology(self): # pragma: no cover
for vnfd in self.topology["constituent-vnfd"]:
vnf_idx = vnfd["member-vnf-index"]
vnf_name = self._find_vnf_name_from_id(vnf_idx)
vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
self.context_cfg["nodes"][vnf_name].update(vnfd)
- def _generate_pod_yaml(self):
+ def _generate_pod_yaml(self): # pragma: no cover
context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
# convert OrderedDict to a list
# pod.yaml nodes is a list
@@ -293,7 +294,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
explicit_start=True)
@staticmethod
- def _serialize_node(node):
+ def _serialize_node(node): # pragma: no cover
new_node = copy.deepcopy(node)
# name field is required
# remove context suffix
@@ -315,7 +316,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
self._update_context_with_topology()
@classmethod
- def get_vnf_impl(cls, vnf_model_id):
+ def get_vnf_impl(cls, vnf_model_id): # pragma: no cover
""" Find the implementing class from vnf_model["vnf"]["name"] field
:param vnf_model_id: parsed vnfd model ID field
@@ -343,7 +344,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
raise exceptions.IncorrectConfig(error_msg=message)
@staticmethod
- def create_interfaces_from_node(vnfd, node):
+ def create_interfaces_from_node(vnfd, node): # pragma: no cover
ext_intfs = vnfd["vdu"][0]["external-interface"] = []
# have to sort so xe0 goes first
for intf_name, intf in sorted(node['interfaces'].items()):
@@ -412,10 +413,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
return vnfs
def setup(self):
- """ Setup infrastructure, provission VNFs & start traffic
-
- :return:
- """
+ """Setup infrastructure, provision VNFs & start traffic"""
# 1. Verify if infrastructure mapping can meet topology
self.map_topology_to_infrastructure()
# 1a. Load VNF models
@@ -457,6 +455,11 @@ class NetworkServiceTestCase(scenario_base.Scenario):
for traffic_gen in traffic_runners:
LOG.info("Starting traffic on %s", traffic_gen.name)
traffic_gen.run_traffic(self.traffic_profile)
+ self._mq_ids.append(traffic_gen.get_mq_producer_id())
+
+ def get_mq_ids(self): # pragma: no cover
+ """Return stored MQ producer IDs"""
+ return self._mq_ids
def run(self, result): # yardstick API
""" Yardstick calls run() at intervals defined in the yaml and
@@ -495,10 +498,10 @@ class NetworkServiceTestCase(scenario_base.Scenario):
LOG.exception("")
raise RuntimeError("Error in teardown")
- def pre_run_wait_time(self, time_seconds):
+ def pre_run_wait_time(self, time_seconds): # pragma: no cover
"""Time waited before executing the run method"""
time.sleep(time_seconds)
- def post_run_wait_time(self, time_seconds):
+ def post_run_wait_time(self, time_seconds): # pragma: no cover
"""Time waited after executing the run method"""
pass
diff --git a/yardstick/common/constants.py b/yardstick/common/constants.py
index 1ebd32509..2f14d4bc4 100644
--- a/yardstick/common/constants.py
+++ b/yardstick/common/constants.py
@@ -171,3 +171,7 @@ TESTSUITE_PRE = 'opnfv_'
# OpenStack cloud default config parameters
OS_CLOUD_DEFAULT_CONFIG = {'verify': False}
+
+# Kubernetes
+SCOPE_NAMESPACED = 'Namespaced'
+SCOPE_CLUSTER = 'Cluster'
diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py
index 935c77866..641c4e1c4 100644
--- a/yardstick/common/exceptions.py
+++ b/yardstick/common/exceptions.py
@@ -14,6 +14,8 @@
from oslo_utils import excutils
+from yardstick.common import constants
+
class ProcessExecutionError(RuntimeError):
def __init__(self, message, returncode):
@@ -191,6 +193,15 @@ class TaskRenderError(YardstickException):
message = 'Failed to render template:\n%(input_task)s'
+class RunnerIterationIPCSetupActionNeeded(YardstickException):
+ message = ('IterationIPC needs the "setup" action to retrieve the VNF '
+ 'handling processes PIDs to receive the messages sent')
+
+
+class RunnerIterationIPCNoCtxs(YardstickException):
+ message = 'Benchmark "setup" action did not return any VNF process PID'
+
+
class TimerTimeout(YardstickException):
message = 'Timer timeout expired, %(timeout)s seconds'
@@ -199,10 +210,41 @@ class WaitTimeout(YardstickException):
message = 'Wait timeout while waiting for condition'
+class KubernetesApiException(YardstickException):
+ message = ('Kubernetes API errors. Action: %(action)s, '
+ 'resource: %(resource)s')
+
+
+class KubernetesConfigFileNotFound(YardstickException):
+ message = 'Config file (%s) not found' % constants.K8S_CONF_FILE
+
+
class KubernetesTemplateInvalidVolumeType(YardstickException):
message = 'No valid "volume" types present in %(volume)s'
+class KubernetesCRDObjectDefinitionError(YardstickException):
+ message = ('Kubernetes Custom Resource Definition Object error, missing '
+ 'parameters: %(missing_parameters)s')
+
+
+class KubernetesNetworkObjectDefinitionError(YardstickException):
+ message = ('Kubernetes Network object definition error, missing '
+ 'parameters: %(missing_parameters)s')
+
+
+class KubernetesNetworkObjectKindMissing(YardstickException):
+ message = 'Kubernetes kind "Network" is not defined'
+
+
+class KubernetesWrongRestartPolicy(YardstickException):
+ message = 'Restart policy "%(rpolicy)s" is not valid'
+
+
+class KubernetesContainerPortNotDefined(YardstickException):
+ message = 'Container port not defined in "%(port)s"'
+
+
class ScenarioCreateNetworkError(YardstickException):
message = 'Create Neutron Network Scenario failed'
diff --git a/yardstick/common/kubernetes_utils.py b/yardstick/common/kubernetes_utils.py
index ee8e8edcd..42267fc41 100644
--- a/yardstick/common/kubernetes_utils.py
+++ b/yardstick/common/kubernetes_utils.py
@@ -13,6 +13,8 @@ from kubernetes import config
from kubernetes.client.rest import ApiException
from yardstick.common import constants as consts
+from yardstick.common import exceptions
+
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
@@ -22,12 +24,26 @@ def get_core_api(): # pragma: no cover
try:
config.load_kube_config(config_file=consts.K8S_CONF_FILE)
except IOError:
- LOG.exception('config file not found')
- raise
-
+ raise exceptions.KubernetesConfigFileNotFound()
return client.CoreV1Api()
+def get_extensions_v1beta_api():
+ try:
+ config.load_kube_config(config_file=consts.K8S_CONF_FILE)
+ except IOError:
+ raise exceptions.KubernetesConfigFileNotFound()
+ return client.ApiextensionsV1beta1Api()
+
+
+def get_custom_objects_api():
+ try:
+ config.load_kube_config(config_file=consts.K8S_CONF_FILE)
+ except IOError:
+ raise exceptions.KubernetesConfigFileNotFound()
+ return client.CustomObjectsApi()
+
+
def get_node_list(**kwargs): # pragma: no cover
core_v1_api = get_core_api()
try:
@@ -187,6 +203,70 @@ def delete_config_map(name,
raise
+def create_custom_resource_definition(body):
+ api = get_extensions_v1beta_api()
+ body_obj = client.V1beta1CustomResourceDefinition(
+ spec=body['spec'], metadata=body['metadata'])
+ try:
+ api.create_custom_resource_definition(body_obj)
+ except ValueError:
+ # NOTE(ralonsoh): bug in kubernetes-client/python 6.0.0
+ # https://github.com/kubernetes-client/python/issues/491
+ pass
+ except ApiException:
+ raise exceptions.KubernetesApiException(
+ action='create', resource='CustomResourceDefinition')
+
+
+def delete_custom_resource_definition(name):
+ api = get_extensions_v1beta_api()
+ body_obj = client.V1DeleteOptions()
+ try:
+ api.delete_custom_resource_definition(name, body_obj)
+ except ApiException:
+ raise exceptions.KubernetesApiException(
+ action='delete', resource='CustomResourceDefinition')
+
+
+def get_custom_resource_definition(kind):
+ api = get_extensions_v1beta_api()
+ try:
+ crd_list = api.list_custom_resource_definition()
+ for crd_obj in (crd_obj for crd_obj in crd_list.items
+ if crd_obj.spec.names.kind == kind):
+ return crd_obj
+ return None
+ except ApiException:
+ raise exceptions.KubernetesApiException(
+ action='delete', resource='CustomResourceDefinition')
+
+
+def create_network(scope, group, version, plural, body, namespace='default'):
+ api = get_custom_objects_api()
+ try:
+ if scope == consts.SCOPE_CLUSTER:
+ api.create_cluster_custom_object(group, version, plural, body)
+ else:
+ api.create_namespaced_custom_object(
+ group, version, namespace, plural, body)
+ except ApiException:
+ raise exceptions.KubernetesApiException(
+ action='create', resource='Custom Object: Network')
+
+
+def delete_network(scope, group, version, plural, name, namespace='default'):
+ api = get_custom_objects_api()
+ try:
+ if scope == consts.SCOPE_CLUSTER:
+ api.delete_cluster_custom_object(group, version, plural, name, {})
+ else:
+ api.delete_namespaced_custom_object(
+ group, version, namespace, plural, name, {})
+ except ApiException:
+ raise exceptions.KubernetesApiException(
+ action='delete', resource='Custom Object: Network')
+
+
def get_pod_list(namespace='default'): # pragma: no cover
core_v1_api = get_core_api()
try:
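A sketch of how the new network helpers can be driven (illustrative only; the group, version and plural used here are assumptions and would in practice be read from the "Network" CRD through get_custom_resource_definition(), as the orchestrator changes below do):

    from yardstick.common import constants as consts
    from yardstick.common import kubernetes_utils as k8s_utils

    body = {'apiVersion': 'yardstick.com/v1',        # assumed group/version
            'kind': 'Network',
            'metadata': {'name': 'flannel-net'},
            'plugin': 'flannel',
            'args': ''}
    k8s_utils.create_network(consts.SCOPE_CLUSTER, 'yardstick.com', 'v1',
                             'networks', body)
    k8s_utils.delete_network(consts.SCOPE_CLUSTER, 'yardstick.com', 'v1',
                             'networks', 'flannel-net')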
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py
index f0f012ec3..bd700d9b1 100644
--- a/yardstick/common/messaging/__init__.py
+++ b/yardstick/common/messaging/__init__.py
@@ -1,14 +1,3 @@
-# Copyright (c) 2018 Intel Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -28,9 +17,17 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER +
RPC_SERVER_EXECUTOR = 'threading'
# Topics.
-RUNNER = 'runner'
+TOPIC_TG = 'topic_traffic_generator'
+TOPIC_RUNNER = 'topic_runner'
# Methods.
-# RUNNER methods:
-RUNNER_INFO = 'runner_info'
-RUNNER_LOOP = 'runner_loop'
+# Traffic generator consumer methods. Names must match the methods implemented
+# in the consumer endpoint class.
+TG_METHOD_STARTED = 'tg_method_started'
+TG_METHOD_FINISHED = 'tg_method_finished'
+TG_METHOD_ITERATION = 'tg_method_iteration'
+
+# Runner consumer methods. Names must match the methods implemented in the
+# consumer endpoint class.
+RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration"
+RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration"
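The comments above state the contract: each method constant is also the literal name of the endpoint method the consumer dispatches to. A minimal sketch (the class name is illustrative):

    class ExampleRunnerEndpoint(object):
        # Method names must match RUNNER_METHOD_START_ITERATION and
        # RUNNER_METHOD_STOP_ITERATION above.
        def runner_method_start_iteration(self, ctxt, **kwargs):
            pass

        def runner_method_stop_iteration(self, ctxt, **kwargs):
            pass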
diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py
index 24ec6f184..c99d7ed27 100644
--- a/yardstick/common/messaging/consumer.py
+++ b/yardstick/common/messaging/consumer.py
@@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__)
class NotificationHandler(object):
"""Abstract class to define a endpoint object for a MessagingConsumer"""
- def __init__(self, _id, ctx_pids, queue):
+ def __init__(self, _id, ctx_ids, queue):
self._id = _id
- self._ctx_pids = ctx_pids
+ self._ctx_ids = ctx_ids
self._queue = queue
@@ -43,11 +43,11 @@ class MessagingConsumer(object):
the messages published by a `MessagingNotifier`.
"""
- def __init__(self, topic, pids, endpoints, fanout=True):
+ def __init__(self, topic, ids, endpoints, fanout=True):
"""Init function.
:param topic: (string) MQ exchange topic
- :param pids: (list of int) list of PIDs of the processes implementing
+ :param ids: (list of int) list of IDs of the processes implementing
the MQ Notifier which will be in the message context
:param endpoints: (list of class) list of classes implementing the
methods (see `MessagingNotifier.send_message) used by
@@ -58,7 +58,7 @@ class MessagingConsumer(object):
:returns: `MessagingConsumer` class object
"""
- self._pids = pids
+ self._ids = ids
self._endpoints = endpoints
self._transport = oslo_messaging.get_rpc_transport(
cfg.CONF, url=messaging.TRANSPORT_URL)
diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py
index d29d79808..8ede1e58e 100644
--- a/yardstick/common/messaging/payloads.py
+++ b/yardstick/common/messaging/payloads.py
@@ -51,3 +51,23 @@ class Payload(object):
def dict_to_obj(cls, _dict):
"""Returns a Payload object built from the dictionary elements"""
return cls(**_dict)
+
+
+class TrafficGeneratorPayload(Payload):
+ """Base traffic generator payload class"""
+ REQUIRED_FIELDS = {
+ 'version', # (str) version of the payload transmitted.
+ 'iteration', # (int) iteration index during the traffic injection,
+ # starting from 1.
+ 'kpi' # (dict) collection of KPIs collected from the traffic
+ # injection. The content will depend on the generator and the
+ # traffic type.
+ }
+
+
+class RunnerPayload(Payload):
+ """Base runner payload class"""
+ REQUIRED_FIELDS = {
+ 'version', # (str) version of the payload transmitted.
+ 'data' # (dict) generic container of data to be used if needed.
+ }
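A round-trip sketch of the new payload classes (values are illustrative): obj_to_dict() produces the keyword arguments that MessagingProducer.send_message() casts over the queue, and dict_to_obj() rebuilds the payload on the consumer side:

    from yardstick.common.messaging import payloads

    tg_payload = payloads.TrafficGeneratorPayload(
        version=1, iteration=3, kpi={'throughput': 12.5})
    wire_format = tg_payload.obj_to_dict()    # dict sent in the MQ cast
    rebuilt = payloads.TrafficGeneratorPayload.dict_to_obj(wire_format)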
diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py
index b6adc0c17..aadab649d 100644
--- a/yardstick/common/messaging/producer.py
+++ b/yardstick/common/messaging/producer.py
@@ -34,18 +34,18 @@ class MessagingProducer(object):
messages in a message queue.
"""
- def __init__(self, topic, pid=os.getpid(), fanout=True):
+ def __init__(self, topic, _id=os.getpid(), fanout=True):
"""Init function.
:param topic: (string) MQ exchange topic
- :param pid: (int) PID of the process implementing this MQ Notifier
+ :param _id: (int) ID of the process implementing this MQ Notifier
:param fanout: (bool) MQ clients may request that a copy of the message
be delivered to all servers listening on a topic by
setting fanout to ``True``, rather than just one of them
:returns: `MessagingNotifier` class object
"""
self._topic = topic
- self._pid = pid
+ self._id = _id
self._fanout = fanout
self._transport = oslo_messaging.get_rpc_transport(
cfg.CONF, url=messaging.TRANSPORT_URL)
@@ -65,6 +65,11 @@ class MessagingProducer(object):
consumer endpoints
:param payload: (subclass `Payload`) payload content
"""
- self._notifier.cast({'pid': self._pid},
+ self._notifier.cast({'id': self._id},
method,
**payload.obj_to_dict())
+
+ @property
+ def id(self):
+ """Return MQ producer ID"""
+ return self._id
diff --git a/yardstick/common/utils.py b/yardstick/common/utils.py
index f9fe0e336..85cecc714 100644
--- a/yardstick/common/utils.py
+++ b/yardstick/common/utils.py
@@ -527,3 +527,25 @@ def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
if exception and issubclass(exception, Exception):
raise exception # pylint: disable=raising-bad-type
raise exceptions.WaitTimeout
+
+
+def send_socket_command(host, port, command):
+ """Send a string command to a specific port in a host
+
+ :param host: (str) ip or hostname of the host
+ :param port: (int) port number
+ :param command: (str) command to send
+ :return: 0 if success, error number if error
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ ret = 0
+ try:
+ err_number = sock.connect_ex((host, int(port)))
+ if err_number != 0:
+ return err_number
+ sock.sendall(six.b(command))
+ except Exception: # pylint: disable=broad-except
+ ret = 1
+ finally:
+ sock.close()
+ return ret
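A short usage sketch for the helper above (host, port and command are placeholders):

    from yardstick.common import utils

    ret = utils.send_socket_command('192.0.2.10', 4567, 'quit')
    if ret != 0:
        # non-zero means connect_ex() returned an errno, or 1 on a send failure
        print('command could not be delivered, code: %s' % ret)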
diff --git a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py
index 393f60f7c..74deeecb5 100644
--- a/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py
+++ b/yardstick/network_services/libs/ixia_libs/ixnet/ixnet_api.py
@@ -166,9 +166,10 @@ class IxNextgen(object): # pragma: no cover
:return: list of paired frame sizes and weights
"""
weighted_range_pairs = []
- for size, weight in framesize.items():
- weighted_range_pairs.append(int(size.upper().replace('B', '')))
- weighted_range_pairs.append(int(weight))
+ for size, weight in ((s, w) for (s, w) in framesize.items()
+ if int(w) != 0):
+ size = int(size.upper().replace('B', ''))
+ weighted_range_pairs.append([size, size, int(weight)])
return weighted_range_pairs
def iter_over_get_lists(self, x1, x2, y2, offset=0):
@@ -339,7 +340,7 @@ class IxNextgen(object): # pragma: no cover
"percentLineRate" no used)
- Frame size: custom IMIX [1] definition; a list of packet size in
bytes and the weight. E.g.:
- [64, 10, 128, 15, 512, 5]
+ [[64, 64, 10], [128, 128, 15], [512, 512, 5]]
[1] https://en.wikipedia.org/wiki/Internet_Mix
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 9ceac3167..fb41a4e4a 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -18,6 +18,9 @@ import abc
import logging
import six
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
@@ -138,6 +141,39 @@ class VnfdHelper(dict):
yield port_name, port_num
+class TrafficGeneratorProducer(producer.MessagingProducer):
+ """Class implementing the message producer for traffic generators
+
+ This message producer must be instantiated inside the process created by
+ the "run_traffic" method.
+ """
+ def __init__(self, _id):
+ super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
+ _id=_id)
+
+ def tg_method_started(self, version=1):
+ """Send a message to inform the traffic generation has started"""
+ self.send_message(
+ messaging.TG_METHOD_STARTED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_finished(self, version=1):
+ """Send a message to inform the traffic generation has finished"""
+ self.send_message(
+ messaging.TG_METHOD_FINISHED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_iteration(self, iteration, version=1, kpi=None):
+ """Send a message, with KPI, once an iteration has finished"""
+ kpi = {} if kpi is None else kpi
+ self.send_message(
+ messaging.TG_METHOD_ITERATION,
+ payloads.TrafficGeneratorPayload(version=version,
+ iteration=iteration, kpi=kpi))
+
+
@six.add_metaclass(abc.ABCMeta)
class GenericVNF(object):
"""Class providing file-like API for generic VNF implementation
@@ -216,6 +252,7 @@ class GenericTrafficGen(GenericVNF):
super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
+ self._mq_producer = None
@abc.abstractmethod
def run_traffic(self, traffic_profile):
@@ -286,3 +323,16 @@ class GenericTrafficGen(GenericVNF):
:return: True/False
"""
pass
+
+ @staticmethod
+ def _setup_mq_producer(id):
+ """Setup the TG MQ producer to send messages between processes
+
+ :return: (derived class from ``MessagingProducer``) MQ producer object
+ """
+ return TrafficGeneratorProducer(id)
+
+ def get_mq_producer_id(self):
+ """Return the MQ producer ID if initialized"""
+ if self._mq_producer:
+ return self._mq_producer.id
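A hedged sketch of the calling sequence expected inside the traffic-runner process (``mq_id`` is assumed to be the uuid1 integer that SampleVNFTrafficGen.run_traffic() passes down, see sample_vnf.py below; the loop condition is a placeholder):

    producer = GenericTrafficGen._setup_mq_producer(mq_id)  # TrafficGeneratorProducer
    producer.tg_method_started()
    iteration = 0
    while injecting_traffic:            # placeholder for the resource helper's loop
        iteration += 1
        # ... inject traffic once, collect KPIs ...
        producer.tg_method_iteration(iteration, kpi={})
    producer.tg_method_finished()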
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 6d28f4750..3241719e8 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -969,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper):
self._test_type = self.setup_helper.find_in_section('global', 'name', None)
return self._test_type
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index 1ee71aa25..bc65380d3 100644
--- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -14,14 +14,15 @@
import logging
from multiprocessing import Queue, Value, Process
-
import os
import posixpath
import re
-import six
+import uuid
import subprocess
import time
+import six
+
from trex_stl_lib.trex_stl_client import LoggerApi
from trex_stl_lib.trex_stl_client import STLClient
from trex_stl_lib.trex_stl_exceptions import STLError
@@ -408,12 +409,13 @@ class ClientResourceHelper(ResourceHelper):
time.sleep(self.QUEUE_WAIT_TIME)
self._queue.put(samples)
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, mq_producer):
# if we don't do this we can hang waiting for the queue to drain
# have to do this in the subprocess
self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
+ mq_producer.tg_method_started()
try:
self._build_ports()
self.client = self._connect()
@@ -421,8 +423,11 @@ class ClientResourceHelper(ResourceHelper):
self.client.remove_all_streams(self.all_ports) # remove all streams
traffic_profile.register_generator(self)
+ iteration_index = 0
while self._terminated.value == 0:
+ iteration_index += 1
self._run_traffic_once(traffic_profile)
+ mq_producer.tg_method_iteration(iteration_index)
self.client.stop(self.all_ports)
self.client.disconnect()
@@ -433,6 +438,8 @@ class ClientResourceHelper(ResourceHelper):
return # return if trex/tg server is stopped.
raise
+ mq_producer.tg_method_finished()
+
def terminate(self):
self._terminated.value = 1 # stop client
@@ -911,12 +918,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
LOG.info("%s TG Server is up and running.", self.APP_NAME)
return self._tg_process.exitcode
- def _traffic_runner(self, traffic_profile):
+ def _traffic_runner(self, traffic_profile, mq_id):
# always drop connections first thing in new processes
# so we don't get paramiko errors
self.ssh_helper.drop_connection()
LOG.info("Starting %s client...", self.APP_NAME)
- self.resource_helper.run_traffic(traffic_profile)
+ self._mq_producer = self._setup_mq_producer(mq_id)
+ self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
@@ -926,10 +934,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
:param traffic_profile:
:return: True/False
"""
- name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+ name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+ traffic_profile.__class__.__name__,
os.getpid())
- self._traffic_process = Process(name=name, target=self._traffic_runner,
- args=(traffic_profile,))
+ self._traffic_process = Process(
+ name=name, target=self._traffic_runner,
+ args=(traffic_profile, uuid.uuid1().int))
self._traffic_process.start()
# Wait for traffic process to start
while self.resource_helper.client_started.value == 0:
@@ -938,8 +948,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
if not self._traffic_process.is_alive():
break
- return self._traffic_process.is_alive()
-
def collect_kpi(self):
# check if the tg processes have exited
physical_node = Context.get_physical_node_from_server(
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index a989543f5..4050dc6e2 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper):
self._queue = Queue()
self._parser = PingParser(self._queue)
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
# drop the connection in order to force a new one
self.ssh_helper.drop_connection()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
index a1f9fbeb4..875ae93b9 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py
@@ -102,7 +102,7 @@ class IxiaResourceHelper(ClientResourceHelper):
self.client.assign_ports()
self.client.create_traffic_model()
- def run_traffic(self, traffic_profile):
+ def run_traffic(self, traffic_profile, *args):
if self._terminated.value:
return
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index 57ea2eee3..bfff45c67 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -239,7 +239,7 @@ class ConfigCreate(object):
class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
- APP_NAME = 'vPE'
+ APP_NAME = 'vPE_vnf'
CFG_CONFIG = "/tmp/vpe_config"
CFG_SCRIPT = "/tmp/vpe_script"
TM_CONFIG = "/tmp/full_tm_profile_10G.cfg"
@@ -286,7 +286,7 @@ class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- APP_NAME = 'vPE'
+ APP_NAME = 'vPE_vnf'
APP_WORD = 'vpe'
COLLECT_KPI = VPE_COLLECT_KPI
WAIT_TIME = 20
diff --git a/yardstick/orchestrator/kubernetes.py b/yardstick/orchestrator/kubernetes.py
index 8ccb98853..bb01b33fa 100644
--- a/yardstick/orchestrator/kubernetes.py
+++ b/yardstick/orchestrator/kubernetes.py
@@ -9,9 +9,12 @@
import copy
+from oslo_serialization import jsonutils
+
+from yardstick.common import constants
from yardstick.common import exceptions
-from yardstick.common import utils
from yardstick.common import kubernetes_utils as k8s_utils
+from yardstick.common import utils
class ContainerObject(object):
@@ -19,6 +22,8 @@ class ContainerObject(object):
SSH_MOUNT_PATH = '/tmp/.ssh/'
IMAGE_DEFAULT = 'openretriever/yardstick'
COMMAND_DEFAULT = '/bin/bash'
+ RESOURCES = ('requests', 'limits')
+ PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
def __init__(self, name, ssh_key, **kwargs):
self._name = name
@@ -27,6 +32,10 @@ class ContainerObject(object):
self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
self._args = kwargs.get('args', [])
self._volume_mounts = kwargs.get('volumeMounts', [])
+ self._security_context = kwargs.get('securityContext')
+ self._env = kwargs.get('env', [])
+ self._resources = kwargs.get('resources', {})
+ self._ports = kwargs.get('ports', [])
def _create_volume_mounts(self):
"""Return all "volumeMounts" items per container"""
@@ -47,24 +56,55 @@ class ContainerObject(object):
def get_container_item(self):
"""Create a "container" item"""
container_name = '{}-container'.format(self._name)
- return {'args': self._args,
- 'command': self._command,
- 'image': self._image,
- 'name': container_name,
- 'volumeMounts': self._create_volume_mounts()}
-
-
-class KubernetesObject(object):
+ container = {'args': self._args,
+ 'command': self._command,
+ 'image': self._image,
+ 'name': container_name,
+ 'volumeMounts': self._create_volume_mounts()}
+ if self._security_context:
+ container['securityContext'] = self._security_context
+ if self._env:
+ container['env'] = []
+ for env in self._env:
+ container['env'].append({'name': env['name'],
+ 'value': env['value']})
+ if self._ports:
+ container['ports'] = []
+ for port in self._ports:
+ if 'containerPort' not in port.keys():
+ raise exceptions.KubernetesContainerPortNotDefined(
+ port=port)
+ _port = {port_option: value for port_option, value
+ in port.items() if port_option in self.PORT_OPTIONS}
+ container['ports'].append(_port)
+ if self._resources:
+ container['resources'] = {}
+ for res in (res for res in self._resources if
+ res in self.RESOURCES):
+ container['resources'][res] = self._resources[res]
+ return container
+
+
+class ReplicationControllerObject(object):
SSHKEY_DEFAULT = 'yardstick_key'
+ RESTART_POLICY = ('Always', 'OnFailure', 'Never')
+ TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
def __init__(self, name, **kwargs):
- super(KubernetesObject, self).__init__()
+ super(ReplicationControllerObject, self).__init__()
parameters = copy.deepcopy(kwargs)
self.name = name
self.node_selector = parameters.pop('nodeSelector', {})
self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
self._volumes = parameters.pop('volumes', [])
+ self._security_context = parameters.pop('securityContext', None)
+ self._networks = parameters.pop('networks', [])
+ self._tolerations = parameters.pop('tolerations', [])
+ self._restart_policy = parameters.pop('restartPolicy', 'Always')
+ if self._restart_policy not in self.RESTART_POLICY:
+ raise exceptions.KubernetesWrongRestartPolicy(
+ rpolicy=self._restart_policy)
containers = parameters.pop('containers', None)
if containers:
@@ -85,14 +125,14 @@ class KubernetesObject(object):
"replicas": 1,
"template": {
"metadata": {
- "labels": {
- "app": name
- }
+ "labels": {"app": name}
},
"spec": {
"containers": [],
"volumes": [],
- "nodeSelector": {}
+ "nodeSelector": {},
+ "restartPolicy": self._restart_policy,
+ "tolerations": []
}
}
}
@@ -102,6 +142,9 @@ class KubernetesObject(object):
self._add_containers()
self._add_node_selector()
self._add_volumes()
+ self._add_security_context()
+ self._add_networks()
+ self._add_tolerations()
def get_template(self):
return self.template
@@ -153,34 +196,214 @@ class KubernetesObject(object):
return {'name': name,
type_name: type_data}
+ def _add_security_context(self):
+ if self._security_context:
+ utils.set_dict_value(self.template,
+ 'spec.template.spec.securityContext',
+ self._security_context)
+
+ def _add_networks(self):
+ networks = []
+ for net in self._networks:
+ networks.append({'name': net})
+
+ if not networks:
+ return
-class ServiceObject(object):
+ annotations = {'networks': jsonutils.dumps(networks)}
+ utils.set_dict_value(self.template,
+ 'spec.template.metadata.annotations',
+ annotations)
+
+ def _add_tolerations(self):
+ tolerations = []
+ for tol in self._tolerations:
+ tolerations.append({k: tol[k] for k in tol
+ if k in self.TOLERATIONS_KEYS})
+
+ tolerations = ([{'operator': 'Exists'}] if not tolerations
+ else tolerations)
+ utils.set_dict_value(self.template,
+ 'spec.template.spec.tolerations',
+ tolerations)
- def __init__(self, name):
- self.name = '{}-service'.format(name)
+
+class ServiceNodePortObject(object):
+
+ def __init__(self, name, **kwargs):
+ """Service kind "NodePort" object
+
+ :param name: (string) name of the Service
+ :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
+ nodePort
+ """
+ self._name = '{}-service'.format(name)
self.template = {
- 'metadata': {
- 'name': '{}-service'.format(name)
- },
+ 'metadata': {'name': '{}-service'.format(name)},
'spec': {
'type': 'NodePort',
- 'ports': [
- {
- 'port': 22,
- 'protocol': 'TCP'
- }
- ],
- 'selector': {
- 'app': name
- }
+ 'ports': [],
+ 'selector': {'app': name}
}
}
+ self._add_port(22, protocol='TCP')
+ node_ports = copy.deepcopy(kwargs.get('node_ports', []))
+ for port in node_ports:
+ port_number = port.pop('port')
+ self._add_port(port_number, **port)
+
+ def _add_port(self, port, protocol=None, name=None, targetPort=None,
+ nodePort=None):
+ _port = {'port': port}
+ if protocol:
+ _port['protocol'] = protocol
+ if name:
+ _port['name'] = name
+ if targetPort:
+ _port['targetPort'] = targetPort
+ if nodePort:
+ _port['nodePort'] = nodePort
+ self.template['spec']['ports'].append(_port)
+
def create(self):
k8s_utils.create_service(self.template)
def delete(self):
- k8s_utils.delete_service(self.name)
+ k8s_utils.delete_service(self._name)
+
+
+class CustomResourceDefinitionObject(object):
+
+ MANDATORY_PARAMETERS = {'name'}
+
+ def __init__(self, ctx_name, **kwargs):
+ if not self.MANDATORY_PARAMETERS.issubset(kwargs):
+ missing_parameters = ', '.join(
+ str(param) for param in
+ (self.MANDATORY_PARAMETERS - set(kwargs)))
+ raise exceptions.KubernetesCRDObjectDefinitionError(
+ missing_parameters=missing_parameters)
+
+ singular = kwargs['name']
+ plural = singular + 's'
+ kind = singular.title()
+ version = kwargs.get('version', 'v1')
+ scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
+ group = ctx_name + '.com'
+ self._name = metadata_name = plural + '.' + group
+
+ self._template = {
+ 'metadata': {
+ 'name': metadata_name
+ },
+ 'spec': {
+ 'group': group,
+ 'version': version,
+ 'scope': scope,
+ 'names': {'plural': plural,
+ 'singular': singular,
+ 'kind': kind}
+ }
+ }
+
+ def create(self):
+ k8s_utils.create_custom_resource_definition(self._template)
+
+ def delete(self):
+ k8s_utils.delete_custom_resource_definition(self._name)
+
+
+class NetworkObject(object):
+
+ MANDATORY_PARAMETERS = {'plugin', 'args'}
+ KIND = 'Network'
+
+ def __init__(self, name, **kwargs):
+ if not self.MANDATORY_PARAMETERS.issubset(kwargs):
+ missing_parameters = ', '.join(
+ str(param) for param in
+ (self.MANDATORY_PARAMETERS - set(kwargs)))
+ raise exceptions.KubernetesNetworkObjectDefinitionError(
+ missing_parameters=missing_parameters)
+
+ self._name = name
+ self._plugin = kwargs['plugin']
+ self._args = kwargs['args']
+ self._crd = None
+ self._template = None
+ self._group = None
+ self._version = None
+ self._plural = None
+ self._scope = None
+
+ @property
+ def crd(self):
+ if self._crd:
+ return self._crd
+ crd = k8s_utils.get_custom_resource_definition(self.KIND)
+ if not crd:
+ raise exceptions.KubernetesNetworkObjectKindMissing()
+ self._crd = crd
+ return self._crd
+
+ @property
+ def group(self):
+ if self._group:
+ return self._group
+ self._group = self.crd.spec.group
+ return self._group
+
+ @property
+ def version(self):
+ if self._version:
+ return self._version
+ self._version = self.crd.spec.version
+ return self._version
+
+ @property
+ def plural(self):
+ if self._plural:
+ return self._plural
+ self._plural = self.crd.spec.names.plural
+ return self._plural
+
+ @property
+ def scope(self):
+ if self._scope:
+ return self._scope
+ self._scope = self.crd.spec.scope
+ return self._scope
+
+ @property
+ def template(self):
+ """"Network" object template
+
+ This template can be rendered only once the "Network" CRD exists in
+ Kubernetes; reading this property must therefore be delayed until that
+ CRD has been created.
+ """
+ if self._template:
+ return self._template
+
+ self._template = {
+ 'apiVersion': '{}/{}'.format(self.group, self.version),
+ 'kind': self.KIND,
+ 'metadata': {
+ 'name': self._name
+ },
+ 'plugin': self._plugin,
+ 'args': self._args
+ }
+ return self._template
+
+ def create(self):
+ k8s_utils.create_network(self.scope, self.group, self.version,
+ self.plural, self.template)
+
+ def delete(self):
+ k8s_utils.delete_network(self.scope, self.group, self.version,
+ self.plural, self._name)
class KubernetesTemplate(object):
@@ -193,16 +416,21 @@ class KubernetesTemplate(object):
"""
context_cfg = copy.deepcopy(context_cfg)
servers_cfg = context_cfg.pop('servers', {})
+ crd_cfg = context_cfg.pop('custom_resources', [])
+ networks_cfg = context_cfg.pop('networks', {})
self.name = name
self.ssh_key = '{}-key'.format(name)
- self.rcs = [self._get_rc_name(rc) for rc in servers_cfg]
- self.k8s_objs = [KubernetesObject(self._get_rc_name(rc),
- ssh_key=self.ssh_key,
- **cfg)
- for rc, cfg in servers_cfg.items()]
- self.service_objs = [ServiceObject(s) for s in self.rcs]
-
+ self.rcs = {self._get_rc_name(rc): cfg
+ for rc, cfg in servers_cfg.items()}
+ self.k8s_objs = [ReplicationControllerObject(
+ rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
+ self.service_objs = [ServiceNodePortObject(rc, **cfg)
+ for rc, cfg in self.rcs.items()]
+ self.crd = [CustomResourceDefinitionObject(self.name, **crd)
+ for crd in crd_cfg]
+ self.network_objs = [NetworkObject(net_name, **net_data)
+ for net_name, net_data in networks_cfg.items()]
self.pods = []
def _get_rc_name(self, rc_name):
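
Taken together, the orchestrator changes above let a Kubernetes context declare per-server networks, tolerations, a restart policy, a pod security context and extra node ports, plus context-level custom resource definitions and "Network" objects. A minimal sketch of a context configuration exercising the new options, assuming the template is built from the context name and the context configuration as the Kubernetes context does (server, network and image names are illustrative, not from this patch):

from yardstick.orchestrator import kubernetes

context_cfg = {
    'servers': {
        'host': {
            'image': 'openretriever/yardstick',
            'command': '/bin/bash',
            'networks': ['flannel', 'sriov01'],    # rendered as pod annotations
            'tolerations': [{'key': 'role', 'value': 'tg',
                             'effect': 'NoSchedule', 'operator': 'Equal'}],
            'restartPolicy': 'Always',             # must be a valid RESTART_POLICY value
            'securityContext': {'privileged': True},
            'node_ports': [{'port': 80, 'name': 'web', 'nodePort': 30080}],
        },
    },
    'custom_resources': [{'name': 'network'}],     # becomes "networks.<ctx_name>.com"
    'networks': {
        'flannel': {'plugin': 'flannel', 'args': ''},
        'sriov01': {'plugin': 'sriov', 'args': ''},
    },
}

template = kubernetes.KubernetesTemplate('ctx-name', context_cfg)
for crd_obj in template.crd:            # CustomResourceDefinitionObject instances
    crd_obj.create()
for net_obj in template.network_objs:   # NetworkObject, rendered once the CRD exists
    net_obj.create()
for svc_obj in template.service_objs:   # ServiceNodePortObject: port 22 plus node_ports
    svc_obj.create()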
diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py
index 99874343b..f3e31e718 100644
--- a/yardstick/tests/functional/common/messaging/test_messaging.py
+++ b/yardstick/tests/functional/common/messaging/test_messaging.py
@@ -32,25 +32,25 @@ class DummyPayload(payloads.Payload):
class DummyEndpoint(consumer.NotificationHandler):
def info(self, ctxt, **kwargs):
- if ctxt['pid'] in self._ctx_pids:
- self._queue.put('ID {}, data: {}, pid: {}'.format(
- self._id, kwargs['data'], ctxt['pid']))
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put('Nr {}, data: {}, id: {}'.format(
+ self._id, kwargs['data'], ctxt['id']))
class DummyConsumer(consumer.MessagingConsumer):
- def __init__(self, _id, ctx_pids, queue):
+ def __init__(self, _id, ctx_ids, queue):
self._id = _id
- endpoints = [DummyEndpoint(_id, ctx_pids, queue)]
- super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints)
+ endpoints = [DummyEndpoint(_id, ctx_ids, queue)]
+ super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints)
class DummyProducer(producer.MessagingProducer):
pass
-def _run_consumer(_id, ctx_pids, queue):
- _consumer = DummyConsumer(_id, ctx_pids, queue)
+def _run_consumer(_id, ctx_ids, queue):
+ _consumer = DummyConsumer(_id, ctx_ids, queue)
_consumer.start_rpc_server()
_consumer.wait()
@@ -67,8 +67,8 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
num_consumers = 10
ctx_1 = 100001
ctx_2 = 100002
- producers = [DummyProducer(TOPIC, pid=ctx_1),
- DummyProducer(TOPIC, pid=ctx_2)]
+ producers = [DummyProducer(TOPIC, _id=ctx_1),
+ DummyProducer(TOPIC, _id=ctx_2)]
processes = []
for i in range(num_consumers):
@@ -91,7 +91,7 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
output.append(output_queue.get(True, 1))
self.assertEqual(num_consumers * 4, len(output))
- msg_template = 'ID {}, data: {}, pid: {}'
+ msg_template = 'Nr {}, data: {}, id: {}'
for i in range(num_consumers):
for ctx in [ctx_1, ctx_2]:
for message in ['message 0', 'message 1']:
diff --git a/yardstick/tests/functional/common/test_utils.py b/yardstick/tests/functional/common/test_utils.py
index b5333bbde..b9f1f773a 100644
--- a/yardstick/tests/functional/common/test_utils.py
+++ b/yardstick/tests/functional/common/test_utils.py
@@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import multiprocessing
import unittest
+import socket
import sys
+import time
from yardstick.common import utils
@@ -32,3 +35,38 @@ class ImportModulesFromPackageTestCase(unittest.TestCase):
library_obj = getattr(module_obj, library_name)
class_obj = getattr(library_obj, class_name)
self.assertEqual(class_name, class_obj().__class__.__name__)
+
+
+class SendSocketCommandTestCase(unittest.TestCase):
+
+ @staticmethod
+ def _run_socket_server(port):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind(('localhost', port))
+ sock.listen(1)
+ conn = None
+ while not conn:
+ conn, _ = sock.accept()
+ sock.close()
+
+ @staticmethod
+ def _terminate_server(socket_server):
+ # Wait until the socket server closes the open port.
+ time.sleep(1)
+ if socket_server and socket_server.is_alive():
+ socket_server.terminate()
+
+ def test_send_command(self):
+ port = 47001
+
+ socket_server = multiprocessing.Process(
+ name='run_socket_server',
+ target=SendSocketCommandTestCase._run_socket_server,
+ args=(port, ))
+ socket_server.start()
+
+ self.addCleanup(self._terminate_server, socket_server)
+
+ # Wait until the socket is open.
+ time.sleep(0.5)
+ self.assertEqual(
+ 0, utils.send_socket_command('localhost', port, 'test_command'))
diff --git a/yardstick/tests/unit/benchmark/contexts/test_kubernetes.py b/yardstick/tests/unit/benchmark/contexts/test_kubernetes.py
index e561df9f8..941d50e82 100644
--- a/yardstick/tests/unit/benchmark/contexts/test_kubernetes.py
+++ b/yardstick/tests/unit/benchmark/contexts/test_kubernetes.py
@@ -164,12 +164,12 @@ class KubernetesTestCase(unittest.TestCase):
self.k8s_context._get_node_ip()
mock_get_node_list.assert_called_once()
- @mock.patch('yardstick.orchestrator.kubernetes.ServiceObject.create')
+ @mock.patch.object(orchestrator_kubernetes.ServiceNodePortObject, 'create')
def test_create_services(self, mock_create):
self.k8s_context._create_services()
mock_create.assert_called()
- @mock.patch('yardstick.orchestrator.kubernetes.ServiceObject.delete')
+ @mock.patch.object(orchestrator_kubernetes.ServiceNodePortObject, 'delete')
def test_delete_services(self, mock_delete):
self.k8s_context._delete_services()
mock_delete.assert_called()
diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py
index 559c991f3..49ba1efe4 100644
--- a/yardstick/tests/unit/benchmark/runner/test_base.py
+++ b/yardstick/tests/unit/benchmark/runner/test_base.py
@@ -8,12 +8,17 @@
##############################################################################
import time
+import uuid
import mock
+from oslo_config import cfg
+import oslo_messaging
import subprocess
from yardstick.benchmark.runners import base as runner_base
from yardstick.benchmark.runners import iteration
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
from yardstick.tests.unit import base as ut_base
@@ -94,3 +99,54 @@ class RunnerTestCase(ut_base.BaseUnitTestCase):
with self.assertRaises(NotImplementedError):
runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
+
+
+class RunnerProducerTestCase(ut_base.BaseUnitTestCase):
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(cfg, 'CONF')
+ def test__init(self, mock_config, mock_transport, mock_rpcclient,
+ mock_target):
+ _id = uuid.uuid1().int
+ runner_producer = runner_base.RunnerProducer(_id)
+ mock_transport.assert_called_once_with(
+ mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+ mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER,
+ fanout=True,
+ server=messaging.SERVER)
+ mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+ self.assertEqual(_id, runner_producer._id)
+ self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic)
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+ def test_start_iteration(self, mock_runner_payload, *args):
+ runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+ with mock.patch.object(runner_producer,
+ 'send_message') as mock_message:
+ runner_producer.start_iteration(version=10)
+
+ mock_message.assert_called_once_with(
+ messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload')
+ mock_runner_payload.assert_called_once_with(version=10, data={})
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload')
+ def test_stop_iteration(self, mock_runner_payload, *args):
+ runner_producer = runner_base.RunnerProducer(uuid.uuid1().int)
+ with mock.patch.object(runner_producer,
+ 'send_message') as mock_message:
+ runner_producer.stop_iteration(version=15)
+
+ mock_message.assert_called_once_with(
+ messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload')
+ mock_runner_payload.assert_called_once_with(version=15, data={})
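
The RunnerProducer covered above is the runner-side publisher of the start/stop iteration hand-shake. A minimal sketch of driving it directly, assuming the yardstick RabbitMQ transport is reachable (the id and version values are illustrative):

import uuid

from yardstick.benchmark.runners import base as runner_base

producer = runner_base.RunnerProducer(uuid.uuid1().int)  # id shared with the TG consumers
producer.start_iteration(version=1)  # casts RUNNER_METHOD_START_ITERATION with a RunnerPayload
# ... traffic generators run one iteration and report their KPIs ...
producer.stop_iteration(version=1)   # casts RUNNER_METHOD_STOP_ITERATION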
diff --git a/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py
new file mode 100644
index 000000000..10d14a8a0
--- /dev/null
+++ b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import os
+import uuid
+
+import mock
+
+from yardstick.benchmark.runners import iteration_ipc
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.tests.unit import base as ut_base
+
+
+class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase):
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+ self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+ self._queue = multiprocessing.Queue()
+ self.runner = iteration_ipc.RunnerIterationIPCEndpoint(
+ self._id, self._ctx_ids, self._queue)
+ self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}}
+ self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj(
+ self._kwargs).obj_to_dict()
+
+ def test_tg_method_started(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_started(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+ def test_tg_method_finished(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_finished(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+ def test_tg_method_iteration(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_iteration(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+
+class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase):
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+ self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+ self.consumer = iteration_ipc.RunnerIterationIPCConsumer(
+ self._id, self._ctx_ids)
+ self.consumer._queue = mock.Mock()
+
+ def test__init(self):
+ self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []},
+ self.consumer._kpi_per_id)
+
+ def test_is_all_kpis_received_in_iteration(self):
+ payload = payloads.TrafficGeneratorPayload(
+ version=1, iteration=1, kpi={})
+ msg1 = {'action': messaging.TG_METHOD_ITERATION,
+ 'id': self._ctx_ids[0], 'payload': payload}
+ msg2 = {'action': messaging.TG_METHOD_ITERATION,
+ 'id': self._ctx_ids[1], 'payload': payload}
+ self.consumer.iteration_index = 1
+
+ self.consumer._queue.empty.side_effect = [False, True]
+ self.consumer._queue.get.return_value = msg1
+ self.assertFalse(self.consumer.is_all_kpis_received_in_iteration())
+
+ self.consumer._queue.empty.side_effect = [False, True]
+ self.consumer._queue.get.return_value = msg2
+ self.assertTrue(self.consumer.is_all_kpis_received_in_iteration())
+
+
+class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase):
+
+ @mock.patch.object(iteration_ipc, '_worker_process')
+ @mock.patch.object(os, 'getpid', return_value=12345678)
+ @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock())
+ def test__run_benchmark(self, mock_process, mock_getpid, mock_worker):
+ method = 'method'
+ scenario_cfg = {'type': 'scenario_type'}
+ context_cfg = 'context_cfg'
+ name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678)
+ runner = iteration_ipc.IterationIPCRunner(mock.ANY)
+ mock_getpid.reset_mock()
+
+ runner._run_benchmark('class', method, scenario_cfg, context_cfg)
+ mock_process.assert_called_once_with(
+ name=name,
+ target=mock_worker,
+ args=(runner.result_queue, 'class', method, scenario_cfg,
+ context_cfg, runner.aborted, runner.output_queue))
+ mock_getpid.assert_called_once()
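
The endpoint/consumer pair tested above is the runner half of the IterationIPC message flow: every traffic generator reports per-iteration KPIs, the endpoint pushes each report into a queue tagged with the generator id, and the consumer decides when all registered generators have reported for the current iteration. A sketch of that flow, driving the endpoint directly rather than through oslo.messaging (the ids are illustrative and the direct use of the consumer's internal queue, as in the test above, is only for illustration):

import uuid

from yardstick.benchmark.runners import iteration_ipc

runner_id = uuid.uuid1().int
tg_ids = [uuid.uuid1().int, uuid.uuid1().int]   # one id per traffic generator

consumer = iteration_ipc.RunnerIterationIPCConsumer(runner_id, tg_ids)
consumer.iteration_index = 1

endpoint = iteration_ipc.RunnerIterationIPCEndpoint(
    runner_id, tg_ids, consumer._queue)          # assumed internal queue, as in the test
for tg_id in tg_ids:
    endpoint.tg_method_iteration({'id': tg_id}, version=1, iteration=1, kpi={})

consumer.is_all_kpis_received_in_iteration()     # True once both generators reported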
diff --git a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
index bb1a7aaca..77a54c0b8 100644
--- a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
+++ b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
@@ -553,6 +553,7 @@ class TestNetworkServiceTestCase(unittest.TestCase):
tgen.verify_traffic = lambda x: verified_dict
tgen.terminate = mock.Mock(return_value=True)
tgen.name = "tgen__1"
+ tgen.run_traffic.return_value = 'tg_id'
vnf = mock.Mock(autospec=GenericVNF)
vnf.runs_traffic = False
vnf.terminate = mock.Mock(return_value=True)
@@ -565,7 +566,6 @@ class TestNetworkServiceTestCase(unittest.TestCase):
self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
self.s._fill_traffic_profile = \
mock.Mock(return_value=TRAFFIC_PROFILE)
- self.assertIsNone(self.s.setup())
def test_setup_exception(self):
with mock.patch("yardstick.ssh.SSH") as ssh:
@@ -656,6 +656,9 @@ class TestNetworkServiceTestCase(unittest.TestCase):
)
self.assertEqual(self.s.topology, 'fake_nsd')
+ def test_get_mq_ids(self):
+ self.assertEqual(self.s._mq_ids, self.s.get_mq_ids())
+
def test_teardown(self):
vnf = mock.Mock(autospec=GenericVNF)
vnf.terminate = mock.Mock(return_value=True)
diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py
index 00ec220c9..37b1f1926 100644
--- a/yardstick/tests/unit/common/messaging/test_payloads.py
+++ b/yardstick/tests/unit/common/messaging/test_payloads.py
@@ -44,3 +44,39 @@ class PayloadTestCase(ut_base.BaseUnitTestCase):
_dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'}
payload = _DummyPayload.dict_to_obj(_dict)
self.assertEqual(set(_dict.keys()), payload._fields)
+
+
+class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase):
+
+ def test_init(self):
+ tg_payload = payloads.TrafficGeneratorPayload(
+ version=1, iteration=10, kpi={'key1': 'value1'})
+ self.assertEqual(1, tg_payload.version)
+ self.assertEqual(10, tg_payload.iteration)
+ self.assertEqual({'key1': 'value1'}, tg_payload.kpi)
+ self.assertEqual(3, len(tg_payload._fields))
+
+ def test__init_missing_required_fields(self):
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(version=1, iteration=10)
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(iteration=10, kpi={})
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(iteration=10)
+
+
+class RunnerPayloadTestCase(ut_base.BaseUnitTestCase):
+
+ def test_init(self):
+ runner_payload = payloads.RunnerPayload(version=5,
+ data={'key1': 'value1'})
+ self.assertEqual(5, runner_payload.version)
+ self.assertEqual({'key1': 'value1'}, runner_payload.data)
+
+ def test__init_missing_required_fields(self):
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.RunnerPayload(version=1)
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.RunnerPayload(data=None)
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.RunnerPayload()
diff --git a/yardstick/tests/unit/common/messaging/test_producer.py b/yardstick/tests/unit/common/messaging/test_producer.py
index 0289689dc..22286e5c3 100644
--- a/yardstick/tests/unit/common/messaging/test_producer.py
+++ b/yardstick/tests/unit/common/messaging/test_producer.py
@@ -44,3 +44,10 @@ class MessagingProducerTestCase(ut_base.BaseUnitTestCase):
topic='test_topic', fanout=True, server=messaging.SERVER)
mock_RPCClient.assert_called_once_with('test_rpc_transport',
'test_Target')
+
+ def test_id(self):
+ with mock.patch.object(oslo_messaging, 'RPCClient'), \
+ mock.patch.object(oslo_messaging, 'get_rpc_transport'), \
+ mock.patch.object(oslo_messaging, 'Target'):
+ msg_producer = _MessagingProducer('topic', 'id_to_check')
+ self.assertEqual('id_to_check', msg_producer.id)
diff --git a/yardstick/tests/unit/common/test_kubernetes_utils.py b/yardstick/tests/unit/common/test_kubernetes_utils.py
new file mode 100644
index 000000000..bf9992b57
--- /dev/null
+++ b/yardstick/tests/unit/common/test_kubernetes_utils.py
@@ -0,0 +1,224 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from kubernetes import client
+from kubernetes.client import rest
+from kubernetes import config
+
+from yardstick.common import constants
+from yardstick.common import exceptions
+from yardstick.common import kubernetes_utils
+from yardstick.tests.unit import base
+
+
+class GetExtensionsV1betaApiTestCase(base.BaseUnitTestCase):
+
+ @mock.patch.object(client, 'ApiextensionsV1beta1Api', return_value='api')
+ @mock.patch.object(config, 'load_kube_config')
+ def test_execute_correct(self, mock_load_kube_config, mock_api):
+ self.assertEqual('api', kubernetes_utils.get_extensions_v1beta_api())
+ mock_load_kube_config.assert_called_once_with(
+ config_file=constants.K8S_CONF_FILE)
+ mock_api.assert_called_once()
+
+ @mock.patch.object(config, 'load_kube_config')
+ def test_execute_exception(self, mock_load_kube_config):
+ mock_load_kube_config.side_effect = IOError
+ with self.assertRaises(exceptions.KubernetesConfigFileNotFound):
+ kubernetes_utils.get_extensions_v1beta_api()
+
+
+class GetCustomObjectsApiTestCase(base.BaseUnitTestCase):
+
+ @mock.patch.object(client, 'CustomObjectsApi', return_value='api')
+ @mock.patch.object(config, 'load_kube_config')
+ def test_execute_correct(self, mock_load_kube_config, mock_api):
+ self.assertEqual('api', kubernetes_utils.get_custom_objects_api())
+ mock_load_kube_config.assert_called_once_with(
+ config_file=constants.K8S_CONF_FILE)
+ mock_api.assert_called_once()
+
+ @mock.patch.object(config, 'load_kube_config')
+ def test_execute_exception(self, mock_load_kube_config):
+ mock_load_kube_config.side_effect = IOError
+ with self.assertRaises(exceptions.KubernetesConfigFileNotFound):
+ kubernetes_utils.get_custom_objects_api()
+
+
+class CreateCustomResourceDefinitionTestCase(base.BaseUnitTestCase):
+
+ @mock.patch.object(client, 'V1beta1CustomResourceDefinition',
+ return_value='crd_obj')
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_correct(self, mock_get_api, mock_crd):
+ mock_create_crd = mock.Mock()
+ mock_get_api.return_value = mock_create_crd
+ body = {'spec': 'fake_spec', 'metadata': 'fake_metadata'}
+
+ kubernetes_utils.create_custom_resource_definition(body)
+ mock_get_api.assert_called_once()
+ mock_crd.assert_called_once_with(spec='fake_spec',
+ metadata='fake_metadata')
+ mock_create_crd.create_custom_resource_definition.\
+ assert_called_once_with('crd_obj')
+
+ @mock.patch.object(client, 'V1beta1CustomResourceDefinition',
+ return_value='crd_obj')
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_exception(self, mock_get_api, mock_crd):
+ mock_create_crd = mock.Mock()
+ mock_create_crd.create_custom_resource_definition.\
+ side_effect = rest.ApiException
+ mock_get_api.return_value = mock_create_crd
+ body = {'spec': 'fake_spec', 'metadata': 'fake_metadata'}
+
+ with self.assertRaises(exceptions.KubernetesApiException):
+ kubernetes_utils.create_custom_resource_definition(body)
+ mock_get_api.assert_called_once()
+ mock_crd.assert_called_once_with(spec='fake_spec',
+ metadata='fake_metadata')
+ mock_create_crd.create_custom_resource_definition.\
+ assert_called_once_with('crd_obj')
+
+
+class DeleteCustomResourceDefinitionTestCase(base.BaseUnitTestCase):
+
+ @mock.patch.object(client, 'V1DeleteOptions', return_value='del_obj')
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_correct(self, mock_get_api, mock_delobj):
+ mock_delete_crd = mock.Mock()
+ mock_get_api.return_value = mock_delete_crd
+
+ kubernetes_utils.delete_custom_resource_definition('name')
+ mock_get_api.assert_called_once()
+ mock_delobj.assert_called_once()
+ mock_delete_crd.delete_custom_resource_definition.\
+ assert_called_once_with('name', 'del_obj')
+
+ @mock.patch.object(client, 'V1DeleteOptions', return_value='del_obj')
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_exception(self, mock_get_api, mock_delobj):
+ mock_delete_crd = mock.Mock()
+ mock_delete_crd.delete_custom_resource_definition.\
+ side_effect = rest.ApiException
+ mock_get_api.return_value = mock_delete_crd
+
+ with self.assertRaises(exceptions.KubernetesApiException):
+ kubernetes_utils.delete_custom_resource_definition('name')
+ mock_delobj.assert_called_once()
+ mock_delete_crd.delete_custom_resource_definition.\
+ assert_called_once_with('name', 'del_obj')
+
+
+class GetCustomResourceDefinitionTestCase(base.BaseUnitTestCase):
+
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_value(self, mock_get_api):
+ crd_obj = mock.Mock()
+ crd_obj.spec.names.kind = 'some_kind'
+ crd_list = mock.Mock()
+ crd_list.items = [crd_obj]
+ mock_api = mock.Mock()
+ mock_api.list_custom_resource_definition.return_value = crd_list
+ mock_get_api.return_value = mock_api
+ self.assertEqual(
+ crd_obj,
+ kubernetes_utils.get_custom_resource_definition('some_kind'))
+
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_none(self, mock_get_api):
+ crd_obj = mock.Mock()
+ crd_obj.spec.names.kind = 'some_kind'
+ crd_list = mock.Mock()
+ crd_list.items = [crd_obj]
+ mock_api = mock.Mock()
+ mock_api.list_custom_resource_definition.return_value = crd_list
+ mock_get_api.return_value = mock_api
+ self.assertIsNone(
+ kubernetes_utils.get_custom_resource_definition('other_kind'))
+
+ @mock.patch.object(kubernetes_utils, 'get_extensions_v1beta_api')
+ def test_execute_exception(self, mock_get_api):
+ mock_api = mock.Mock()
+ mock_api.list_custom_resource_definition.\
+ side_effect = rest.ApiException
+ mock_get_api.return_value = mock_api
+ with self.assertRaises(exceptions.KubernetesApiException):
+ kubernetes_utils.get_custom_resource_definition('kind')
+
+
+class CreateNetworkTestCase(base.BaseUnitTestCase):
+ @mock.patch.object(kubernetes_utils, 'get_custom_objects_api')
+ def test_execute_correct(self, mock_get_api):
+ mock_api = mock.Mock()
+ mock_get_api.return_value = mock_api
+ group = 'group.com'
+ version = mock.Mock()
+ plural = 'networks'
+ body = mock.Mock()
+
+ kubernetes_utils.create_network(
+ constants.SCOPE_CLUSTER, group, version, plural, body)
+ mock_api.create_cluster_custom_object.assert_called_once_with(
+ group, version, plural, body)
+
+ mock_api.reset_mock()
+ kubernetes_utils.create_network(
+ constants.SCOPE_NAMESPACED, group, version, plural, body)
+ mock_api.create_namespaced_custom_object.assert_called_once_with(
+ group, version, 'default', plural, body)
+
+ @mock.patch.object(kubernetes_utils, 'get_custom_objects_api')
+ def test_execute_exception(self, mock_get_api):
+ mock_api = mock.Mock()
+ mock_api.create_cluster_custom_object.side_effect = rest.ApiException
+ mock_get_api.return_value = mock_api
+ with self.assertRaises(exceptions.KubernetesApiException):
+ kubernetes_utils.create_network(
+ constants.SCOPE_CLUSTER, mock.ANY, mock.ANY, mock.ANY,
+ mock.ANY)
+
+
+class DeleteNetworkTestCase(base.BaseUnitTestCase):
+ @mock.patch.object(kubernetes_utils, 'get_custom_objects_api')
+ def test_execute_correct(self, mock_get_api):
+ mock_api = mock.Mock()
+ mock_get_api.return_value = mock_api
+ group = 'group.com'
+ version = mock.Mock()
+ plural = 'networks'
+ name = 'network'
+
+ kubernetes_utils.delete_network(
+ constants.SCOPE_CLUSTER, group, version, plural, name)
+ mock_api.delete_cluster_custom_object.assert_called_once_with(
+ group, version, plural, name, {})
+
+ mock_api.reset_mock()
+ kubernetes_utils.delete_network(
+ constants.SCOPE_NAMESPACED, group, version, plural, name)
+ mock_api.delete_namespaced_custom_object.assert_called_once_with(
+ group, version, 'default', plural, name, {})
+
+ @mock.patch.object(kubernetes_utils, 'get_custom_objects_api')
+ def test_execute_exception(self, mock_get_api):
+ mock_api = mock.Mock()
+ mock_api.delete_cluster_custom_object.side_effect = rest.ApiException
+ mock_get_api.return_value = mock_api
+ with self.assertRaises(exceptions.KubernetesApiException):
+ kubernetes_utils.delete_network(
+ constants.SCOPE_CLUSTER, mock.ANY, mock.ANY, mock.ANY,
+ mock.ANY)
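
The helpers exercised above wrap CustomObjectsApi calls for the CRD-backed "Network" objects; the scope argument selects the cluster-wide or the namespaced ('default' namespace) variant. A minimal usage sketch with illustrative group and body values, matching the template produced by NetworkObject:

from yardstick.common import constants
from yardstick.common import kubernetes_utils

group = 'ctx-name.com'
version = 'v1'
plural = 'networks'
body = {'apiVersion': '{}/{}'.format(group, version),
        'kind': 'Network',
        'metadata': {'name': 'flannel'},
        'plugin': 'flannel',
        'args': ''}

kubernetes_utils.create_network(
    constants.SCOPE_NAMESPACED, group, version, plural, body)
# ... use the network, then clean up by object name ...
kubernetes_utils.delete_network(
    constants.SCOPE_NAMESPACED, group, version, plural, 'flannel')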
diff --git a/yardstick/tests/unit/common/test_utils.py b/yardstick/tests/unit/common/test_utils.py
index 6247afd18..446afdd38 100644
--- a/yardstick/tests/unit/common/test_utils.py
+++ b/yardstick/tests/unit/common/test_utils.py
@@ -16,6 +16,7 @@ import mock
import os
import six
from six.moves import configparser
+import socket
import time
import unittest
@@ -1282,3 +1283,29 @@ class WaitUntilTrueTestCase(ut_base.BaseUnitTestCase):
self.assertIsNone(
utils.wait_until_true(lambda: False, timeout=1, sleep=1,
exception=MyTimeoutException))
+
+
+class SendSocketCommandTestCase(unittest.TestCase):
+
+ @mock.patch.object(socket, 'socket')
+ def test_execute_correct(self, mock_socket):
+ mock_socket_obj = mock.Mock()
+ mock_socket_obj.connect_ex.return_value = 0
+ mock_socket.return_value = mock_socket_obj
+ self.assertEqual(0, utils.send_socket_command('host', 22, 'command'))
+ mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
+ mock_socket_obj.connect_ex.assert_called_once_with(('host', 22))
+ mock_socket_obj.sendall.assert_called_once_with(six.b('command'))
+ mock_socket_obj.close.assert_called_once()
+
+ @mock.patch.object(socket, 'socket')
+ def test_execute_exception(self, mock_socket):
+ mock_socket_obj = mock.Mock()
+ mock_socket_obj.connect_ex.return_value = 0
+ mock_socket.return_value = mock_socket_obj
+ mock_socket_obj.sendall.side_effect = socket.error
+ self.assertEqual(1, utils.send_socket_command('host', 22, 'command'))
+ mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
+ mock_socket_obj.connect_ex.assert_called_once_with(('host', 22))
+ mock_socket_obj.sendall.assert_called_once_with(six.b('command'))
+ mock_socket_obj.close.assert_called_once()
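
These unit tests pin down the return-code contract of send_socket_command: 0 when the TCP connect and send succeed, 1 when the socket operation fails. A minimal caller sketch (host, port and command are illustrative):

from yardstick.common import utils

if utils.send_socket_command('10.0.0.5', 47001, 'start') != 0:
    raise RuntimeError('could not deliver the command to the remote agent')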
diff --git a/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py b/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py
index 34afa3d5b..541855aa8 100644
--- a/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py
+++ b/yardstick/tests/unit/network_services/libs/ixia_libs/test_ixnet_api.py
@@ -203,13 +203,9 @@ class TestIxNextgen(unittest.TestCase):
ixnet_gen._ixnet = self.ixnet
framesize = {'64B': '75', '512b': '25'}
output = ixnet_gen._parse_framesize(framesize)
- for idx in range(len(framesize)):
- if output[idx * 2] == 64:
- self.assertEqual(75, output[idx * 2 + 1])
- elif output[idx * 2] == 512:
- self.assertEqual(25, output[idx * 2 + 1])
- else:
- raise self.failureException('Framesize (64, 512) not present')
+ self.assertEqual(2, len(output))
+ self.assertIn([64, 64, 75], output)
+ self.assertIn([512, 512, 25], output)
@mock.patch.object(IxNetwork, 'IxNet')
def test_connect(self, mock_ixnet):
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py
index ebedcb451..43e5ac839 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py
@@ -15,10 +15,15 @@
import multiprocessing
import os
+import uuid
import mock
+from oslo_config import cfg
+import oslo_messaging
import unittest
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
from yardstick.network_services.vnf_generic.vnf import base
from yardstick.ssh import SSH
@@ -140,6 +145,24 @@ VNFD = {
}
+class _DummyGenericTrafficGen(base.GenericTrafficGen): # pragma: no cover
+
+ def run_traffic(self, *args):
+ pass
+
+ def terminate(self):
+ pass
+
+ def collect_kpi(self):
+ pass
+
+ def instantiate(self, *args):
+ pass
+
+ def scale(self, flavor=''):
+ pass
+
+
class FileAbsPath(object):
def __init__(self, module_file):
super(FileAbsPath, self).__init__()
@@ -221,7 +244,7 @@ class TestGenericVNF(unittest.TestCase):
self.assertEqual(msg, str(exc.exception))
-class TestGenericTrafficGen(unittest.TestCase):
+class GenericTrafficGenTestCase(unittest.TestCase):
def test_definition(self):
"""Make sure that the abstract class cannot be instantiated"""
@@ -234,3 +257,81 @@ class TestGenericTrafficGen(unittest.TestCase):
"abstract methods collect_kpi, instantiate, run_traffic, "
"scale, terminate")
self.assertEqual(msg, str(exc.exception))
+
+ def test_get_mq_producer_id(self):
+ vnfd = {'benchmark': {'kpi': mock.ANY},
+ 'vdu': [{'external-interface': 'ext_int'}]
+ }
+ tg = _DummyGenericTrafficGen('name', vnfd)
+ tg._mq_producer = mock.Mock()
+ tg._mq_producer.get_id.return_value = 'fake_id'
+ self.assertEqual('fake_id', tg.get_mq_producer_id())
+
+
+class TrafficGeneratorProducerTestCase(unittest.TestCase):
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(cfg, 'CONF')
+ def test__init(self, mock_config, mock_transport, mock_rpcclient,
+ mock_target):
+ _id = uuid.uuid1().int
+ tg_producer = base.TrafficGeneratorProducer(_id)
+ mock_transport.assert_called_once_with(
+ mock_config, url='rabbit://yardstick:yardstick@localhost:5672/')
+ mock_target.assert_called_once_with(topic=messaging.TOPIC_TG,
+ fanout=True,
+ server=messaging.SERVER)
+ mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target')
+ self.assertEqual(_id, tg_producer._id)
+ self.assertEqual(messaging.TOPIC_TG, tg_producer._topic)
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_started(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_started(version=10)
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=10, iteration=0,
+ kpi={})
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_finished(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_finished(version=20)
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=20, iteration=0,
+ kpi={})
+
+ @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target')
+ @mock.patch.object(oslo_messaging, 'RPCClient')
+ @mock.patch.object(oslo_messaging, 'get_rpc_transport',
+ return_value='rpc_transport')
+ @mock.patch.object(payloads, 'TrafficGeneratorPayload',
+ return_value='tg_pload')
+ def test_tg_method_iteration(self, mock_tg_payload, *args):
+ tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
+ with mock.patch.object(tg_producer, 'send_message') as mock_message:
+ tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'})
+
+ mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION,
+ 'tg_pload')
+ mock_tg_payload.assert_called_once_with(version=30, iteration=100,
+ kpi={'k': 'v'})
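
On the traffic-generator side, TrafficGeneratorProducer is the counterpart publisher: it casts started/finished/iteration notifications on the TG topic, tagged with the generator id. A minimal sketch, assuming the yardstick RabbitMQ transport is reachable (id and KPI values are illustrative):

import uuid

from yardstick.network_services.vnf_generic.vnf import base

producer = base.TrafficGeneratorProducer(uuid.uuid1().int)
producer.tg_method_started(version=1)                        # payload: iteration=0, kpi={}
producer.tg_method_iteration(1, version=1, kpi={'rx': 100})  # one cast per iteration
producer.tg_method_finished(version=1)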
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
index 331e80d00..48ae3b505 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
@@ -1090,6 +1090,57 @@ class TestClientResourceHelper(unittest.TestCase):
self.assertIs(client_resource_helper._connect(client), client)
+ @mock.patch.object(ClientResourceHelper, '_build_ports')
+ @mock.patch.object(ClientResourceHelper, '_run_traffic_once')
+ def test_run_traffic(self, mock_run_traffic_once, mock_build_ports):
+ client_resource_helper = ClientResourceHelper(mock.Mock())
+ client = mock.Mock()
+ traffic_profile = mock.Mock()
+ mq_producer = mock.Mock()
+ with mock.patch.object(client_resource_helper, '_connect') \
+ as mock_connect, \
+ mock.patch.object(client_resource_helper, '_terminated') \
+ as mock_terminated:
+ mock_connect.return_value = client
+ type(mock_terminated).value = mock.PropertyMock(
+ side_effect=[0, 1, lambda x: x])
+ client_resource_helper.run_traffic(traffic_profile, mq_producer)
+
+ mock_build_ports.assert_called_once()
+ traffic_profile.register_generator.assert_called_once()
+ mq_producer.tg_method_started.assert_called_once()
+ mq_producer.tg_method_finished.assert_called_once()
+ mq_producer.tg_method_iteration.assert_called_once_with(1)
+ mock_run_traffic_once.assert_called_once_with(traffic_profile)
+
+ @mock.patch.object(ClientResourceHelper, '_build_ports')
+ @mock.patch.object(ClientResourceHelper, '_run_traffic_once',
+ side_effect=Exception)
+ def test_run_traffic_exception(self, mock_run_traffic_once,
+ mock_build_ports):
+ client_resource_helper = ClientResourceHelper(mock.Mock())
+ client = mock.Mock()
+ traffic_profile = mock.Mock()
+ mq_producer = mock.Mock()
+ with mock.patch.object(client_resource_helper, '_connect') \
+ as mock_connect, \
+ mock.patch.object(client_resource_helper, '_terminated') \
+ as mock_terminated:
+ mock_connect.return_value = client
+ type(mock_terminated).value = mock.PropertyMock(return_value=0)
+ mq_producer.reset_mock()
+ # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked
+ with self.assertRaises(Exception):
+ client_resource_helper.run_traffic(traffic_profile,
+ mq_producer)
+
+ mock_build_ports.assert_called_once()
+ traffic_profile.register_generator.assert_called_once()
+ mock_run_traffic_once.assert_called_once_with(traffic_profile)
+ mq_producer.tg_method_started.assert_called_once()
+ mq_producer.tg_method_finished.assert_not_called()
+ mq_producer.tg_method_iteration.assert_not_called()
+
class TestRfc2544ResourceHelper(unittest.TestCase):
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
index 050aa4aa0..3e2f598d2 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
@@ -406,7 +406,8 @@ class TestProxTrafficGen(unittest.TestCase):
sut.setup_helper.prox_config_dict = {}
sut._connect_client = mock.Mock(autospec=STLClient)
sut._connect_client.get_stats = mock.Mock(return_value="0")
- sut._traffic_runner(mock_traffic_profile)
+ sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+ sut._traffic_runner(mock_traffic_profile, mock.ANY)
@mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket')
@mock.patch(SSH_HELPER)
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
index 5d6ce29cf..f34faaec7 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py
@@ -381,7 +381,8 @@ class TestIXIATrafficGen(unittest.TestCase):
mock.mock_open(), create=True)
@mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception')
def _traffic_runner(*args):
- result = sut._traffic_runner(mock_traffic_profile)
+ sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+ result = sut._traffic_runner(mock_traffic_profile, mock.ANY)
self.assertIsNone(result)
_traffic_runner()
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py
index 350ba8448..700e910f9 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py
@@ -387,8 +387,9 @@ class TestTrexTrafficGen(unittest.TestCase):
# must generate cfg before we can run traffic so Trex port mapping is
# created
self.sut.resource_helper.generate_cfg()
+ self.sut._setup_mq_producer = mock.Mock()
with mock.patch.object(self.sut.resource_helper, 'run_traffic'):
- self.sut._traffic_runner(mock_traffic_profile)
+ self.sut._traffic_runner(mock_traffic_profile, mock.ANY)
def test__generate_trex_cfg(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
@@ -453,9 +454,8 @@ class TestTrexTrafficGen(unittest.TestCase):
self.sut.ssh_helper.run = mock.Mock()
self.sut._traffic_runner = mock.Mock(return_value=0)
self.sut.resource_helper.client_started.value = 1
- result = self.sut.run_traffic(mock_traffic_profile)
+ self.sut.run_traffic(mock_traffic_profile)
self.sut._traffic_process.terminate()
- self.assertIsNotNone(result)
def test_terminate(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
diff --git a/yardstick/tests/unit/orchestrator/test_kubernetes.py b/yardstick/tests/unit/orchestrator/test_kubernetes.py
index 4323c026a..f248338ee 100644
--- a/yardstick/tests/unit/orchestrator/test_kubernetes.py
+++ b/yardstick/tests/unit/orchestrator/test_kubernetes.py
@@ -66,7 +66,11 @@ service ssh restart;while true ; do sleep 10000; done"
],
"nodeSelector": {
"kubernetes.io/hostname": "node-01"
- }
+ },
+ "restartPolicy": "Always",
+ "tolerations": [
+ {"operator": "Exists"}
+ ]
}
}
}
@@ -77,12 +81,21 @@ service ssh restart;while true ; do sleep 10000; done"
service ssh restart;while true ; do sleep 10000; done'],
'ssh_key': 'k8s-86096c30-key',
'nodeSelector': {'kubernetes.io/hostname': 'node-01'},
- 'volumes': []
+ 'volumes': [],
+ 'restartPolicy': 'Always'
}
name = 'host-k8s-86096c30'
- output_r = kubernetes.KubernetesObject(name, **input_s).get_template()
+ output_r = kubernetes.ReplicationControllerObject(
+ name, **input_s).get_template()
self.assertEqual(output_r, output_t)
+ def test_get_template_invalid_restart_policy(self):
+ input_s = {'restartPolicy': 'invalid_option'}
+ name = 'host-k8s-86096c30'
+ with self.assertRaises(exceptions.KubernetesWrongRestartPolicy):
+ kubernetes.ReplicationControllerObject(
+ name, **input_s).get_template()
+
class GetRcPodsTestCase(base.BaseUnitTestCase):
@@ -108,14 +121,14 @@ service ssh restart;while true ; do sleep 10000; done']
self.assertEqual(pods, [])
-class KubernetesObjectTestCase(base.BaseUnitTestCase):
+class ReplicationControllerObjectTestCase(base.BaseUnitTestCase):
def test__init_one_container(self):
pod_name = 'pod_name'
_kwargs = {'args': ['arg1', 'arg2'],
'image': 'fake_image',
'command': 'fake_command'}
- k8s_obj = kubernetes.KubernetesObject(pod_name, **_kwargs)
+ k8s_obj = kubernetes.ReplicationControllerObject(pod_name, **_kwargs)
self.assertEqual(1, len(k8s_obj._containers))
container = k8s_obj._containers[0]
self.assertEqual(['arg1', 'arg2'], container._args)
@@ -131,7 +144,7 @@ class KubernetesObjectTestCase(base.BaseUnitTestCase):
'image': 'fake_image_%s' % i,
'command': 'fake_command_%s' % i})
_kwargs = {'containers': containers}
- k8s_obj = kubernetes.KubernetesObject(pod_name, **_kwargs)
+ k8s_obj = kubernetes.ReplicationControllerObject(pod_name, **_kwargs)
self.assertEqual(5, len(k8s_obj._containers))
for i in range(5):
container = k8s_obj._containers[i]
@@ -145,8 +158,8 @@ class KubernetesObjectTestCase(base.BaseUnitTestCase):
'configMap': {'name': 'fake_sshkey'}}
volume2 = {'name': 'volume2',
'configMap': 'data'}
- k8s_obj = kubernetes.KubernetesObject('name', ssh_key='fake_sshkey',
- volumes=[volume2])
+ k8s_obj = kubernetes.ReplicationControllerObject(
+ 'name', ssh_key='fake_sshkey', volumes=[volume2])
k8s_obj._add_volumes()
volumes = k8s_obj.template['spec']['template']['spec']['volumes']
self.assertEqual(sorted([volume1, volume2], key=lambda k: k['name']),
@@ -155,7 +168,8 @@ class KubernetesObjectTestCase(base.BaseUnitTestCase):
def test__add_volumes_no_volumes(self):
volume1 = {'name': 'fake_sshkey',
'configMap': {'name': 'fake_sshkey'}}
- k8s_obj = kubernetes.KubernetesObject('name', ssh_key='fake_sshkey')
+ k8s_obj = kubernetes.ReplicationControllerObject(
+ 'name', ssh_key='fake_sshkey')
k8s_obj._add_volumes()
volumes = k8s_obj.template['spec']['template']['spec']['volumes']
self.assertEqual([volume1], volumes)
@@ -163,7 +177,8 @@ class KubernetesObjectTestCase(base.BaseUnitTestCase):
def test__create_ssh_key_volume(self):
expected = {'name': 'fake_sshkey',
'configMap': {'name': 'fake_sshkey'}}
- k8s_obj = kubernetes.KubernetesObject('name', ssh_key='fake_sshkey')
+ k8s_obj = kubernetes.ReplicationControllerObject(
+ 'name', ssh_key='fake_sshkey')
self.assertEqual(expected, k8s_obj._create_ssh_key_volume())
def test__create_volume_item(self):
@@ -172,13 +187,76 @@ class KubernetesObjectTestCase(base.BaseUnitTestCase):
vol_type: 'data'}
self.assertEqual(
volume,
- kubernetes.KubernetesObject._create_volume_item(volume))
+ kubernetes.ReplicationControllerObject.
+ _create_volume_item(volume))
def test__create_volume_item_invalid_type(self):
volume = {'name': 'vol_name',
'invalid_type': 'data'}
with self.assertRaises(exceptions.KubernetesTemplateInvalidVolumeType):
- kubernetes.KubernetesObject._create_volume_item(volume)
+ kubernetes.ReplicationControllerObject._create_volume_item(volume)
+
+ def test__add_security_context(self):
+ k8s_obj = kubernetes.ReplicationControllerObject('pod_name')
+ self.assertNotIn('securityContext',
+ k8s_obj.template['spec']['template']['spec'])
+
+ k8s_obj._security_context = {'key_pod': 'value_pod'}
+ k8s_obj._add_security_context()
+ self.assertEqual(
+ {'key_pod': 'value_pod'},
+ k8s_obj.template['spec']['template']['spec']['securityContext'])
+
+ def test__add_security_context_by_init(self):
+ containers = []
+ for i in range(5):
+ containers.append(
+ {'securityContext': {'key%s' % i: 'value%s' % i}})
+ _kwargs = {'containers': containers,
+ 'securityContext': {'key_pod': 'value_pod'}}
+ k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
+ self.assertEqual(
+ {'key_pod': 'value_pod'},
+ k8s_obj.template['spec']['template']['spec']['securityContext'])
+ for i in range(5):
+ container = (
+ k8s_obj.template['spec']['template']['spec']['containers'][i])
+ self.assertEqual({'key%s' % i: 'value%s' % i},
+ container['securityContext'])
+
+ def test__add_networks(self):
+ k8s_obj = kubernetes.ReplicationControllerObject(
+ 'name', networks=['network1', 'network2', 'network3'])
+ k8s_obj._add_networks()
+ networks = k8s_obj.\
+ template['spec']['template']['metadata']['annotations']['networks']
+ expected = ('[{"name": "network1"}, {"name": "network2"}, '
+ '{"name": "network3"}]')
+ self.assertEqual(expected, networks)
+
+ def test__add_tolerations(self):
+ _kwargs = {'tolerations': [{'key': 'key1',
+ 'value': 'value2',
+ 'effect': 'effect3',
+ 'operator': 'operator4',
+ 'wrong_key': 'error_key'}]
+ }
+ k8s_obj = kubernetes.ReplicationControllerObject('pod_name', **_kwargs)
+ k8s_obj._add_tolerations()
+ _tol = k8s_obj.template['spec']['template']['spec']['tolerations']
+ self.assertEqual(1, len(_tol))
+ self.assertEqual({'key': 'key1',
+ 'value': 'value2',
+ 'effect': 'effect3',
+ 'operator': 'operator4'},
+ _tol[0])
+
+ def test__add_tolerations_default(self):
+ k8s_obj = kubernetes.ReplicationControllerObject('pod_name')
+ k8s_obj._add_tolerations()
+ _tol = k8s_obj.template['spec']['template']['spec']['tolerations']
+ self.assertEqual(1, len(_tol))
+ self.assertEqual({'operator': 'Exists'}, _tol[0])
class ContainerObjectTestCase(base.BaseUnitTestCase):
@@ -227,3 +305,244 @@ class ContainerObjectTestCase(base.BaseUnitTestCase):
'name': 'cname-container',
'volumeMounts': container_obj._create_volume_mounts()}
self.assertEqual(expected, container_obj.get_container_item())
+
+ def test_get_container_item_with_security_context(self):
+ volume_mount = {'name': 'fake_name',
+ 'mountPath': 'fake_path'}
+ args = ['arg1', 'arg2']
+ container_obj = kubernetes.ContainerObject(
+ 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
+ args=args, securityContext={'key': 'value'})
+ expected = {'args': args,
+ 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
+ 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
+ 'name': 'cname-container',
+ 'volumeMounts': container_obj._create_volume_mounts(),
+ 'securityContext': {'key': 'value'}}
+ self.assertEqual(expected, container_obj.get_container_item())
+
+ def test_get_container_item_with_env(self):
+ volume_mount = {'name': 'fake_name',
+ 'mountPath': 'fake_path'}
+ args = ['arg1', 'arg2']
+ container_obj = kubernetes.ContainerObject(
+ 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
+ args=args, env=[{'name': 'fake_var_name',
+ 'value': 'fake_var_value'}])
+ expected = {'args': args,
+ 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
+ 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
+ 'name': 'cname-container',
+ 'volumeMounts': container_obj._create_volume_mounts(),
+ 'env': [{'name': 'fake_var_name',
+ 'value': 'fake_var_value'}]}
+ self.assertEqual(expected, container_obj.get_container_item())
+
+ def test_get_container_item_with_ports_multi_parameter(self):
+ volume_mount = {'name': 'fake_name',
+ 'mountPath': 'fake_path'}
+ args = ['arg1', 'arg2']
+ container_obj = kubernetes.ContainerObject(
+ 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
+ args=args, ports=[{'containerPort': 'fake_port_name',
+ 'hostPort': 'fake_host_port',
+ 'name': 'fake_name',
+ 'protocol': 'fake_protocol',
+ 'invalid_variable': 'fake_invalid_variable',
+ 'hostIP': 'fake_port_number'}])
+ expected = {'args': args,
+ 'command': [
+ kubernetes.ContainerObject.COMMAND_DEFAULT],
+ 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
+ 'name': 'cname-container',
+ 'volumeMounts': container_obj._create_volume_mounts(),
+ 'ports': [{'containerPort': 'fake_port_name',
+ 'hostPort': 'fake_host_port',
+ 'name': 'fake_name',
+ 'protocol': 'fake_protocol',
+ 'hostIP': 'fake_port_number'}]}
+ self.assertEqual(expected, container_obj.get_container_item())
+
+ def test_get_container_item_with_ports_no_container_port(self):
+ with self.assertRaises(exceptions.KubernetesContainerPortNotDefined):
+ volume_mount = {'name': 'fake_name',
+ 'mountPath': 'fake_path'}
+ args = ['arg1', 'arg2']
+ container_obj = kubernetes.ContainerObject(
+ 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
+ args=args, ports=[{'hostPort': 'fake_host_port',
+ 'name': 'fake_name',
+ 'protocol': 'fake_protocol',
+ 'hostIP': 'fake_port_number'}])
+ container_obj.get_container_item()
+
+ def test_get_container_item_with_resources(self):
+ volume_mount = {'name': 'fake_name',
+ 'mountPath': 'fake_path'}
+ args = ['arg1', 'arg2']
+ resources = {'requests': {'key1': 'val1'},
+ 'limits': {'key2': 'val2'},
+ 'other_key': {'key3': 'val3'}}
+ container_obj = kubernetes.ContainerObject(
+ 'cname', ssh_key='fake_sshkey', volumeMount=[volume_mount],
+ args=args, resources=resources)
+ expected = {'args': args,
+ 'command': [kubernetes.ContainerObject.COMMAND_DEFAULT],
+ 'image': kubernetes.ContainerObject.IMAGE_DEFAULT,
+ 'name': 'cname-container',
+ 'volumeMounts': container_obj._create_volume_mounts(),
+ 'resources': {'requests': {'key1': 'val1'},
+ 'limits': {'key2': 'val2'}}}
+ self.assertEqual(expected, container_obj.get_container_item())
+
+
+class CustomResourceDefinitionObjectTestCase(base.BaseUnitTestCase):
+
+ def test__init(self):
+ template = {
+ 'metadata': {
+ 'name': 'newcrds.ctx_name.com'
+ },
+ 'spec': {
+ 'group': 'ctx_name.com',
+ 'version': 'v2',
+ 'scope': 'scope',
+ 'names': {'plural': 'newcrds',
+ 'singular': 'newcrd',
+ 'kind': 'Newcrd'}
+ }
+ }
+ crd_obj = kubernetes.CustomResourceDefinitionObject(
+ 'ctx_name', name='newcrd', version='v2', scope='scope')
+ self.assertEqual('newcrds.ctx_name.com', crd_obj._name)
+ self.assertEqual(template, crd_obj._template)
+
+ def test__init_missing_parameter(self):
+ with self.assertRaises(exceptions.KubernetesCRDObjectDefinitionError):
+ kubernetes.CustomResourceDefinitionObject('ctx_name',
+ noname='name')
+
+
+class NetworkObjectTestCase(base.BaseUnitTestCase):
+
+ def setUp(self):
+ self.net_obj = kubernetes.NetworkObject(name='fake_name',
+ plugin='fake_plugin',
+ args='fake_args')
+
+ def test__init_missing_parameter(self):
+ with self.assertRaises(
+ exceptions.KubernetesNetworkObjectDefinitionError):
+ kubernetes.NetworkObject('network_name', plugin='plugin')
+ with self.assertRaises(
+ exceptions.KubernetesNetworkObjectDefinitionError):
+ kubernetes.NetworkObject('network_name', args='args')
+
+ @mock.patch.object(kubernetes_utils, 'get_custom_resource_definition')
+ def test_crd(self, mock_get_crd):
+ mock_crd = mock.Mock()
+ mock_get_crd.return_value = mock_crd
+ net_obj = copy.deepcopy(self.net_obj)
+ self.assertEqual(mock_crd, net_obj.crd)
+
+ def test_template(self):
+ net_obj = copy.deepcopy(self.net_obj)
+ expected = {'apiVersion': 'group.com/v2',
+ 'kind': kubernetes.NetworkObject.KIND,
+ 'metadata': {
+ 'name': 'fake_name'},
+ 'plugin': 'fake_plugin',
+ 'args': 'fake_args'}
+ crd = mock.Mock()
+ crd.spec.group = 'group.com'
+ crd.spec.version = 'v2'
+ net_obj._crd = crd
+ self.assertEqual(expected, net_obj.template)
+
+ def test_group(self):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._crd = mock.Mock()
+ net_obj._crd.spec.group = 'fake_group'
+ self.assertEqual('fake_group', net_obj.group)
+
+ def test_version(self):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._crd = mock.Mock()
+ net_obj._crd.spec.version = 'version_4'
+ self.assertEqual('version_4', net_obj.version)
+
+ def test_plural(self):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._crd = mock.Mock()
+ net_obj._crd.spec.names.plural = 'name_ending_in_s'
+ self.assertEqual('name_ending_in_s', net_obj.plural)
+
+ def test_scope(self):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._crd = mock.Mock()
+ net_obj._crd.spec.scope = 'Cluster'
+ self.assertEqual('Cluster', net_obj.scope)
+
+ @mock.patch.object(kubernetes_utils, 'create_network')
+ def test_create(self, mock_create_network):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._scope = 'scope'
+ net_obj._group = 'group'
+ net_obj._version = 'version'
+ net_obj._plural = 'plural'
+ net_obj._template = 'template'
+ net_obj.create()
+ mock_create_network.assert_called_once_with(
+ 'scope', 'group', 'version', 'plural', 'template')
+
+ @mock.patch.object(kubernetes_utils, 'delete_network')
+ def test_delete(self, mock_delete_network):
+ net_obj = copy.deepcopy(self.net_obj)
+ net_obj._scope = 'scope'
+ net_obj._group = 'group'
+ net_obj._version = 'version'
+ net_obj._plural = 'plural'
+ net_obj._name = 'name'
+ net_obj.delete()
+ mock_delete_network.assert_called_once_with(
+ 'scope', 'group', 'version', 'plural', 'name')
+
+
+class ServiceNodePortObjectTestCase(base.BaseUnitTestCase):
+
+ def test__init(self):
+ with mock.patch.object(kubernetes.ServiceNodePortObject, '_add_port') \
+ as mock_add_port:
+ kubernetes.ServiceNodePortObject('fake_name',
+ node_ports=[{'port': 80}])
+
+ mock_add_port.assert_has_calls([mock.call(22, protocol='TCP'),
+ mock.call(80)])
+
+ def test__add_port(self):
+ nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
+ port_ssh = {'port': 22,
+ 'protocol': 'TCP'}
+ port_definition = {'port': 80,
+ 'protocol': 'TCP',
+ 'name': 'web',
+ 'targetPort': 10080,
+ 'nodePort': 30080}
+ port = copy.deepcopy(port_definition)
+ port.pop('port')
+ nodeport_object._add_port(80, **port)
+ self.assertEqual([port_ssh, port_definition],
+ nodeport_object.template['spec']['ports'])
+
+ @mock.patch.object(kubernetes_utils, 'create_service')
+ def test_create(self, mock_create_service):
+ nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
+ nodeport_object.template = 'fake_template'
+ nodeport_object.create()
+ mock_create_service.assert_called_once_with('fake_template')
+
+ @mock.patch.object(kubernetes_utils, 'delete_service')
+ def test_delete(self, mock_delete_service):
+ nodeport_object = kubernetes.ServiceNodePortObject('fake_name')
+ nodeport_object.delete()
+ mock_delete_service.assert_called_once_with('fake_name-service')