diff options
Diffstat (limited to 'yardstick')
39 files changed, 2357 insertions, 1522 deletions
diff --git a/yardstick/benchmark/contexts/kubernetes.py b/yardstick/benchmark/contexts/kubernetes.py index a39f63137..2334e5076 100644 --- a/yardstick/benchmark/contexts/kubernetes.py +++ b/yardstick/benchmark/contexts/kubernetes.py @@ -54,6 +54,7 @@ class KubernetesContext(Context): LOG.info('Launch containers') self._create_rcs() + self._create_services() time.sleep(1) self.template.get_rc_pods() @@ -63,6 +64,7 @@ class KubernetesContext(Context): self._delete_ssh_key() self._delete_rcs() self._delete_pods() + self._delete_services() super(KubernetesContext, self).undeploy() @@ -80,6 +82,14 @@ class KubernetesContext(Context): return False return True + def _create_services(self): + for obj in self.template.service_objs: + obj.create() + + def _delete_services(self): + for obj in self.template.service_objs: + obj.delete() + def _create_rcs(self): for obj in self.template.k8s_objs: self._create_rc(obj.get_template()) @@ -126,15 +136,22 @@ class KubernetesContext(Context): utils.remove_file(self.public_key_path) def _get_server(self, name): - resp = k8s_utils.get_pod_list() - hosts = ({'name': n.metadata.name, - 'ip': n.status.pod_ip, - 'user': 'root', - 'key_filename': self.key_path, - 'private_ip': n.status.pod_ip} - for n in resp.items if n.metadata.name.startswith(name)) - - return next(hosts, None) + service_name = '{}-service'.format(name) + service = k8s_utils.get_service_by_name(service_name).ports[0] + + host = { + 'name': service.name, + 'ip': self._get_node_ip(), + 'private_ip': k8s_utils.get_pod_by_name(name).status.pod_ip, + 'ssh_port': service.node_port, + 'user': 'root', + 'key_filename': self.key_path, + } + + return host + + def _get_node_ip(self): + return k8s_utils.get_node_list().items[0].status.addresses[0].address def _get_network(self, attr_name): return None diff --git a/yardstick/benchmark/contexts/standalone/__init__.py b/yardstick/benchmark/contexts/standalone/__init__.py index f0ef1d560..e69de29bb 100644 --- a/yardstick/benchmark/contexts/standalone/__init__.py +++ b/yardstick/benchmark/contexts/standalone/__init__.py @@ -1,211 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""This module handle non managed standalone virtualization node.""" - -from __future__ import absolute_import -import logging -import os -import errno -import collections -import time - -from yardstick.benchmark.contexts.base import Context -from yardstick.common.constants import YARDSTICK_ROOT_PATH -from yardstick.common.utils import import_modules_from_package, itersubclasses -from yardstick.common.yaml_loader import yaml_load - -LOG = logging.getLogger(__name__) - - -class StandaloneContext(Context): - """ This class handles standalone nodes - VM running on Non-Managed NFVi - Configuration: vswitch, ovs, ovs-dpdk, sr-iov, linuxbridge - """ - - __context_type__ = "Standalone" - - def __init__(self): - self.name = None - self.file_path = None - self.nodes = [] - self.networks = {} - self.nfvi_node = [] - self.nfvi_obj = None - self.attrs = {} - super(StandaloneContext, self).__init__() - - def read_config_file(self): - """Read from config file""" - - with open(self.file_path) as stream: - LOG.info("Parsing pod file: %s", self.file_path) - cfg = yaml_load(stream) - return cfg - - def get_nfvi_obj(self): - print("{0}".format(self.nfvi_node[0]['role'])) - context_type = self.get_context_impl(self.nfvi_node[0]['role']) - nfvi_obj = context_type() - nfvi_obj.__init__() - nfvi_obj.parse_pod_and_get_data(self.file_path) - return nfvi_obj - - def init(self, attrs): - """initializes itself from the supplied arguments""" - - self.name = attrs["name"] - self.file_path = file_path = attrs.get("file", "pod.yaml") - - try: - cfg = self.read_config_file() - except IOError as io_error: - if io_error.errno != errno.ENOENT: - raise - self.file_path = os.path.join(YARDSTICK_ROOT_PATH, file_path) - cfg = self.read_config_file() - - self.vm_deploy = attrs.get("vm_deploy", True) - self.nodes.extend([node for node in cfg["nodes"] - if str(node["role"]) != "Sriov" and - str(node["role"]) != "Ovsdpdk"]) - for node in cfg["nodes"]: - if str(node["role"]) == "Sriov": - self.nfvi_node.extend([node for node in cfg["nodes"] - if str(node["role"]) == "Sriov"]) - if str(node["role"]) == "Ovsdpdk": - self.nfvi_node.extend([node for node in cfg["nodes"] - if str(node["role"]) == "Ovsdpdk"]) - LOG.info("{0}".format(node["role"])) - else: - LOG.debug("Node role is other than SRIOV and OVS") - self.nfvi_obj = self.get_nfvi_obj() - self.attrs = attrs - # add optional static network definition - self.networks.update(cfg.get("networks", {})) - self.nfvi_obj = self.get_nfvi_obj() - LOG.debug("Nodes: %r", self.nodes) - LOG.debug("NFVi Node: %r", self.nfvi_node) - LOG.debug("Networks: %r", self.networks) - - def deploy(self): - """don't need to deploy""" - - # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. - if not self.vm_deploy: - return - - # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. - self.nfvi_obj.ssh_remote_machine() - if self.nfvi_obj.first_run is True: - self.nfvi_obj.install_req_libs() - - nic_details = self.nfvi_obj.get_nic_details() - print("{0}".format(nic_details)) - - if self.nfvi_node[0]["role"] == "Sriov": - self.nfvi_obj.setup_sriov_context( - self.nfvi_obj.sriov[0]['phy_ports'], - nic_details, - self.nfvi_obj.sriov[0]['phy_driver']) - if self.nfvi_node[0]["role"] == "Ovsdpdk": - self.nfvi_obj.setup_ovs(self.nfvi_obj.ovs[0]["phy_ports"]) - self.nfvi_obj.start_ovs_serverswitch() - time.sleep(5) - self.nfvi_obj.setup_ovs_bridge() - self.nfvi_obj.add_oflows() - self.nfvi_obj.setup_ovs_context( - self.nfvi_obj.ovs[0]['phy_ports'], - nic_details, - self.nfvi_obj.ovs[0]['phy_driver']) - pass - - def undeploy(self): - """don't need to undeploy""" - - if not self.vm_deploy: - return - # Todo: NFVi undeploy (sriov, vswitch, ovs etc) based on the config. - # self.nfvi_obj = self.get_nfvi_obj() - self.nfvi_obj.ssh_remote_machine() - self.nfvi_obj.destroy_vm() - pass - - def _get_server(self, attr_name): - """lookup server info by name from context - - Keyword arguments: - attr_name -- A name for a server listed in nodes config file - """ - node_name, name = self.split_name(attr_name) - if name is None or self.name != name: - return None - - matching_nodes = (n for n in self.nodes if n["name"] == node_name) - try: - # A clone is created in order to avoid affecting the - # original one. - node = dict(next(matching_nodes)) - except StopIteration: - return None - - try: - duplicate = next(matching_nodes) - except StopIteration: - pass - else: - raise ValueError("Duplicate nodes!!! Nodes: %s %s", - (node, duplicate)) - - node["name"] = attr_name - return node - - def _get_network(self, attr_name): - if not isinstance(attr_name, collections.Mapping): - network = self.networks.get(attr_name) - - else: - # Don't generalize too much Just support vld_id - vld_id = attr_name.get('vld_id', {}) - # for standalone context networks are dicts - iter1 = (n for n in self.networks.values() if n.get('vld_id') == vld_id) - network = next(iter1, None) - - if network is None: - return None - - result = { - # name is required - "name": network["name"], - "vld_id": network.get("vld_id"), - "segmentation_id": network.get("segmentation_id"), - "network_type": network.get("network_type"), - "physical_network": network.get("physical_network"), - } - return result - - def get_context_impl(self, nfvi_type): - """ Find the implementing class from vnf_model["vnf"]["name"] field - - :param vnf_model: dictionary containing a parsed vnfd - :return: subclass of GenericVNF - """ - import_modules_from_package( - "yardstick.benchmark.contexts") - expected_name = nfvi_type - impl = [c for c in itersubclasses(StandaloneContext) - if c.__name__ == expected_name] - try: - return next(iter(impl)) - except StopIteration: - raise ValueError("No implementation for %s", expected_name) diff --git a/yardstick/benchmark/contexts/standalone/model.py b/yardstick/benchmark/contexts/standalone/model.py new file mode 100644 index 000000000..4491660e0 --- /dev/null +++ b/yardstick/benchmark/contexts/standalone/model.py @@ -0,0 +1,493 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +import os +import re +import time +import glob +import uuid +import random +import logging +import itertools +import errno + +from netaddr import IPNetwork +import xml.etree.ElementTree as ET + +from yardstick import ssh +from yardstick.common.constants import YARDSTICK_ROOT_PATH +from yardstick.common.yaml_loader import yaml_load +from yardstick.network_services.utils import PciAddress +from yardstick.common.utils import write_file + +LOG = logging.getLogger(__name__) + +VM_TEMPLATE = """ +<domain type="kvm"> + <name>{vm_name}</name> + <uuid>{random_uuid}</uuid> + <memory unit="MB">{memory}</memory> + <currentMemory unit="MB">{memory}</currentMemory> + <memoryBacking> + <hugepages /> + </memoryBacking> + <vcpu placement="static">{vcpu}</vcpu> + <os> + <type arch="x86_64" machine="pc-i440fx-utopic">hvm</type> + <boot dev="hd" /> + </os> + <features> + <acpi /> + <apic /> + <pae /> + </features> + <cpu mode='host-passthrough'> + <topology cores="{cpu}" sockets="{socket}" threads="{threads}" /> + <numa> + <cell id='0' cpus='{numa_cpus}' memory='{memory}' unit='MB' memAccess='shared'/> + </numa> + </cpu> + <clock offset="utc"> + <timer name="rtc" tickpolicy="catchup" /> + <timer name="pit" tickpolicy="delay" /> + <timer name="hpet" present="no" /> + </clock> + <on_poweroff>destroy</on_poweroff> + <on_reboot>restart</on_reboot> + <on_crash>restart</on_crash> + <devices> + <emulator>/usr/bin/kvm-spice</emulator> + <disk device="disk" type="file"> + <driver name="qemu" type="qcow2" /> + <source file="{vm_image}"/> + <target bus="virtio" dev="vda" /> + </disk> + <graphics autoport="yes" listen="0.0.0.0" port="-1" type="vnc" /> + <interface type="bridge"> + <mac address='{mac_addr}'/> + <source bridge="br-int" /> + <model type='virtio'/> + </interface> + </devices> +</domain> +""" +WAIT_FOR_BOOT = 30 + + +class Libvirt(object): + """ This class handles all the libvirt updates to lauch VM + """ + + @staticmethod + def check_if_vm_exists_and_delete(vm_name, connection): + cmd_template = "virsh list --name | grep -i %s" + status = connection.execute(cmd_template % vm_name)[0] + if status == 0: + LOG.info("VM '%s' is already present.. destroying" % vm_name) + connection.execute("virsh destroy %s" % vm_name) + + @staticmethod + def virsh_create_vm(connection, cfg): + err = connection.execute("virsh create %s" % cfg)[0] + LOG.info("VM create status: %s" % (err)) + + @staticmethod + def virsh_destroy_vm(vm_name, connection): + connection.execute("virsh destroy %s" % vm_name) + + @staticmethod + def add_interface_address(interface, pci_address): + vm_pci = ET.SubElement(interface, 'address') + vm_pci.set('type', 'pci') + vm_pci.set('domain', '0x%s' % pci_address.domain) + vm_pci.set('bus', '0x%s' % pci_address.bus) + vm_pci.set('slot', '0x%s' % pci_address.slot) + vm_pci.set('function', '0x%s' % pci_address.function) + return vm_pci + + @classmethod + def add_ovs_interface(cls, vpath, port_num, vpci, vports_mac, xml): + vhost_path = '{0}/var/run/openvswitch/dpdkvhostuser{1}' + root = ET.parse(xml) + pci_address = PciAddress.parse_address(vpci.strip(), multi_line=True) + device = root.find('devices') + + interface = ET.SubElement(device, 'interface') + interface.set('type', 'vhostuser') + mac = ET.SubElement(interface, 'mac') + mac.set('address', vports_mac) + + source = ET.SubElement(interface, 'source') + source.set('type', 'unix') + source.set('path', vhost_path.format(vpath, port_num)) + source.set('mode', 'client') + + model = ET.SubElement(interface, 'model') + model.set('type', 'virtio') + + driver = ET.SubElement(interface, 'driver') + driver.set('queues', '4') + + host = ET.SubElement(driver, 'host') + host.set('mrg_rxbuf', 'off') + + cls.add_interface_address(interface, pci_address) + + root.write(xml) + + @classmethod + def add_sriov_interfaces(cls, vm_pci, vf_pci, vfmac, xml): + root = ET.parse(xml) + pci_address = PciAddress.parse_address(vf_pci.strip(), multi_line=True) + device = root.find('devices') + + interface = ET.SubElement(device, 'interface') + interface.set('managed', 'yes') + interface.set('type', 'hostdev') + + mac = ET.SubElement(interface, 'mac') + mac.set('address', vfmac) + source = ET.SubElement(interface, 'source') + + addr = ET.SubElement(source, "address") + addr.set('domain', "0x0") + addr.set('bus', "{0}".format(pci_address.bus)) + addr.set('function', "{0}".format(pci_address.function)) + addr.set('slot', "0x{0}".format(pci_address.slot)) + addr.set('type', "pci") + + pci_vm_address = PciAddress.parse_address(vm_pci.strip(), multi_line=True) + cls.add_interface_address(interface, pci_vm_address) + + root.write(xml) + + @staticmethod + def create_snapshot_qemu(connection, index, vm_image): + # build snapshot image + image = "/var/lib/libvirt/images/%s.qcow2" % index + connection.execute("rm %s" % image) + qemu_template = "qemu-img create -f qcow2 -o backing_file=%s %s" + connection.execute(qemu_template % (vm_image, image)) + + return image + + @classmethod + def build_vm_xml(cls, connection, flavor, cfg, vm_name, index): + memory = flavor.get('ram', '4096') + extra_spec = flavor.get('extra_specs', {}) + cpu = extra_spec.get('hw:cpu_cores', '2') + socket = extra_spec.get('hw:cpu_sockets', '1') + threads = extra_spec.get('hw:cpu_threads', '2') + vcpu = int(cpu) * int(threads) + numa_cpus = '0-%s' % (vcpu - 1) + + mac = StandaloneContextHelper.get_mac_address(0x00) + image = cls.create_snapshot_qemu(connection, index, + flavor.get("images", None)) + vm_xml = VM_TEMPLATE.format( + vm_name=vm_name, + random_uuid=uuid.uuid4(), + mac_addr=mac, + memory=memory, vcpu=vcpu, cpu=cpu, + numa_cpus=numa_cpus, + socket=socket, threads=threads, + vm_image=image) + + write_file(cfg, vm_xml) + + return [vcpu, mac] + + @staticmethod + def split_cpu_list(cpu_list): + if not cpu_list: + return [] + + ranges = cpu_list.split(',') + bounds = ([int(b) for b in r.split('-')] for r in ranges) + range_objects = \ + (range(bound[0], bound[1] + 1 if len(bound) == 2 + else bound[0] + 1) for bound in bounds) + + return sorted(itertools.chain.from_iterable(range_objects)) + + @classmethod + def get_numa_nodes(cls): + nodes_sysfs = glob.iglob("/sys/devices/system/node/node*") + nodes = {} + for node_sysfs in nodes_sysfs: + num = os.path.basename(node_sysfs).replace("node", "") + with open(os.path.join(node_sysfs, "cpulist")) as cpulist_file: + cpulist = cpulist_file.read().strip() + nodes[num] = cls.split_cpu_list(cpulist) + LOG.info("nodes: {0}".format(nodes)) + return nodes + + @staticmethod + def update_interrupts_hugepages_perf(connection): + connection.execute("echo 1 > /sys/module/kvm/parameters/allow_unsafe_assigned_interrupts") + connection.execute("echo never > /sys/kernel/mm/transparent_hugepage/enabled") + + @classmethod + def pin_vcpu_for_perf(cls, connection, vm_name, cpu): + nodes = cls.get_numa_nodes() + num_nodes = len(nodes) + vcpi_pin_template = "virsh vcpupin {0} {1} {2}" + for i in range(0, int(cpu)): + core = nodes[str(num_nodes - 1)][i % len(nodes[str(num_nodes - 1)])] + connection.execute(vcpi_pin_template.format(vm_name, i, core)) + cls.update_interrupts_hugepages_perf(connection) + + +class StandaloneContextHelper(object): + """ This class handles all the common code for standalone + """ + def __init__(self): + self.file_path = None + super(StandaloneContextHelper, self).__init__() + + @staticmethod + def install_req_libs(connection, extra_pkgs=[]): + pkgs = ["qemu-kvm", "libvirt-bin", "bridge-utils", "numactl", "fping"] + pkgs.extend(extra_pkgs) + cmd_template = "dpkg-query -W --showformat='${Status}\\n' \"%s\"|grep 'ok installed'" + for pkg in pkgs: + if connection.execute(cmd_template % pkg)[0]: + connection.execute("apt-get update") + connection.execute("apt-get -y install %s" % pkg) + else: + # all installed + return + + @staticmethod + def get_kernel_module(connection, pci, driver): + if not driver: + out = connection.execute("lspci -k -s %s" % pci)[1] + driver = out.split("Kernel modules:").pop().strip() + return driver + + @classmethod + def get_nic_details(cls, connection, networks, dpdk_nic_bind): + for key, ports in networks.items(): + if key == "mgmt": + continue + + phy_ports = ports['phy_port'] + phy_driver = ports.get('phy_driver', None) + driver = cls.get_kernel_module(connection, phy_ports, phy_driver) + + # Make sure that ports are bound to kernel drivers e.g. i40e/ixgbe + bind_cmd = "{dpdk_nic_bind} --force -b {driver} {port}" + lshw_cmd = "lshw -c network -businfo | grep '{port}'" + link_show_cmd = "ip -s link show {interface}" + + cmd = bind_cmd.format(dpdk_nic_bind=dpdk_nic_bind, + driver=driver, port=ports['phy_port']) + connection.execute(cmd) + + out = connection.execute(lshw_cmd.format(port=phy_ports))[1] + interface = out.split()[1] + + connection.execute(link_show_cmd.format(interface=interface)) + + ports.update({ + 'interface': str(interface), + 'driver': driver + }) + LOG.info("{0}".format(networks)) + + return networks + + @staticmethod + def get_virtual_devices(connection, pci): + cmd = "cat /sys/bus/pci/devices/{0}/virtfn0/uevent" + output = connection.execute(cmd.format(pci))[1] + + pattern = "PCI_SLOT_NAME=({})".format(PciAddress.PCI_PATTERN_STR) + m = re.search(pattern, output, re.MULTILINE) + + pf_vfs = {} + if m: + pf_vfs = {pci: m.group(1).rstrip()} + + LOG.info("pf_vfs:\n%s", pf_vfs) + + return pf_vfs + + def read_config_file(self): + """Read from config file""" + + with open(self.file_path) as stream: + LOG.info("Parsing pod file: %s", self.file_path) + cfg = yaml_load(stream) + return cfg + + def parse_pod_file(self, file_path, nfvi_role='Sriov'): + self.file_path = file_path + nodes = [] + nfvi_host = [] + try: + cfg = self.read_config_file() + except IOError as io_error: + if io_error.errno != errno.ENOENT: + raise + self.file_path = os.path.join(YARDSTICK_ROOT_PATH, file_path) + cfg = self.read_config_file() + + nodes.extend([node for node in cfg["nodes"] if str(node["role"]) != nfvi_role]) + nfvi_host.extend([node for node in cfg["nodes"] if str(node["role"]) == nfvi_role]) + if not nfvi_host: + raise("Node role is other than SRIOV") + + host_mgmt = {'user': nfvi_host[0]['user'], + 'ip': str(IPNetwork(nfvi_host[0]['ip']).ip), + 'password': nfvi_host[0]['password'], + 'ssh_port': nfvi_host[0].get('ssh_port', 22), + 'key_filename': nfvi_host[0].get('key_filename')} + + return [nodes, nfvi_host, host_mgmt] + + @staticmethod + def get_mac_address(end=0x7f): + mac = [0x52, 0x54, 0x00, + random.randint(0x00, end), + random.randint(0x00, 0xff), + random.randint(0x00, 0xff)] + mac_address = ':'.join('%02x' % x for x in mac) + return mac_address + + @staticmethod + def get_mgmt_ip(connection, mac, cidr, node): + mgmtip = None + times = 10 + while not mgmtip and times: + connection.execute("fping -c 1 -g %s > /dev/null 2>&1" % cidr) + out = connection.execute("ip neighbor | grep '%s'" % mac)[1] + LOG.info("fping -c 1 -g %s > /dev/null 2>&1" % cidr) + if out.strip(): + mgmtip = str(out.split(" ")[0]).strip() + client = ssh.SSH.from_node(node, overrides={"ip": mgmtip}) + client.wait() + break + + time.sleep(WAIT_FOR_BOOT) # FixMe: How to find if VM is booted? + times = times - 1 + return mgmtip + + @classmethod + def wait_for_vnfs_to_start(cls, connection, servers, nodes): + for node in nodes: + vnf = servers[node["name"]] + mgmtip = vnf["network_ports"]["mgmt"]["cidr"] + ip = cls.get_mgmt_ip(connection, node["mac"], mgmtip, node) + if ip: + node["ip"] = ip + return nodes + + +class Server(object): + """ This class handles geting vnf nodes + """ + + @staticmethod + def build_vnf_interfaces(vnf, ports): + interfaces = {} + index = 0 + + for key, vfs in vnf["network_ports"].items(): + if key == "mgmt": + mgmtip = str(IPNetwork(vfs['cidr']).ip) + continue + + vf = ports[vfs[0]] + ip = IPNetwork(vf['cidr']) + interfaces.update({ + key: { + 'vpci': vf['vpci'], + 'driver': "%svf" % vf['driver'], + 'local_mac': vf['mac'], + 'dpdk_port_num': index, + 'local_ip': str(ip.ip), + 'netmask': str(ip.netmask) + }, + }) + index = index + 1 + + return mgmtip, interfaces + + @classmethod + def generate_vnf_instance(cls, flavor, ports, ip, key, vnf, mac): + mgmtip, interfaces = cls.build_vnf_interfaces(vnf, ports) + + result = { + "ip": mgmtip, + "mac": mac, + "host": ip, + "user": flavor.get('user', 'root'), + "interfaces": interfaces, + "routing_table": [], + # empty IPv6 routing table + "nd_route_tbl": [], + "name": key, "role": key + } + + try: + result['key_filename'] = flavor['key_filename'] + except KeyError: + pass + + try: + result['password'] = flavor['password'] + except KeyError: + pass + LOG.info(result) + return result + + +class OvsDeploy(object): + """ This class handles deploy of ovs dpdk + Configuration: ovs_dpdk + """ + + OVS_DEPLOY_SCRIPT = "ovs_deploy.bash" + + def __init__(self, connection, bin_path, ovs_properties): + self.connection = connection + self.bin_path = bin_path + self.ovs_properties = ovs_properties + + def prerequisite(self): + pkgs = ["git", "build-essential", "pkg-config", "automake", + "autotools-dev", "libltdl-dev", "cmake", "libnuma-dev", + "libpcap-dev"] + StandaloneContextHelper.install_req_libs(self.connection, pkgs) + + def ovs_deploy(self): + ovs_deploy = os.path.join(YARDSTICK_ROOT_PATH, + "yardstick/resources/scripts/install/", + self.OVS_DEPLOY_SCRIPT) + if os.path.isfile(ovs_deploy): + self.prerequisite() + remote_ovs_deploy = os.path.join(self.bin_path, self.OVS_DEPLOY_SCRIPT) + LOG.info(remote_ovs_deploy) + self.connection.put(ovs_deploy, remote_ovs_deploy) + + http_proxy = os.environ.get('http_proxy', '') + ovs_details = self.ovs_properties.get("version", {}) + ovs = ovs_details.get("ovs", "2.6.0") + dpdk = ovs_details.get("dpdk", "16.11.1") + + cmd = "sudo -E %s --ovs='%s' --dpdk='%s' -p='%s'" % (remote_ovs_deploy, + ovs, dpdk, http_proxy) + self.connection.execute(cmd) diff --git a/yardstick/benchmark/contexts/standalone/ovs_dpdk.py b/yardstick/benchmark/contexts/standalone/ovs_dpdk.py new file mode 100644 index 000000000..833c3fb80 --- /dev/null +++ b/yardstick/benchmark/contexts/standalone/ovs_dpdk.py @@ -0,0 +1,383 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +import os +import logging +import collections +import time + +from collections import OrderedDict + +from yardstick import ssh +from yardstick.network_services.utils import get_nsb_option +from yardstick.network_services.utils import provision_tool +from yardstick.benchmark.contexts.base import Context +from yardstick.benchmark.contexts.standalone.model import Libvirt +from yardstick.benchmark.contexts.standalone.model import StandaloneContextHelper +from yardstick.benchmark.contexts.standalone.model import Server +from yardstick.benchmark.contexts.standalone.model import OvsDeploy +from yardstick.network_services.utils import PciAddress + +LOG = logging.getLogger(__name__) + + +class OvsDpdkContext(Context): + """ This class handles OVS standalone nodes - VM running on Non-Managed NFVi + Configuration: ovs_dpdk + """ + + __context_type__ = "StandaloneOvsDpdk" + + SUPPORTED_OVS_TO_DPDK_MAP = { + '2.6.0': '16.07.1', + '2.6.1': '16.07.2', + '2.7.0': '16.11.1', + '2.7.1': '16.11.2', + '2.7.2': '16.11.3', + '2.8.0': '17.05.2' + } + + DEFAULT_OVS = '2.6.0' + + PKILL_TEMPLATE = "pkill %s %s" + + def __init__(self): + self.file_path = None + self.sriov = [] + self.first_run = True + self.dpdk_nic_bind = "" + self.vm_names = [] + self.name = None + self.nfvi_host = [] + self.nodes = [] + self.networks = {} + self.attrs = {} + self.vm_flavor = None + self.servers = None + self.helper = StandaloneContextHelper() + self.vnf_node = Server() + self.ovs_properties = {} + self.wait_for_vswitchd = 10 + super(OvsDpdkContext, self).__init__() + + def init(self, attrs): + """initializes itself from the supplied arguments""" + + self.name = attrs["name"] + self.file_path = attrs.get("file", "pod.yaml") + + self.nodes, self.nfvi_host, self.host_mgmt = \ + self.helper.parse_pod_file(self.file_path, 'OvsDpdk') + + self.attrs = attrs + self.vm_flavor = attrs.get('flavor', {}) + self.servers = attrs.get('servers', {}) + self.vm_deploy = attrs.get("vm_deploy", True) + self.ovs_properties = attrs.get('ovs_properties', {}) + # add optional static network definition + self.networks = attrs.get("networks", {}) + + LOG.debug("Nodes: %r", self.nodes) + LOG.debug("NFVi Node: %r", self.nfvi_host) + LOG.debug("Networks: %r", self.networks) + + def setup_ovs(self): + vpath = self.ovs_properties.get("vpath", "/usr/local") + xargs_kill_cmd = self.PKILL_TEMPLATE % ('-9', 'ovs') + + create_from = os.path.join(vpath, 'etc/openvswitch/conf.db') + create_to = os.path.join(vpath, 'share/openvswitch/vswitch.ovsschema') + + cmd_list = [ + "chmod 0666 /dev/vfio/*", + "chmod a+x /dev/vfio", + "pkill -9 ovs", + xargs_kill_cmd, + "killall -r 'ovs*'", + "mkdir -p {0}/etc/openvswitch".format(vpath), + "mkdir -p {0}/var/run/openvswitch".format(vpath), + "rm {0}/etc/openvswitch/conf.db".format(vpath), + "ovsdb-tool create {0} {1}".format(create_from, create_to), + "modprobe vfio-pci", + "chmod a+x /dev/vfio", + "chmod 0666 /dev/vfio/*", + ] + for cmd in cmd_list: + self.connection.execute(cmd) + bind_cmd = "{dpdk_nic_bind} --force -b {driver} {port}" + phy_driver = "vfio-pci" + for key, port in self.networks.items(): + vpci = port.get("phy_port") + self.connection.execute(bind_cmd.format(dpdk_nic_bind=self.dpdk_nic_bind, + driver=phy_driver, port=vpci)) + + def start_ovs_serverswitch(self): + vpath = self.ovs_properties.get("vpath") + pmd_nums = int(self.ovs_properties.get("pmd_threads", 2)) + ovs_sock_path = '/var/run/openvswitch/db.sock' + log_path = '/var/log/openvswitch/ovs-vswitchd.log' + + pmd_mask = hex(sum(2 ** num for num in range(pmd_nums)) << 1) + socket0 = self.ovs_properties.get("ram", {}).get("socket_0", "2048") + socket1 = self.ovs_properties.get("ram", {}).get("socket_1", "2048") + + ovs_other_config = "ovs-vsctl {0}set Open_vSwitch . other_config:{1}" + detach_cmd = "ovs-vswitchd unix:{0}{1} --pidfile --detach --log-file={2}" + + cmd_list = [ + "mkdir -p /usr/local/var/run/openvswitch", + "ovsdb-server --remote=punix:/{0}/{1} --pidfile --detach".format(vpath, + ovs_sock_path), + ovs_other_config.format("--no-wait ", "dpdk-init=true"), + ovs_other_config.format("--no-wait ", "dpdk-socket-mem='%s,%s'" % (socket0, socket1)), + detach_cmd.format(vpath, ovs_sock_path, log_path), + ovs_other_config.format("", "pmd-cpu-mask=%s" % pmd_mask), + ] + + for cmd in cmd_list: + LOG.info(cmd) + self.connection.execute(cmd) + time.sleep(self.wait_for_vswitchd) + + def setup_ovs_bridge_add_flows(self): + dpdk_args = "" + dpdk_list = [] + vpath = self.ovs_properties.get("vpath", "/usr/local") + version = self.ovs_properties.get('version', {}) + ovs_ver = [int(x) for x in version.get('ovs', self.DEFAULT_OVS).split('.')] + ovs_add_port = \ + "ovs-vsctl add-port {br} {port} -- set Interface {port} type={type_}{dpdk_args}" + ovs_add_queue = "ovs-vsctl set Interface {port} options:n_rxq={queue}" + chmod_vpath = "chmod 0777 {0}/var/run/openvswitch/dpdkvhostuser*" + + cmd_dpdk_list = [ + "ovs-vsctl del-br br0", + "rm -rf /usr/local/var/run/openvswitch/dpdkvhostuser*", + "ovs-vsctl add-br br0 -- set bridge br0 datapath_type=netdev", + ] + + ordered_network = OrderedDict(self.networks) + for index, (key, vnf) in enumerate(ordered_network.items()): + if ovs_ver >= [2, 7, 0]: + dpdk_args = " options:dpdk-devargs=%s" % vnf.get("phy_port") + dpdk_list.append(ovs_add_port.format(br='br0', port='dpdk%s' % vnf.get("port_num", 0), + type_='dpdk', dpdk_args=dpdk_args)) + dpdk_list.append(ovs_add_queue.format(port='dpdk%s' % vnf.get("port_num", 0), + queue=self.ovs_properties.get("queues", 4))) + + # Sorting the array to make sure we execute dpdk0... in the order + list.sort(dpdk_list) + cmd_dpdk_list.extend(dpdk_list) + + # Need to do two for loop to maintain the dpdk/vhost ports. + for index, _ in enumerate(ordered_network): + cmd_dpdk_list.append(ovs_add_port.format(br='br0', port='dpdkvhostuser%s' % index, + type_='dpdkvhostuser', dpdk_args="")) + + for cmd in cmd_dpdk_list: + LOG.info(cmd) + self.connection.execute(cmd) + + # Fixme: add flows code + ovs_flow = "ovs-ofctl add-flow br0 in_port=%s,action=output:%s" + + network_count = len(ordered_network) + 1 + for in_port, out_port in zip(range(1, network_count), + range(network_count, network_count * 2)): + self.connection.execute(ovs_flow % (in_port, out_port)) + self.connection.execute(ovs_flow % (out_port, in_port)) + + self.connection.execute(chmod_vpath.format(vpath)) + + def cleanup_ovs_dpdk_env(self): + self.connection.execute("ovs-vsctl del-br br0") + self.connection.execute("pkill -9 ovs") + + def check_ovs_dpdk_env(self): + self.cleanup_ovs_dpdk_env() + + version = self.ovs_properties.get("version", {}) + ovs_ver = version.get("ovs", self.DEFAULT_OVS) + dpdk_ver = version.get("dpdk", "16.07.2").split('.') + + supported_version = self.SUPPORTED_OVS_TO_DPDK_MAP.get(ovs_ver, None) + if supported_version is None or supported_version.split('.')[:2] != dpdk_ver[:2]: + raise Exception("Unsupported ovs '{}'. Please check the config...".format(ovs_ver)) + + status = self.connection.execute("ovs-vsctl -V | grep -i '%s'" % ovs_ver)[0] + if status: + deploy = OvsDeploy(self.connection, + get_nsb_option("bin_path"), + self.ovs_properties) + deploy.ovs_deploy() + + def deploy(self): + """don't need to deploy""" + + # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. + if not self.vm_deploy: + return + + self.connection = ssh.SSH.from_node(self.host_mgmt) + self.dpdk_nic_bind = provision_tool( + self.connection, + os.path.join(get_nsb_option("bin_path"), "dpdk-devbind.py")) + + # Check dpdk/ovs version, if not present install + self.check_ovs_dpdk_env() + # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. + StandaloneContextHelper.install_req_libs(self.connection) + self.networks = StandaloneContextHelper.get_nic_details(self.connection, + self.networks, + self.dpdk_nic_bind) + + self.setup_ovs() + self.start_ovs_serverswitch() + self.setup_ovs_bridge_add_flows() + self.nodes = self.setup_ovs_dpdk_context() + LOG.debug("Waiting for VM to come up...") + self.nodes = StandaloneContextHelper.wait_for_vnfs_to_start(self.connection, + self.servers, + self.nodes) + + def undeploy(self): + + if not self.vm_deploy: + return + + # Cleanup the ovs installation... + self.cleanup_ovs_dpdk_env() + + # Bind nics back to kernel + bind_cmd = "{dpdk_nic_bind} --force -b {driver} {port}" + for key, port in self.networks.items(): + vpci = port.get("phy_port") + phy_driver = port.get("driver") + self.connection.execute(bind_cmd.format(dpdk_nic_bind=self.dpdk_nic_bind, + driver=phy_driver, port=vpci)) + + # Todo: NFVi undeploy (sriov, vswitch, ovs etc) based on the config. + for vm in self.vm_names: + Libvirt.check_if_vm_exists_and_delete(vm, self.connection) + + def _get_server(self, attr_name): + """lookup server info by name from context + + Keyword arguments: + attr_name -- A name for a server listed in nodes config file + """ + node_name, name = self.split_name(attr_name) + if name is None or self.name != name: + return None + + matching_nodes = (n for n in self.nodes if n["name"] == node_name) + try: + # A clone is created in order to avoid affecting the + # original one. + node = dict(next(matching_nodes)) + except StopIteration: + return None + + try: + duplicate = next(matching_nodes) + except StopIteration: + pass + else: + raise ValueError("Duplicate nodes!!! Nodes: %s %s", + (node, duplicate)) + + node["name"] = attr_name + return node + + def _get_network(self, attr_name): + if not isinstance(attr_name, collections.Mapping): + network = self.networks.get(attr_name) + + else: + # Don't generalize too much Just support vld_id + vld_id = attr_name.get('vld_id', {}) + # for standalone context networks are dicts + iter1 = (n for n in self.networks.values() if n.get('vld_id') == vld_id) + network = next(iter1, None) + + if network is None: + return None + + result = { + # name is required + "name": network["name"], + "vld_id": network.get("vld_id"), + "segmentation_id": network.get("segmentation_id"), + "network_type": network.get("network_type"), + "physical_network": network.get("physical_network"), + } + return result + + def configure_nics_for_ovs_dpdk(self): + portlist = OrderedDict(self.networks) + for key, ports in portlist.items(): + mac = StandaloneContextHelper.get_mac_address() + portlist[key].update({'mac': mac}) + self.networks = portlist + LOG.info("Ports %s" % self.networks) + + def _enable_interfaces(self, index, vfs, cfg): + vpath = self.ovs_properties.get("vpath", "/usr/local") + vf = self.networks[vfs[0]] + port_num = vf.get('port_num', 0) + vpci = PciAddress.parse_address(vf['vpci'].strip(), multi_line=True) + # Generate the vpci for the interfaces + slot = index + port_num + 10 + vf['vpci'] = \ + "{}:{}:{:02x}.{}".format(vpci.domain, vpci.bus, slot, vpci.function) + Libvirt.add_ovs_interface(vpath, port_num, vf['vpci'], vf['mac'], str(cfg)) + + def setup_ovs_dpdk_context(self): + nodes = [] + + self.configure_nics_for_ovs_dpdk() + + for index, (key, vnf) in enumerate(OrderedDict(self.servers).items()): + cfg = '/tmp/vm_ovs_%d.xml' % index + vm_name = "vm_%d" % index + + # 1. Check and delete VM if already exists + Libvirt.check_if_vm_exists_and_delete(vm_name, self.connection) + + vcpu, mac = Libvirt.build_vm_xml(self.connection, self.vm_flavor, cfg, vm_name, index) + # 2: Cleanup already available VMs + for idx, (vkey, vfs) in enumerate(OrderedDict(vnf["network_ports"]).items()): + if vkey == "mgmt": + continue + self._enable_interfaces(index, vfs, cfg) + + # copy xml to target... + self.connection.put(cfg, cfg) + + # FIXME: launch through libvirt + LOG.info("virsh create ...") + Libvirt.virsh_create_vm(self.connection, cfg) + + # 5: Tunning for better performace + Libvirt.pin_vcpu_for_perf(self.connection, vm_name, vcpu) + self.vm_names.append(vm_name) + + # build vnf node details + nodes.append(self.vnf_node.generate_vnf_instance(self.vm_flavor, + self.networks, + self.host_mgmt.get('ip'), + key, vnf, mac)) + + return nodes diff --git a/yardstick/benchmark/contexts/standalone/ovsdpdk.py b/yardstick/benchmark/contexts/standalone/ovsdpdk.py deleted file mode 100644 index cf5529d89..000000000 --- a/yardstick/benchmark/contexts/standalone/ovsdpdk.py +++ /dev/null @@ -1,369 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import absolute_import -import os -import yaml -import time -import glob -import itertools -import logging -from yardstick import ssh -from yardstick.benchmark.contexts.standalone import StandaloneContext - -BIN_PATH = "/opt/isb_bin/" -DPDK_NIC_BIND = "dpdk_nic_bind.py" - -log = logging.getLogger(__name__) - -VM_TEMPLATE = """ -<domain type='kvm'> - <name>vm1</name> - <uuid>18230c0c-635d-4c50-b2dc-a213d30acb34</uuid> - <memory unit='KiB'>20971520</memory> - <currentMemory unit="KiB">20971520</currentMemory> - <memoryBacking> - <hugepages/> - </memoryBacking> - <vcpu placement='static'>20</vcpu> - <os> - <type arch='x86_64' machine='pc'>hvm</type> - <boot dev='hd'/> - </os> - <features> - <acpi/> - <apic/> - </features> - <cpu match="exact" mode='host-model'> - <model fallback='allow'/> - <topology sockets='1' cores='10' threads='2'/> - </cpu> - <on_poweroff>destroy</on_poweroff> - <on_reboot>restart</on_reboot> - <on_crash>destroy</on_crash> - <devices> - <emulator>/usr/bin/qemu-system-x86_64</emulator> - <disk type='file' device='disk'> - <driver name='qemu' type='qcow2' cache='none'/> - <source file="{vm_image}"/> - <target dev='vda' bus='virtio'/> - <address bus="0x00" domain="0x0000" - function="0x0" slot="0x04" type="pci" /> - </disk> - <!--disk type='dir' device='disk'> - <driver name='qemu' type='fat'/> - <source dir='/opt/isb_bin/dpdk'/> - <target dev='vdb' bus='virtio'/> - <readonly/> - </disk--> - <interface type="bridge"> - <mac address="00:00:00:ab:cd:ef" /> - <source bridge="br-int" /> - </interface> - <interface type='vhostuser'> - <mac address='00:00:00:00:00:01'/> - <source type='unix' path='/usr/local/var/run/openvswitch/dpdkvhostuser0' mode='client'/> - <model type='virtio'/> - <driver queues='4'> - <host mrg_rxbuf='off'/> - </driver> - </interface> - <interface type='vhostuser'> - <mac address='00:00:00:00:00:02'/> - <source type='unix' path='/usr/local/var/run/openvswitch/dpdkvhostuser1' mode='client'/> - <model type='virtio'/> - <driver queues='4'> - <host mrg_rxbuf='off'/> - </driver> - </interface> - <serial type='pty'> - <target port='0'/> - </serial> - <console type='pty'> - <target type='serial' port='0'/> - </console> - <graphics autoport="yes" listen="0.0.0.0" port="1" type="vnc" /> - </devices> -</domain> -""" - - -class Ovsdpdk(StandaloneContext): - def __init__(self): - self.name = None - self.file_path = None - self.nodes = [] - self.vm_deploy = False - self.ovs = [] - self.first_run = True - self.dpdk_nic_bind = BIN_PATH + DPDK_NIC_BIND - self.user = "" - self.ssh_ip = "" - self.passwd = "" - self.ssh_port = "" - self.auth_type = "" - - def init(self): - '''initializes itself''' - log.debug("In init") - self.parse_pod_and_get_data() - - def parse_pod_and_get_data(self, file_path): - self.file_path = file_path - print("parsing pod file: {0}".format(self.file_path)) - try: - with open(self.file_path) as stream: - cfg = yaml.load(stream) - except IOError: - print("File {0} does not exist".format(self.file_path)) - raise - - self.ovs.extend([node for node in cfg["nodes"] - if node["role"] == "Ovsdpdk"]) - self.user = self.ovs[0]['user'] - self.ssh_ip = self.ovs[0]['ip'] - if self.ovs[0]['auth_type'] == "password": - self.passwd = self.ovs[0]['password'] - else: - self.ssh_port = self.ovs[0]['ssh_port'] - self.key_filename = self.ovs[0]['key_filename'] - - def ssh_remote_machine(self): - if self.ovs[0]['auth_type'] == "password": - self.connection = ssh.SSH( - self.user, - self.ssh_ip, - password=self.passwd) - self.connection.wait() - else: - if self.ssh_port is not None: - ssh_port = self.ssh_port - else: - ssh_port = ssh.DEFAULT_PORT - self.connection = ssh.SSH( - self.user, - self.ssh_ip, - port=ssh_port, - key_filename=self.key_filename) - self.connection.wait() - - def get_nic_details(self): - nic_details = {} - nic_details['interface'] = {} - nic_details['pci'] = self.ovs[0]['phy_ports'] - nic_details['phy_driver'] = self.ovs[0]['phy_driver'] - nic_details['vports_mac'] = self.ovs[0]['vports_mac'] - # Make sure that ports are bound to kernel drivers e.g. i40e/ixgbe - for i, _ in enumerate(nic_details['pci']): - err, out, _ = self.connection.execute( - "{dpdk_nic_bind} --force -b {driver} {port}".format( - dpdk_nic_bind=self.dpdk_nic_bind, - driver=self.ovs[0]['phy_driver'], - port=self.ovs[0]['phy_ports'][i])) - err, out, _ = self.connection.execute( - "lshw -c network -businfo | grep '{port}'".format( - port=self.ovs[0]['phy_ports'][i])) - a = out.split()[1] - err, out, _ = self.connection.execute( - "ip -s link show {interface}".format( - interface=out.split()[1])) - nic_details['interface'][i] = str(a) - print("{0}".format(nic_details)) - return nic_details - - def install_req_libs(self): - if self.first_run: - err, out, _ = self.connection.execute("apt-get update") - print("{0}".format(out)) - err, out, _ = self.connection.execute( - "apt-get -y install qemu-kvm libvirt-bin") - print("{0}".format(out)) - err, out, _ = self.connection.execute( - "apt-get -y install libvirt-dev bridge-utils numactl") - print("{0}".format(out)) - self.first_run = False - - def setup_ovs(self, vpcis): - self.connection.execute("/usr/bin/chmod 0666 /dev/vfio/*") - self.connection.execute("/usr/bin/chmod a+x /dev/vfio") - self.connection.execute("pkill -9 ovs") - self.connection.execute("ps -ef | grep ovs | grep -v grep | " - "awk '{print $2}' | xargs -r kill -9") - self.connection.execute("killall -r 'ovs*'") - self.connection.execute( - "mkdir -p {0}/etc/openvswitch".format(self.ovs[0]["vpath"])) - self.connection.execute( - "mkdir -p {0}/var/run/openvswitch".format(self.ovs[0]["vpath"])) - self.connection.execute( - "rm {0}/etc/openvswitch/conf.db".format(self.ovs[0]["vpath"])) - self.connection.execute( - "ovsdb-tool create {0}/etc/openvswitch/conf.db " - "{0}/share/openvswitch/" - "vswitch.ovsschema".format(self.ovs[0]["vpath"])) - self.connection.execute("modprobe vfio-pci") - self.connection.execute("chmod a+x /dev/vfio") - self.connection.execute("chmod 0666 /dev/vfio/*") - for vpci in vpcis: - self.connection.execute( - "/opt/isb_bin/dpdk_nic_bind.py " - "--bind=vfio-pci {0}".format(vpci)) - - def start_ovs_serverswitch(self): - self.connection.execute("mkdir -p /usr/local/var/run/openvswitch") - self.connection.execute( - "ovsdb-server --remote=punix:" - "/usr/local/var/run/openvswitch/db.sock --pidfile --detach") - self.connection.execute( - "ovs-vsctl --no-wait set " - "Open_vSwitch . other_config:dpdk-init=true") - self.connection.execute( - "ovs-vsctl --no-wait set " - "Open_vSwitch . other_config:dpdk-lcore-mask=0x3") - self.connection.execute( - "ovs-vsctl --no-wait set " - "Open_vSwitch . other_config:dpdk-socket-mem='2048,0'") - self.connection.execute( - "ovs-vswitchd unix:{0}/" - "var/run/openvswitch/db.sock --pidfile --detach " - "--log-file=/var/log/openvswitch/" - "ovs-vswitchd.log".format( - self.ovs[0]["vpath"])) - self.connection.execute( - "ovs-vsctl set Open_vSwitch . other_config:pmd-cpu-mask=2C") - - def setup_ovs_bridge(self): - self.connection.execute("ovs-vsctl del-br br0") - self.connection.execute( - "rm -rf /usr/local/var/run/openvswitch/dpdkvhostuser*") - self.connection.execute( - "ovs-vsctl add-br br0 -- set bridge br0 datapath_type=netdev") - self.connection.execute( - "ovs-vsctl add-port br0 dpdk0 -- set Interface dpdk0 type=dpdk") - self.connection.execute( - "ovs-vsctl add-port br0 dpdk1 -- set Interface dpdk1 type=dpdk") - self.connection.execute( - "ovs-vsctl add-port br0 dpdkvhostuser0 -- set Interface " - "dpdkvhostuser0 type=dpdkvhostuser") - self.connection.execute("ovs-vsctl add-port br0 dpdkvhostuser1 " - "-- set Interface dpdkvhostuser1 " - "type=dpdkvhostuser") - self.connection.execute( - "chmod 0777 {0}/var/run/" - "openvswitch/dpdkvhostuser*".format(self.ovs[0]["vpath"])) - - def add_oflows(self): - self.connection.execute("ovs-ofctl del-flows br0") - for flow in self.ovs[0]["flow"]: - self.connection.execute(flow) - self.connection.execute("ovs-ofctl dump-flows br0") - self.connection.execute( - "ovs-vsctl set Interface dpdk0 options:n_rxq=4") - self.connection.execute( - "ovs-vsctl set Interface dpdk1 options:n_rxq=4") - - def setup_ovs_context(self, pcis, nic_details, host_driver): - - ''' 1: Setup vm_ovs.xml to launch VM.''' - cfg_ovs = '/tmp/vm_ovs.xml' - vm_ovs_xml = VM_TEMPLATE.format(vm_image=self.ovs[0]["images"]) - with open(cfg_ovs, 'w') as f: - f.write(vm_ovs_xml) - - ''' 2: Create and start the VM''' - self.connection.put(cfg_ovs, cfg_ovs) - time.sleep(10) - err, out = self.check_output("virsh list --name | grep -i vm1") - if out == "vm1": - print("VM is already present") - else: - ''' FIXME: launch through libvirt''' - print("virsh create ...") - err, out, _ = self.connection.execute( - "virsh create /tmp/vm_ovs.xml") - time.sleep(10) - print("err : {0}".format(err)) - print("{0}".format(_)) - print("out : {0}".format(out)) - - ''' 3: Tuning for better performace.''' - self.pin_vcpu(pcis) - self.connection.execute( - "echo 1 > /sys/module/kvm/parameters/" - "allow_unsafe_assigned_interrupts") - self.connection.execute( - "echo never > /sys/kernel/mm/transparent_hugepage/enabled") - print("After tuning performance ...") - - ''' This is roughly compatible with check_output function in subprocess - module which is only available in python 2.7.''' - def check_output(self, cmd, stderr=None): - '''Run a command and capture its output''' - err, out, _ = self.connection.execute(cmd) - return err, out - - def read_from_file(self, filename): - data = "" - with open(filename, 'r') as the_file: - data = the_file.read() - return data - - def write_to_file(self, filename, content): - with open(filename, 'w') as the_file: - the_file.write(content) - - def pin_vcpu(self, pcis): - nodes = self.get_numa_nodes() - print("{0}".format(nodes)) - num_nodes = len(nodes) - for i in range(0, 10): - self.connection.execute( - "virsh vcpupin vm1 {0} {1}".format( - i, nodes[str(num_nodes - 1)][i % len(nodes[str(num_nodes - 1)])])) - - def get_numa_nodes(self): - nodes_sysfs = glob.iglob("/sys/devices/system/node/node*") - nodes = {} - for node_sysfs in nodes_sysfs: - num = os.path.basename(node_sysfs).replace("node", "") - with open(os.path.join(node_sysfs, "cpulist")) as cpulist_file: - cpulist = cpulist_file.read().strip() - print("cpulist: {0}".format(cpulist)) - nodes[num] = self.split_cpu_list(cpulist) - print("nodes: {0}".format(nodes)) - return nodes - - def split_cpu_list(self, cpu_list): - if cpu_list: - ranges = cpu_list.split(',') - bounds = ([int(b) for b in r.split('-')] for r in ranges) - range_objects =\ - (range(bound[0], bound[1] + 1 if len(bound) == 2 - else bound[0] + 1) for bound in bounds) - - return sorted(itertools.chain.from_iterable(range_objects)) - else: - return [] - - def destroy_vm(self): - host_driver = self.ovs[0]['phy_driver'] - err, out = self.check_output("virsh list --name | grep -i vm1") - print("{0}".format(out)) - if err == 0: - self.connection.execute("virsh shutdown vm1") - self.connection.execute("virsh destroy vm1") - self.check_output("rmmod {0}".format(host_driver))[1].splitlines() - self.check_output("modprobe {0}".format(host_driver))[ - 1].splitlines() - else: - print("error : ", err) diff --git a/yardstick/benchmark/contexts/standalone/sriov.py b/yardstick/benchmark/contexts/standalone/sriov.py index fe27d2579..55d7057a9 100644 --- a/yardstick/benchmark/contexts/standalone/sriov.py +++ b/yardstick/benchmark/contexts/standalone/sriov.py @@ -14,418 +14,248 @@ from __future__ import absolute_import import os -import yaml -import re -import time -import glob -import uuid -import random import logging -import itertools -import xml.etree.ElementTree as ET +import collections +from collections import OrderedDict + from yardstick import ssh from yardstick.network_services.utils import get_nsb_option from yardstick.network_services.utils import provision_tool -from yardstick.benchmark.contexts.standalone import StandaloneContext - -log = logging.getLogger(__name__) - -VM_TEMPLATE = """ -<domain type="kvm"> - <name>vm1</name> - <uuid>{random_uuid}</uuid> - <memory unit="KiB">102400</memory> - <currentMemory unit="KiB">102400</currentMemory> - <memoryBacking> - <hugepages /> - </memoryBacking> - <vcpu placement="static">20</vcpu> - <os> - <type arch="x86_64" machine="pc-i440fx-utopic">hvm</type> - <boot dev="hd" /> - </os> - <features> - <acpi /> - <apic /> - <pae /> - </features> - <cpu match="exact" mode="custom"> - <model fallback="allow">SandyBridge</model> - <topology cores="10" sockets="1" threads="2" /> - </cpu> - <clock offset="utc"> - <timer name="rtc" tickpolicy="catchup" /> - <timer name="pit" tickpolicy="delay" /> - <timer name="hpet" present="no" /> - </clock> - <on_poweroff>destroy</on_poweroff> - <on_reboot>restart</on_reboot> - <on_crash>restart</on_crash> - <devices> - <emulator>/usr/bin/kvm-spice</emulator> - <disk device="disk" type="file"> - <driver name="qemu" type="qcow2" /> - <source file="{vm_image}"/> - <target bus="virtio" dev="vda" /> - <address bus="0x00" domain="0x0000" -function="0x0" slot="0x04" type="pci" /> - </disk> - <controller index="0" model="ich9-ehci1" type="usb"> - <address bus="0x00" domain="0x0000" -function="0x7" slot="0x05" type="pci" /> - </controller> - <controller index="0" model="ich9-uhci1" type="usb"> - <master startport="0" /> - <address bus="0x00" domain="0x0000" function="0x0" -multifunction="on" slot="0x05" type="pci" /> - </controller> - <controller index="0" model="ich9-uhci2" type="usb"> - <master startport="2" /> - <address bus="0x00" domain="0x0000" -function="0x1" slot="0x05" type="pci" /> - </controller> - <controller index="0" model="ich9-uhci3" type="usb"> - <master startport="4" /> - <address bus="0x00" domain="0x0000" -function="0x2" slot="0x05" type="pci" /> - </controller> - <controller index="0" model="pci-root" type="pci" /> - <serial type="pty"> - <target port="0" /> - </serial> - <console type="pty"> - <target port="0" type="serial" /> - </console> - <input bus="usb" type="tablet" /> - <input bus="ps2" type="mouse" /> - <input bus="ps2" type="keyboard" /> - <graphics autoport="yes" listen="0.0.0.0" port="-1" type="vnc" /> - <video> - <model heads="1" type="cirrus" vram="16384" /> - <address bus="0x00" domain="0x0000" -function="0x0" slot="0x02" type="pci" /> - </video> - <memballoon model="virtio"> - <address bus="0x00" domain="0x0000" -function="0x0" slot="0x06" type="pci" /> - </memballoon> - <interface type="bridge"> - <mac address="{mac_addr}" /> - <source bridge="virbr0" /> - </interface> - </devices> -</domain> -""" - - -class Sriov(StandaloneContext): +from yardstick.benchmark.contexts.base import Context +from yardstick.benchmark.contexts.standalone.model import Libvirt +from yardstick.benchmark.contexts.standalone.model import StandaloneContextHelper +from yardstick.benchmark.contexts.standalone.model import Server +from yardstick.network_services.utils import PciAddress + +LOG = logging.getLogger(__name__) + + +class SriovContext(Context): + """ This class handles SRIOV standalone nodes - VM running on Non-Managed NFVi + Configuration: sr-iov + """ + + __context_type__ = "StandaloneSriov" + def __init__(self): - self.name = None self.file_path = None - self.nodes = [] - self.vm_deploy = False self.sriov = [] self.first_run = True self.dpdk_nic_bind = "" - self.user = "" - self.ssh_ip = "" - self.passwd = "" - self.ssh_port = "" - self.auth_type = "" - - def init(self): - log.debug("In init") - self.parse_pod_and_get_data(self.file_path) - - def parse_pod_and_get_data(self, file_path): - self.file_path = file_path - log.debug("parsing pod file: {0}".format(self.file_path)) - try: - with open(self.file_path) as stream: - cfg = yaml.load(stream) - except IOError: - log.error("File {0} does not exist".format(self.file_path)) - raise - - self.sriov.extend([node for node in cfg["nodes"] - if node["role"] == "Sriov"]) - self.user = self.sriov[0]['user'] - self.ssh_ip = self.sriov[0]['ip'] - if self.sriov[0]['auth_type'] == "password": - self.passwd = self.sriov[0]['password'] - else: - self.ssh_port = self.sriov[0]['ssh_port'] - self.key_filename = self.sriov[0]['key_filename'] - - def ssh_remote_machine(self): - if self.sriov[0]['auth_type'] == "password": - self.connection = ssh.SSH( - self.user, - self.ssh_ip, - password=self.passwd) - self.connection.wait() - else: - if self.ssh_port is not None: - ssh_port = self.ssh_port - else: - ssh_port = ssh.DEFAULT_PORT - self.connection = ssh.SSH( - self.user, - self.ssh_ip, - port=ssh_port, - key_filename=self.key_filename) - self.connection.wait() + self.vm_names = [] + self.name = None + self.nfvi_host = [] + self.nodes = [] + self.networks = {} + self.attrs = {} + self.vm_flavor = None + self.servers = None + self.helper = StandaloneContextHelper() + self.vnf_node = Server() + self.drivers = [] + super(SriovContext, self).__init__() + + def init(self, attrs): + """initializes itself from the supplied arguments""" + + self.name = attrs["name"] + self.file_path = attrs.get("file", "pod.yaml") + + self.nodes, self.nfvi_host, self.host_mgmt = \ + self.helper.parse_pod_file(self.file_path, 'Sriov') + + self.attrs = attrs + self.vm_flavor = attrs.get('flavor', {}) + self.servers = attrs.get('servers', {}) + self.vm_deploy = attrs.get("vm_deploy", True) + # add optional static network definition + self.networks = attrs.get("networks", {}) + + LOG.debug("Nodes: %r", self.nodes) + LOG.debug("NFVi Node: %r", self.nfvi_host) + LOG.debug("Networks: %r", self.networks) + + def deploy(self): + """don't need to deploy""" + + # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. + if not self.vm_deploy: + return + + self.connection = ssh.SSH.from_node(self.host_mgmt) self.dpdk_nic_bind = provision_tool( self.connection, os.path.join(get_nsb_option("bin_path"), "dpdk_nic_bind.py")) - def get_nic_details(self): - nic_details = {} - nic_details = { - 'interface': {}, - 'pci': self.sriov[0]['phy_ports'], - 'phy_driver': self.sriov[0]['phy_driver'], - 'vf_macs': self.sriov[0]['vf_macs'] - } - # Make sure that ports are bound to kernel drivers e.g. i40e/ixgbe - for i, _ in enumerate(nic_details['pci']): - err, out, _ = self.connection.execute( - "{dpdk_nic_bind} --force -b {driver} {port}".format( - dpdk_nic_bind=self.dpdk_nic_bind, - driver=self.sriov[0]['phy_driver'], - port=self.sriov[0]['phy_ports'][i])) - err, out, _ = self.connection.execute( - "lshw -c network -businfo | grep '{port}'".format( - port=self.sriov[0]['phy_ports'][i])) - a = out.split()[1] - err, out, _ = self.connection.execute( - "ip -s link show {interface}".format( - interface=out.split()[1])) - nic_details['interface'][i] = str(a) - log.info("{0}".format(nic_details)) - return nic_details - - def install_req_libs(self): - if self.first_run: - log.info("Installing required libraries...") - err, out, _ = self.connection.execute("apt-get update") - log.debug("{0}".format(out)) - err, out, _ = self.connection.execute( - "apt-get -y install qemu-kvm libvirt-bin") - log.debug("{0}".format(out)) - err, out, _ = self.connection.execute( - "apt-get -y install libvirt-dev bridge-utils numactl") - log.debug("{0}".format(out)) - self.first_run = False - - def configure_nics_for_sriov(self, host_driver, nic_details): - vf_pci = [[], []] - self.connection.execute( - "rmmod {0}".format(host_driver))[1].splitlines() - self.connection.execute( - "modprobe {0} num_vfs=1".format(host_driver))[1].splitlines() - nic_details['vf_pci'] = {} - for i in range(len(nic_details['pci'])): - self.connection.execute( - "echo 1 > /sys/bus/pci/devices/{0}/sriov_numvfs".format( - nic_details['pci'][i])) - err, out, _ = self.connection.execute( - "ip link set {interface} vf 0 mac {mac}".format( - interface=nic_details['interface'][i], - mac=nic_details['vf_macs'][i])) - time.sleep(3) - vf_pci[i] = self.get_vf_datas( - 'vf_pci', - nic_details['pci'][i], - nic_details['vf_macs'][i]) - nic_details['vf_pci'][i] = vf_pci[i] - log.debug("NIC DETAILS : {0}".format(nic_details)) - return nic_details - - def setup_sriov_context(self, pcis, nic_details, host_driver): - blacklist = "/etc/modprobe.d/blacklist.conf" - - # 1 : Blacklist the vf driver in /etc/modprobe.d/blacklist.conf - vfnic = "{0}vf".format(host_driver) - lines = self.read_from_file(blacklist) - if vfnic not in lines: - vfblacklist = "blacklist {vfnic}".format(vfnic=vfnic) - self.connection.execute( - "echo {vfblacklist} >> {blacklist}".format( - vfblacklist=vfblacklist, - blacklist=blacklist)) - - # 2 : modprobe host_driver with num_vfs - nic_details = self.configure_nics_for_sriov(host_driver, nic_details) - - # 3: Setup vm_sriov.xml to launch VM - cfg_sriov = '/tmp/vm_sriov.xml' - mac = [0x00, 0x24, 0x81, - random.randint(0x00, 0x7f), - random.randint(0x00, 0xff), - random.randint(0x00, 0xff)] - mac_address = ':'.join(map(lambda x: "%02x" % x, mac)) - vm_sriov_xml = VM_TEMPLATE.format( - random_uuid=uuid.uuid4(), - mac_addr=mac_address, - vm_image=self.sriov[0]["images"]) - with open(cfg_sriov, 'w') as f: - f.write(vm_sriov_xml) - - vf = nic_details['vf_pci'] - for index in range(len(nic_details['vf_pci'])): - self.add_sriov_interface( - index, - vf[index]['vf_pci'], - mac_address, - "/tmp/vm_sriov.xml") - self.connection.execute( - "ifconfig {interface} up".format( - interface=nic_details['interface'][index])) - - # 4: Create and start the VM - self.connection.put(cfg_sriov, cfg_sriov) - time.sleep(10) - err, out = self.check_output("virsh list --name | grep -i vm1") + # Todo: NFVi deploy (sriov, vswitch, ovs etc) based on the config. + StandaloneContextHelper.install_req_libs(self.connection) + self.networks = StandaloneContextHelper.get_nic_details(self.connection, + self.networks, + self.dpdk_nic_bind) + self.nodes = self.setup_sriov_context() + + LOG.debug("Waiting for VM to come up...") + self.nodes = StandaloneContextHelper.wait_for_vnfs_to_start(self.connection, + self.servers, + self.nodes) + + def undeploy(self): + """don't need to undeploy""" + + if not self.vm_deploy: + return + + # Todo: NFVi undeploy (sriov, vswitch, ovs etc) based on the config. + for vm in self.vm_names: + Libvirt.check_if_vm_exists_and_delete(vm, self.connection) + + # Bind nics back to kernel + for key, ports in self.networks.items(): + # enable VFs for given... + build_vfs = "echo 0 > /sys/bus/pci/devices/{0}/sriov_numvfs" + self.connection.execute(build_vfs.format(ports.get('phy_port'))) + + def _get_server(self, attr_name): + """lookup server info by name from context + + Keyword arguments: + attr_name -- A name for a server listed in nodes config file + """ + node_name, name = self.split_name(attr_name) + if name is None or self.name != name: + return None + + matching_nodes = (n for n in self.nodes if n["name"] == node_name) try: - if out == "vm1": - log.info("VM is already present") - else: - # FIXME: launch through libvirt - log.info("virsh create ...") - err, out, _ = self.connection.execute( - "virsh create /tmp/vm_sriov.xml") - time.sleep(10) - log.error("err : {0}".format(err)) - log.error("{0}".format(_)) - log.debug("out : {0}".format(out)) - except ValueError: - raise - - # 5: Tunning for better performace - self.pin_vcpu(pcis) - self.connection.execute( - "echo 1 > /sys/module/kvm/parameters/" - "allow_unsafe_assigned_interrupts") - self.connection.execute( - "echo never > /sys/kernel/mm/transparent_hugepage/enabled") - - def add_sriov_interface(self, index, vf_pci, vfmac, xml): - root = ET.parse(xml) - pattern = "0000:(\d+):(\d+).(\d+)" - m = re.search(pattern, vf_pci, re.MULTILINE) - device = root.find('devices') - - interface = ET.SubElement(device, 'interface') - interface.set('managed', 'yes') - interface.set('type', 'hostdev') - - mac = ET.SubElement(interface, 'mac') - mac.set('address', vfmac) - source = ET.SubElement(interface, 'source') - - addr = ET.SubElement(source, "address") - addr.set('domain', "0x0") - addr.set('bus', "{0}".format(m.group(1))) - addr.set('function', "{0}".format(m.group(3))) - addr.set('slot', "{0}".format(m.group(2))) - addr.set('type', "pci") - - vf_pci = ET.SubElement(interface, 'address') - vf_pci.set('type', 'pci') - vf_pci.set('domain', '0x0000') - vf_pci.set('bus', '0x00') - vf_pci.set('slot', '0x0{0}'.format(index + 7)) - vf_pci.set('function', '0x00') - - root.write(xml) - - # This is roughly compatible with check_output function in subprocess - # module which is only available in python 2.7 - def check_output(self, cmd, stderr=None): - # Run a command and capture its output - err, out, _ = self.connection.execute(cmd) - return err, out - - def get_virtual_devices(self, pci): - pf_vfs = {} - err, extra_info = self.check_output( - "cat /sys/bus/pci/devices/{0}/virtfn0/uevent".format(pci)) - pattern = "PCI_SLOT_NAME=(?P<name>[0-9:.\s.]+)" - m = re.search(pattern, extra_info, re.MULTILINE) - - if m: - pf_vfs.update({pci: str(m.group(1).rstrip())}) - log.info("pf_vfs : {0}".format(pf_vfs)) - return pf_vfs - - def get_vf_datas(self, key, value, vfmac): - vfret = {} - pattern = "0000:(\d+):(\d+).(\d+)" - - vfret["mac"] = vfmac - vfs = self.get_virtual_devices(value) - log.info("vfs: {0}".format(vfs)) - for k, v in vfs.items(): - m = re.search(pattern, k, re.MULTILINE) - m1 = re.search(pattern, value, re.MULTILINE) - if m.group(1) == m1.group(1): - vfret["vf_pci"] = str(v) - break + # A clone is created in order to avoid affecting the + # original one. + node = dict(next(matching_nodes)) + except StopIteration: + return None - return vfret - - def read_from_file(self, filename): - data = "" - with open(filename, 'r') as the_file: - data = the_file.read() - return data - - def write_to_file(self, filename, content): - with open(filename, 'w') as the_file: - the_file.write(content) - - def pin_vcpu(self, pcis): - nodes = self.get_numa_nodes() - log.info("{0}".format(nodes)) - num_nodes = len(nodes) - for i in range(0, 10): - self.connection.execute( - "virsh vcpupin vm1 {0} {1}".format( - i, nodes[str(num_nodes - 1)][i % len(nodes[str(num_nodes - 1)])])) - - def get_numa_nodes(self): - nodes_sysfs = glob.iglob("/sys/devices/system/node/node*") - nodes = {} - for node_sysfs in nodes_sysfs: - num = os.path.basename(node_sysfs).replace("node", "") - with open(os.path.join(node_sysfs, "cpulist")) as cpulist_file: - cpulist = cpulist_file.read().strip() - nodes[num] = self.split_cpu_list(cpulist) - log.info("nodes: {0}".format(nodes)) - return nodes + try: + duplicate = next(matching_nodes) + except StopIteration: + pass + else: + raise ValueError("Duplicate nodes!!! Nodes: %s %s", + (node, duplicate)) - def split_cpu_list(self, cpu_list): - if cpu_list: - ranges = cpu_list.split(',') - bounds = ([int(b) for b in r.split('-')] for r in ranges) - range_objects =\ - (range(bound[0], bound[1] + 1 if len(bound) == 2 - else bound[0] + 1) for bound in bounds) + node["name"] = attr_name + return node + + def _get_network(self, attr_name): + if not isinstance(attr_name, collections.Mapping): + network = self.networks.get(attr_name) - return sorted(itertools.chain.from_iterable(range_objects)) - else: - return [] - - def destroy_vm(self): - host_driver = self.sriov[0]["phy_driver"] - err, out = self.check_output("virsh list --name | grep -i vm1") - log.info("{0}".format(out)) - if err == 0: - self.connection.execute("virsh shutdown vm1") - self.connection.execute("virsh destroy vm1") - self.check_output("rmmod {0}".format(host_driver))[1].splitlines() - self.check_output("modprobe {0}".format(host_driver))[ - 1].splitlines() else: - log.error("error : {0}".format(err)) + # Don't generalize too much Just support vld_id + vld_id = attr_name.get('vld_id', {}) + # for standalone context networks are dicts + iter1 = (n for n in self.networks.values() if n.get('vld_id') == vld_id) + network = next(iter1, None) + + if network is None: + return None + + result = { + # name is required + "name": network["name"], + "vld_id": network.get("vld_id"), + "segmentation_id": network.get("segmentation_id"), + "network_type": network.get("network_type"), + "physical_network": network.get("physical_network"), + } + return result + + def configure_nics_for_sriov(self): + vf_cmd = "ip link set {0} vf 0 mac {1}" + for key, ports in self.networks.items(): + vf_pci = [] + host_driver = ports.get('driver') + if host_driver not in self.drivers: + self.connection.execute("rmmod %svf" % host_driver) + self.drivers.append(host_driver) + + # enable VFs for given... + build_vfs = "echo 1 > /sys/bus/pci/devices/{0}/sriov_numvfs" + self.connection.execute(build_vfs.format(ports.get('phy_port'))) + + # configure VFs... + mac = StandaloneContextHelper.get_mac_address() + interface = ports.get('interface') + if interface is not None: + self.connection.execute(vf_cmd.format(interface, mac)) + + vf_pci = self.get_vf_data('vf_pci', ports.get('phy_port'), mac, interface) + ports.update({ + 'vf_pci': vf_pci, + 'mac': mac + }) + + LOG.info("Ports %s" % self.networks) + + def _enable_interfaces(self, index, idx, vfs, cfg): + vf = self.networks[vfs[0]] + vpci = PciAddress.parse_address(vf['vpci'].strip(), multi_line=True) + # Generate the vpci for the interfaces + slot = index + idx + 10 + vf['vpci'] = \ + "{}:{}:{:02x}.{}".format(vpci.domain, vpci.bus, slot, vpci.function) + Libvirt.add_sriov_interfaces( + vf['vpci'], vf['vf_pci']['vf_pci'], vf['mac'], str(cfg)) + self.connection.execute("ifconfig %s up" % vf['interface']) + + def setup_sriov_context(self): + nodes = [] + + # 1 : modprobe host_driver with num_vfs + self.configure_nics_for_sriov() + + for index, (key, vnf) in enumerate(OrderedDict(self.servers).items()): + cfg = '/tmp/vm_sriov_%s.xml' % str(index) + vm_name = "vm_%s" % str(index) + + # 1. Check and delete VM if already exists + Libvirt.check_if_vm_exists_and_delete(vm_name, self.connection) + + vcpu, mac = Libvirt.build_vm_xml(self.connection, self.vm_flavor, cfg, vm_name, index) + # 2: Cleanup already available VMs + for idx, (vkey, vfs) in enumerate(OrderedDict(vnf["network_ports"]).items()): + if vkey == "mgmt": + continue + self._enable_interfaces(index, idx, vfs, cfg) + + # copy xml to target... + self.connection.put(cfg, cfg) + + # FIXME: launch through libvirt + LOG.info("virsh create ...") + Libvirt.virsh_create_vm(self.connection, cfg) + + # 5: Tunning for better performace + Libvirt.pin_vcpu_for_perf(self.connection, vm_name, vcpu) + self.vm_names.append(vm_name) + + # build vnf node details + nodes.append(self.vnf_node.generate_vnf_instance(self.vm_flavor, + self.networks, + self.host_mgmt.get('ip'), + key, vnf, mac)) + + return nodes + + def get_vf_data(self, key, value, vfmac, pfif): + vf_data = { + "mac": vfmac, + "pf_if": pfif + } + vfs = StandaloneContextHelper.get_virtual_devices(self.connection, value) + for k, v in vfs.items(): + m = PciAddress.parse_address(k.strip(), multi_line=True) + m1 = PciAddress.parse_address(value.strip(), multi_line=True) + if m.bus == m1.bus: + vf_data.update({"vf_pci": str(v)}) + break + + return vf_data diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index 75703cf50..53298d8d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -125,9 +125,10 @@ class Task(object): # pragma: no cover except KeyboardInterrupt: raise except Exception: - LOG.exception("Running test case %s failed!", case_name) + LOG.error('Testcase: "%s" FAILED!!!', case_name, exc_info=True) testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} else: + LOG.info('Testcase: "%s" SUCCESS!!!', case_name) testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} if args.keep_deploy: @@ -259,23 +260,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: - LOG.error('Scenario: %s ERROR', scenario.get('type')) - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + LOG.error('Scenario NO.%s: "%s" ERROR!', + scenarios.index(scenario) + 1, + scenario.get('type')) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) # Abort background runners @@ -284,15 +285,13 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - status = runner.join(JOIN_TIMEOUT) + status = runner.join(self.outputs, result, JOIN_TIMEOUT) if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner.join(JOIN_TIMEOUT) + runner.join(self.outputs, result, JOIN_TIMEOUT) base_runner.Runner.release(runner) - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) print("Background task ended") return result @@ -325,23 +324,30 @@ class Task(object): # pragma: no cover # TODO support get multi hosts/vms info context_cfg = {} - if "host" in scenario_cfg: - context_cfg['host'] = Context.get_server(scenario_cfg["host"]) + server_name = scenario_cfg.get('options', {}).get('server_name', {}) - if "target" in scenario_cfg: - if is_ip_addr(scenario_cfg["target"]): - context_cfg['target'] = {} - context_cfg['target']["ipaddr"] = scenario_cfg["target"] + def config_context_target(cfg): + target = cfg['target'] + if is_ip_addr(target): + context_cfg['target'] = {"ipaddr": target} else: - context_cfg['target'] = Context.get_server( - scenario_cfg["target"]) - if self._is_same_heat_context(scenario_cfg["host"], - scenario_cfg["target"]): - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["private_ip"] + context_cfg['target'] = Context.get_server(target) + if self._is_same_context(cfg["host"], target): + context_cfg['target']["ipaddr"] = context_cfg['target']["private_ip"] else: - context_cfg["target"]["ipaddr"] = \ - context_cfg["target"]["ip"] + context_cfg['target']["ipaddr"] = context_cfg['target']["ip"] + + host_name = server_name.get('host', scenario_cfg.get('host')) + if host_name: + context_cfg['host'] = Context.get_server(host_name) + + for item in [server_name, scenario_cfg]: + try: + config_context_target(item) + except KeyError: + pass + else: + break if "targets" in scenario_cfg: ip_list = [] @@ -351,8 +357,8 @@ class Task(object): # pragma: no cover context_cfg['target'] = {} else: context_cfg['target'] = Context.get_server(target) - if self._is_same_heat_context(scenario_cfg["host"], - target): + if self._is_same_context(scenario_cfg["host"], + target): ip_list.append(context_cfg["target"]["private_ip"]) else: ip_list.append(context_cfg["target"]["ip"]) @@ -370,7 +376,7 @@ class Task(object): # pragma: no cover return runner - def _is_same_heat_context(self, host_attr, target_attr): + def _is_same_context(self, host_attr, target_attr): """check if two servers are in the same heat context host_attr: either a name for a server created by yardstick or a dict with attribute name mapping when using external heat templates @@ -378,7 +384,7 @@ class Task(object): # pragma: no cover with attribute name mapping when using external heat templates """ for context in self.contexts: - if context.__context_type__ != "Heat": + if context.__context_type__ not in {"Heat", "Kubernetes"}: continue host = context._get_server(host_attr) @@ -635,9 +641,14 @@ def get_networks_from_nodes(nodes): return networks -def runner_join(runner): - """join (wait for) a runner, exit process at runner failure""" - status = runner.join() +def runner_join(runner, outputs, result): + """join (wait for) a runner, exit process at runner failure + :param outputs: + :type outputs: dict + :param result: + :type result: list + """ + status = runner.join(outputs, result) base_runner.Runner.release(runner) return status @@ -669,25 +680,24 @@ def parse_task_args(src_name, args): def change_server_name(scenario, suffix): - try: - host = scenario['host'] - except KeyError: - pass - else: - try: - host['name'] += suffix - except TypeError: - scenario['host'] += suffix - try: - target = scenario['target'] - except KeyError: - pass - else: + def add_suffix(cfg, key): try: - target['name'] += suffix - except TypeError: - scenario['target'] += suffix + value = cfg[key] + except KeyError: + pass + else: + try: + value['name'] += suffix + except TypeError: + cfg[key] += suffix + + server_name = scenario.get('options', {}).get('server_name', {}) + + add_suffix(scenario, 'host') + add_suffix(scenario, 'target') + add_suffix(server_name, 'host') + add_suffix(server_name, 'target') try: key = 'targets' diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 7898ae2bc..3ff064ae1 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -138,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if errors and sla_action is None: break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class ArithmeticRunner(base.Runner): diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a69811f8a..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -116,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -205,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -217,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index c2c6a8f19..75942766d 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -54,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sla_action = scenario_cfg["sla"].get("action", "assert") start = time.time() + timeout = start + duration while True: LOG.debug("runner=%(runner)s seq=%(sequence)s START", @@ -71,9 +72,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg, elif sla_action == "monitor": LOG.warning("SLA validation failed: %s", assertion.args) errors = assertion.args - except Exception as e: + # catch all exceptions because with multiprocessing we can have un-picklable exception + # problems https://bugs.python.org/issue9400 + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: output_queue.put(result) @@ -94,12 +97,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence += 1 - if (errors and sla_action is None) or \ - (time.time() - start > duration or aborted.is_set()): + if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): LOG.info("Worker END") break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class DurationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index afff27d75..01e76c6f4 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -141,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.debug("iterator: %s iterations: %s", iterator, iterations) if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 50fe106bd..4a7439588 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -90,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception(e) else: if result: - output_queue.put(result) + LOG.debug("output_queue.put %s", result) + output_queue.put(result, True, 1) time.sleep(interval) @@ -101,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - queue.put(benchmark_output) + LOG.debug("queue.put, %s", benchmark_output) + queue.put(benchmark_output, True, 1) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], @@ -114,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.info("worker END") break if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 68e272c57..f08ca5dde 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if (errors and sla_action is None) or aborted.is_set(): break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class SequenceRunner(base.Runner): diff --git a/yardstick/benchmark/scenarios/availability/attacker/attacker_baremetal.py b/yardstick/benchmark/scenarios/availability/attacker/attacker_baremetal.py index 50d44c1ca..979e3ab14 100644 --- a/yardstick/benchmark/scenarios/availability/attacker/attacker_baremetal.py +++ b/yardstick/benchmark/scenarios/availability/attacker/attacker_baremetal.py @@ -40,6 +40,21 @@ class BaremetalAttacker(BaseAttacker): self.connection = ssh.SSH.from_node(host, defaults={"user": "root"}) self.connection.wait(timeout=600) LOG.debug("ssh host success!") + + jump_host_name = self._config.get("jump_host", None) + self.jump_connection = None + if jump_host_name is not None: + jump_host = self._context.get(jump_host_name, None) + + LOG.debug("jump_host ip:%s user:%s", jump_host['ip'], jump_host['user']) + self.jump_connection = ssh.SSH.from_node( + jump_host, + # why do we allow pwd for password? + defaults={"user": "root", "password": jump_host.get("pwd")} + ) + self.jump_connection.wait(timeout=600) + LOG.debug("ssh jump host success!") + self.host_ip = host['ip'] self.ipmi_ip = host.get("ipmi_ip", None) @@ -49,6 +64,7 @@ class BaremetalAttacker(BaseAttacker): self.fault_cfg = BaseAttacker.attacker_cfgs.get('bare-metal-down') self.check_script = self.get_script_fullpath( self.fault_cfg['check_script']) + self.inject_script = self.get_script_fullpath(self.fault_cfg['inject_script']) self.recovery_script = self.get_script_fullpath( self.fault_cfg['recovery_script']) @@ -70,39 +86,27 @@ class BaremetalAttacker(BaseAttacker): return True def inject_fault(self): - exit_status, stdout, stderr = self.connection.execute( - "sudo shutdown -h now") - LOG.debug("inject fault ret: %s out:%s err:%s", - exit_status, stdout, stderr) - if not exit_status: - LOG.info("inject fault success") + LOG.info("Inject fault START") + cmd = "sudo /bin/bash -s {0} {1} {2} {3}".format( + self.ipmi_ip, self.ipmi_user, self.ipmi_pwd, "off") + with open(self.inject_script, "r") as stdin_file: + if self.jump_connection is not None: + LOG.info("Power off node via IPMI") + self.jump_connection.execute(cmd, stdin=stdin_file) + else: + _execute_shell_command(cmd, stdin=stdin_file) + LOG.info("Inject fault END") def recover(self): - jump_host_name = self._config.get("jump_host", None) - self.jump_connection = None - if jump_host_name is not None: - host = self._context.get(jump_host_name, None) - - LOG.debug("jump_host ip:%s user:%s", host['ip'], host['user']) - self.jump_connection = ssh.SSH.from_node( - host, - # why do we allow pwd for password? - defaults={"user": "root", "password": host.get("pwd")} - ) - self.jump_connection.wait(timeout=600) - LOG.debug("ssh jump host success!") - - if self.jump_connection is not None: - with open(self.recovery_script, "r") as stdin_file: - self.jump_connection.execute( - "sudo /bin/bash -s {0} {1} {2} {3}".format( - self.ipmi_ip, self.ipmi_user, self.ipmi_pwd, "on"), - stdin=stdin_file) - else: - _execute_shell_command( - "sudo /bin/bash -s {0} {1} {2} {3}".format( - self.ipmi_ip, self.ipmi_user, self.ipmi_pwd, "on"), - stdin=open(self.recovery_script, "r")) + LOG.info("Recover fault START") + cmd = "sudo /bin/bash -s {0} {1} {2} {3}".format( + self.ipmi_ip, self.ipmi_user, self.ipmi_pwd, "on") + with open(self.recovery_script, "r") as stdin_file: + if self.jump_connection is not None: + self.jump_connection.execute(cmd, stdin=stdin_file) + else: + _execute_shell_command(cmd, stdin=stdin_file) + LOG.info("Recover fault END") def _test(): # pragma: no cover diff --git a/yardstick/benchmark/scenarios/availability/attacker_conf.yaml b/yardstick/benchmark/scenarios/availability/attacker_conf.yaml index ee7ea7d83..5f43a701a 100644 --- a/yardstick/benchmark/scenarios/availability/attacker_conf.yaml +++ b/yardstick/benchmark/scenarios/availability/attacker_conf.yaml @@ -23,6 +23,7 @@ kill-lxc-process: bare-metal-down: check_script: ha_tools/check_host_ping.bash + inject_script: ha_tools/ipmi_power.bash recovery_script: ha_tools/ipmi_power.bash stop-service: diff --git a/yardstick/benchmark/scenarios/networking/netperf.py b/yardstick/benchmark/scenarios/networking/netperf.py index 08d5dd166..a8d9010ed 100755 --- a/yardstick/benchmark/scenarios/networking/netperf.py +++ b/yardstick/benchmark/scenarios/networking/netperf.py @@ -114,6 +114,10 @@ class Netperf(base.Scenario): cmd_args += " %s %s" % (option_pair[1], options[option_pair[0]]) + # Enable IP routing for UDP_STREAM test + if testname == "UDP_STREAM": + cmd_args += " -R 1" + cmd = "sudo bash netperf.sh %s" % (cmd_args) LOG.debug("Executing command: %s", cmd) status, stdout, stderr = self.client.execute(cmd) diff --git a/yardstick/benchmark/scenarios/networking/netperf_install_arm64.patch b/yardstick/benchmark/scenarios/networking/netperf_install_arm64.patch new file mode 100644 index 000000000..b41c1d207 --- /dev/null +++ b/yardstick/benchmark/scenarios/networking/netperf_install_arm64.patch @@ -0,0 +1,42 @@ +diff --git a/yardstick/benchmark/scenarios/networking/netperf_install.bash b/yardstick/benchmark/scenarios/networking/netperf_install.bash +index 0e3808f..f9362eb 100755 +--- a/yardstick/benchmark/scenarios/networking/netperf_install.bash ++++ b/yardstick/benchmark/scenarios/networking/netperf_install.bash +@@ -1,9 +1,9 @@ + #!/bin/bash + + ############################################################################## +-# Copyright (c) 2016 Huawei Technologies Co.,Ltd and others. ++# Copyright (c) 2017, Arm Limited. All rights reserved. + # +-# All rights reserved. This program and the accompanying materials ++# This program and the accompanying materials + # are made available under the terms of the Apache License, Version 2.0 + # which accompanies this distribution, and is available at + # http://www.apache.org/licenses/LICENSE-2.0 +@@ -19,21 +19,11 @@ then + fi + + echo "===Install netperf before test begin!!!===" +-cp /etc/apt/sources.list /etc/apt/sources.list_bkp +-cp /etc/resolv.conf /etc/resolv.conf_bkp +-echo "nameserver 8.8.4.4" >> /etc/resolv.conf +- +-cat <<EOF >/etc/apt/sources.list +-deb http://archive.ubuntu.com/ubuntu/ trusty main restricted universe multiverse +-deb http://archive.ubuntu.com/ubuntu/ trusty-security main restricted universe multiverse +-deb http://archive.ubuntu.com/ubuntu/ trusty-updates main restricted universe multiverse +-deb http://archive.ubuntu.com/ubuntu/ trusty-proposed main restricted universe multiverse +-deb http://archive.ubuntu.com/ubuntu/ trusty-backports main restricted universe multiverse +-EOF +- +-sudo apt-get update +-sudo apt-get install -y netperf + ++apt-get update -y ++apt-get install -y wget ++wget http://launchpadlibrarian.net/155043952/netperf_2.6.0-2_arm64.deb ++dpkg -i ./netperf_2.6.0-2_arm64.deb + service netperf start + + echo "===Install netperf before test end!!!===" diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index 450f83f6a..3f61116bc 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -140,8 +140,15 @@ class NetworkServiceTestCase(base.Scenario): def _get_ip_flow_range(self, ip_start_range): + # IP range is specified as 'x.x.x.x-y.y.y.y' + if isinstance(ip_start_range, six.string_types): + return ip_start_range + node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0')) - if node_name is not None: + if node_name is None: + # we are manually specifying the range + ip_addr_range = range_or_interface + else: node = self.context_cfg["nodes"].get(node_name, {}) try: # the ip_range is the interface name @@ -163,9 +170,6 @@ class NetworkServiceTestCase(base.Scenario): LOG.warning("Only single IP in range %s", ipaddr) # fall back to single IP range ip_addr_range = ip - else: - # we are manually specifying the range - ip_addr_range = range_or_interface return ip_addr_range def _get_traffic_flow(self): @@ -542,7 +546,11 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \ # we assume OrderedDict for consistenct in instantiation for node_name, node in context_cfg["nodes"].items(): LOG.debug(node) - file_name = node["VNF model"] + try: + file_name = node["VNF model"] + except KeyError: + LOG.debug("no model for %s, skipping", node_name) + continue file_path = scenario_cfg['task_path'] with open_relative_file(file_name, file_path) as stream: vnf_model = stream.read() diff --git a/yardstick/common/httpClient.py b/yardstick/common/httpClient.py index 11c2d752d..54f7be670 100644 --- a/yardstick/common/httpClient.py +++ b/yardstick/common/httpClient.py @@ -9,6 +9,7 @@ from __future__ import absolute_import import logging +import time from oslo_serialization import jsonutils import requests @@ -18,18 +19,21 @@ logger = logging.getLogger(__name__) class HttpClient(object): - def post(self, url, data): + def post(self, url, data, timeout=0): data = jsonutils.dump_as_bytes(data) headers = {'Content-Type': 'application/json'} - try: - response = requests.post(url, data=data, headers=headers) - result = response.json() - logger.debug('The result is: %s', result) - - return result - except Exception as e: - logger.debug('Failed: %s', e) - raise + t_end = time.time() + timeout + while True: + try: + response = requests.post(url, data=data, headers=headers) + result = response.json() + logger.debug('The result is: %s', result) + return result + except Exception: + if time.time() > t_end: + logger.exception('') + raise + time.sleep(1) def get(self, url): response = requests.get(url) diff --git a/yardstick/common/kubernetes_utils.py b/yardstick/common/kubernetes_utils.py index e4c232830..0cf7b9eab 100644 --- a/yardstick/common/kubernetes_utils.py +++ b/yardstick/common/kubernetes_utils.py @@ -28,6 +28,60 @@ def get_core_api(): # pragma: no cover return client.CoreV1Api() +def get_node_list(**kwargs): # pragma: no cover + core_v1_api = get_core_api() + try: + return core_v1_api.list_node(**kwargs) + except ApiException: + LOG.exception('Get node list failed') + raise + + +def create_service(template, + namespace='default', + wait=False, + **kwargs): # pragma: no cover + core_v1_api = get_core_api() + metadata = client.V1ObjectMeta(**template.get('metadata', {})) + + ports = [client.V1ServicePort(**port) for port in + template.get('spec', {}).get('ports', [])] + template['spec']['ports'] = ports + spec = client.V1ServiceSpec(**template.get('spec', {})) + + service = client.V1Service(metadata=metadata, spec=spec) + + try: + core_v1_api.create_namespaced_service('default', service) + except ApiException: + LOG.exception('Create Service failed') + raise + + +def delete_service(name, + namespace='default', + **kwargs): # pragma: no cover + core_v1_api = get_core_api() + try: + core_v1_api.delete_namespaced_service(name, namespace, **kwargs) + except ApiException: + LOG.exception('Delete Service failed') + + +def get_service_list(namespace='default', **kwargs): + core_v1_api = get_core_api() + try: + return core_v1_api.list_namespaced_service(namespace, **kwargs) + except ApiException: + LOG.exception('Get Service list failed') + raise + + +def get_service_by_name(name): # pragma: no cover + service_list = get_service_list() + return next((s.spec for s in service_list.items if s.metadata.name == name), None) + + def create_replication_controller(template, namespace='default', wait=False, @@ -135,3 +189,8 @@ def get_pod_list(namespace='default'): # pragma: no cover except ApiException: LOG.exception('Get pod list failed') raise + + +def get_pod_by_name(name): # pragma: no cover + pod_list = get_pod_list() + return next((n for n in pod_list.items if n.metadata.name.startswith(name)), None) diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py index 5f87f788d..8159ec9f2 100644 --- a/yardstick/network_services/helpers/samplevnf_helper.py +++ b/yardstick/network_services/helpers/samplevnf_helper.py @@ -226,7 +226,7 @@ class MultiPortConfig(object): self.tmp_file = os.path.join("/tmp", tmp_file) self.pktq_out_os = [] self.socket = socket - self.start_core = "" + self.start_core = 0 self.pipeline_counter = "" self.txrx_pipeline = "" self._port_pairs = None @@ -268,9 +268,8 @@ class MultiPortConfig(object): def update_timer(self): timer_tpl = self.get_config_tpl_data('TIMER') - timer_tpl['core'] = self.gen_core(self.start_core) + timer_tpl['core'] = self.gen_core(0) self.update_write_parser(timer_tpl) - self.start_core += 1 def get_config_tpl_data(self, type_value): for section in self.read_parser.sections(): @@ -289,7 +288,6 @@ class MultiPortConfig(object): def init_write_parser_template(self, type_value='ARPICMP'): for section in self.read_parser.sections(): if type_value == self.parser_get(self.read_parser, section, 'type', object()): - self.start_core = self.read_parser.getint(section, 'core') self.pipeline_counter = self.read_parser.getint(section, 'core') self.txrx_pipeline = self.read_parser.getint(section, 'core') return @@ -385,7 +383,7 @@ class MultiPortConfig(object): self.port_pair_list) arpicmp_data = { - 'core': self.gen_core(self.start_core), + 'core': self.gen_core(0), 'pktq_in': swq_in_str, 'pktq_out': swq_out_str, # we need to disable ports_mac_list? @@ -416,7 +414,7 @@ class MultiPortConfig(object): return arpicmp_data - def generate_final_txrx_data(self): + def generate_final_txrx_data(self, core=0): swq_start = self.swq - self.ports_len * self.worker_threads txq_start = 0 @@ -431,7 +429,7 @@ class MultiPortConfig(object): 'pktq_in': swq_str, 'pktq_out': txq_str, 'pipeline_txrx_type': 'TXTX', - 'core': self.gen_core(self.start_core), + 'core': self.gen_core(core), } pktq_in = rxtx_data['pktq_in'] pktq_in = '{0} {1}'.format(pktq_in, self.pktq_out_os[self.lb_index - 1]) @@ -452,7 +450,7 @@ class MultiPortConfig(object): 'core': self.gen_core(self.start_core), } self.pipeline_counter += 1 - return txrx_data + return self.start_core, txrx_data def generate_lb_data(self): pktq_in = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len) @@ -519,7 +517,6 @@ class MultiPortConfig(object): self.arpicmp_tpl.update(arpicmp_data) self.update_write_parser(self.arpicmp_tpl) - self.start_core += 1 if self.vnf_type == 'CGNAPT': self.pipeline_counter += 1 self.update_timer() @@ -536,7 +533,7 @@ class MultiPortConfig(object): self.ports_len = port_pair_count * 2 self.set_priv_que_handler() if self.lb_config == 'SW': - txrx_data = self.generate_initial_txrx_data() + core, txrx_data = self.generate_initial_txrx_data() self.txrx_tpl.update(txrx_data) self.update_write_parser(self.txrx_tpl) self.start_core += 1 @@ -560,10 +557,9 @@ class MultiPortConfig(object): self.generate_next_core_id() if self.lb_config == 'SW': - txrx_data = self.generate_final_txrx_data() + txrx_data = self.generate_final_txrx_data(core) self.txrx_tpl.update(txrx_data) self.update_write_parser(self.txrx_tpl) - self.start_core += 1 self.vnf_tpl = self.get_config_tpl_data(self.vnf_type) def generate_config(self): diff --git a/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py b/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py index 4b906508c..70ce4ff03 100644 --- a/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py +++ b/yardstick/network_services/libs/ixia_libs/IxNet/IxNet.py @@ -113,10 +113,10 @@ class IxNextgen(object): } MODE_SEEDS_MAP = { - 0: ('uplink_0', ['256', '2048']), + 0: ('uplink', ['256', '2048']), } - MODE_SEEDS_DEFAULT = 'downlink_0', ['2048', '256'] + MODE_SEEDS_DEFAULT = 'downlink', ['2048', '256'] @staticmethod def find_view_obj(view_name, views): @@ -125,24 +125,27 @@ class IxNextgen(object): @staticmethod def get_config(tg_cfg): + card = [] + port = [] external_interface = tg_cfg["vdu"][0]["external-interface"] - card_port0 = external_interface[0]["virtual-interface"]["vpci"] - card_port1 = external_interface[1]["virtual-interface"]["vpci"] - card0, port0 = card_port0.split(':')[:2] - card1, port1 = card_port1.split(':')[:2] + for intf in external_interface: + card_port0 = intf["virtual-interface"]["vpci"] + card0, port0 = card_port0.split(':')[:2] + card.append(card0) + port.append(port0) + cfg = { 'py_lib_path': tg_cfg["mgmt-interface"]["tg-config"]["py_lib_path"], 'machine': tg_cfg["mgmt-interface"]["ip"], 'port': tg_cfg["mgmt-interface"]["tg-config"]["tcl_port"], 'chassis': tg_cfg["mgmt-interface"]["tg-config"]["ixchassis"], - 'card1': card0, - 'port1': port0, - 'card2': card1, - 'port2': port1, + 'cards': card, + 'ports': port, 'output_dir': tg_cfg["mgmt-interface"]["tg-config"]["dut_result_dir"], 'version': tg_cfg["mgmt-interface"]["tg-config"]["version"], 'bidir': True, } + return cfg def __init__(self, ixnet=None): @@ -183,9 +186,13 @@ class IxNextgen(object): self.set_random_ip_multi_attribute(ip, seeds[1], fixed_bits, random_mask, l3_count) def add_ip_header(self, params, version): - for it, ep, i in self.iter_over_get_lists('/traffic', 'trafficItem', "configElement"): - mode, seeds = self.MODE_SEEDS_MAP.get(i, self.MODE_SEEDS_DEFAULT) - l3 = params[mode]['outer_l3'] + for it, ep, i in self.iter_over_get_lists('/traffic', 'trafficItem', "configElement", 1): + iter1 = (v['outer_l3'] for v in params.values() if str(v['id']) == str(i)) + try: + l3 = next(iter1, {}) + seeds = self.MODE_SEEDS_MAP.get(i, self.MODE_SEEDS_DEFAULT)[1] + except (KeyError, IndexError): + continue for ip, ip_bits, _ in self.iter_over_get_lists(ep, 'stack', 'field'): self.set_random_ip_multi_attributes(ip_bits, version, seeds, l3) @@ -222,10 +229,11 @@ class IxNextgen(object): def ix_assign_ports(self): vports = self.ixnet.getList(self.ixnet.getRoot(), 'vport') - ports = [ - (self._cfg['chassis'], self._cfg['card1'], self._cfg['port1']), - (self._cfg['chassis'], self._cfg['card2'], self._cfg['port2']), - ] + ports = [] + + chassis = self._cfg['chassis'] + ports = [(chassis, card, port) for card, port in + zip(self._cfg['cards'], self._cfg['ports'])] vport_list = self.ixnet.getList("/", "vport") self.ixnet.execute('assignPorts', ports, [], vport_list, True) @@ -246,8 +254,10 @@ class IxNextgen(object): helper = TrafficStreamHelper(traffic_item, stream, param_id) self.ixnet.setMultiAttribute(helper.transmissionControl, - '-type', '{0}'.format(param['traffic_type']), - '-duration', '{0}'.format(param['duration'])) + '-type', '{0}'.format(param.get('traffic_type', + 'continuous')), + '-duration', '{0}'.format(param.get('duration', + "30"))) stream_frame_rate_path = helper.frameRate self.ixnet.setMultiAttribute(stream_frame_rate_path, '-rate', param['iload']) @@ -276,10 +286,10 @@ class IxNextgen(object): def update_ether_multi_attributes(self, ether, l2): if "ethernet.header.destinationAddress" in ether: - self.update_ether_multi_attribute(ether, str(l2['dstmac'])) + self.update_ether_multi_attribute(ether, str(l2.get('dstmac', "00:00:00:00:00:02"))) if "ethernet.header.sourceAddress" in ether: - self.update_ether_multi_attribute(ether, str(l2['srcmac'])) + self.update_ether_multi_attribute(ether, str(l2.get('srcmac', "00:00:00:00:00:01"))) def ix_update_ether(self, params): for ti, ep, index in self.iter_over_get_lists('/traffic', 'trafficItem', diff --git a/yardstick/network_services/nfvi/collectd.conf b/yardstick/network_services/nfvi/collectd.conf index 3928dcbca..22bd5d49d 100644 --- a/yardstick/network_services/nfvi/collectd.conf +++ b/yardstick/network_services/nfvi/collectd.conf @@ -15,7 +15,7 @@ Hostname "nsb_stats" FQDNLookup true -Interval {interval} +Interval {{ interval }} ############################################################################## # LoadPlugin section # @@ -24,7 +24,9 @@ Interval {interval} ############################################################################## #LoadPlugin syslog -{loadplugin} +{% for plugin in loadplugins %} +LoadPlugin {{ plugin }} +{% endfor %} ############################################################################## # Plugin configuration # @@ -38,42 +40,31 @@ Interval {interval} #</Plugin> <Plugin amqp> - <Publish "name"> - Host "0.0.0.0" - Port "5672" - VHost "/" - User "admin" - Password "admin" - Exchange "amq.fanout" - RoutingKey "collectd" - Persistent false - StoreRates false - ConnectionRetryDelay 0 - </Publish> + <Publish "name"> + Host "0.0.0.0" + Port "5672" + VHost "/" + User "admin" + Password "admin" + Exchange "amq.fanout" + RoutingKey "collectd" + Persistent false + StoreRates false + ConnectionRetryDelay 0 + </Publish> </Plugin> <Plugin cpu> - ReportByCpu true - ReportByState true - ValuesPercentage true + ReportByCpu true + ReportByState true + ValuesPercentage true </Plugin> <Plugin memory> - ValuesAbsolute true - ValuesPercentage false -</Plugin> - -<Plugin "intel_rdt"> - Cores "" + ValuesAbsolute true + ValuesPercentage false </Plugin> -<Plugin intel_pmu> - ReportHardwareCacheEvents true - ReportKernelPMUEvents true - ReportSoftwareEvents true - EventList "/root/.cache/pmu-events/GenuineIntel-6-2D-core.json" - HardwareEvents "L2_RQSTS.CODE_RD_HIT,L2_RQSTS.CODE_RD_MISS" "L2_RQSTS.ALL_CODE_RD" -</Plugin> <Plugin hugepages> ReportPerNodeHP true @@ -83,15 +74,25 @@ Interval {interval} ValuesPercentage false </Plugin> -<Plugin hugepages> - ReportPerNodeHP true - ReportRootHP true - ValuesPages true - ValuesBytes false - ValuesPercentage false + +{% if "intel_rdt" in plugins %} +<Plugin "intel_rdt"> + Cores "" +</Plugin> +{% endif %} + +{% if "intel_pmu" in plugins %} +<Plugin intel_pmu> + ReportHardwareCacheEvents true + ReportKernelPMUEvents true + ReportSoftwareEvents true + EventList "/root/.cache/pmu-events/GenuineIntel-6-2D-core.json" + HardwareEvents "L2_RQSTS.CODE_RD_HIT,L2_RQSTS.CODE_RD_MISS" "L2_RQSTS.ALL_CODE_RD" </Plugin> +{% endif %} -<Plugin dpdkstat> +{% if "dpdkstat" in plugins %} +<Plugin "dpdkstat"> <EAL> Coremask "0x1" MemoryChannels "4" @@ -100,20 +101,24 @@ Interval {interval} </EAL> SharedMemObj "dpdk_collectd_stats_0" EnabledPortMask 0xffff - {dpdk_interface} +{% for port_name in port_names %} + PortName {{ port_name }} +{% endfor %} </Plugin> +{% endif %} -<Plugin virt> - Domain "samplevnf" +{% if "virt" in plugins %} +<Plugin "virt"> +# monitor all domains </Plugin> +{% endif %} -<Plugin ovs_stats> +{% if "ovs_stats" in plugins %} +<Plugin "ovs_stats"> Port "6640" Address "127.0.0.1" Socket "/usr/local/var/run/openvswitch/db.sock" - Bridges "br0" "br_ext" +# don't specify bridges, monitor all bridges </Plugin> +{% endif %} -<Include "/etc/collectd/collectd.conf.d"> - Filter "*.conf" -</Include> diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py index f2c9d40a7..e0027bbcb 100644 --- a/yardstick/network_services/nfvi/collectd.py +++ b/yardstick/network_services/nfvi/collectd.py @@ -35,6 +35,7 @@ class AmqpConsumer(object): self._consumer_tag = None self._url = amqp_url self._queue = queue + self._queue.cancel_join_thread() def connect(self): """ connect to amqp url """ diff --git a/yardstick/network_services/nfvi/collectd.sh b/yardstick/network_services/nfvi/collectd.sh index 296c4a213..bdc5abd03 100755 --- a/yardstick/network_services/nfvi/collectd.sh +++ b/yardstick/network_services/nfvi/collectd.sh @@ -142,7 +142,8 @@ else fi modprobe msr -cp $INSTALL_NSB_BIN/collectd.conf /opt/collectd/etc/ +# we overwrite the config file during _start_collectd so don't copy it +#cp $INSTALL_NSB_BIN/collectd.conf /opt/nsb_bin/collectd/etc/ sudo service rabbitmq-server restart echo "Check if admin user already created" rabbitmqctl list_users | grep '^admin$' > /dev/null diff --git a/yardstick/network_services/nfvi/resource.py b/yardstick/network_services/nfvi/resource.py index fa32a4dcf..d807f5e46 100644 --- a/yardstick/network_services/nfvi/resource.py +++ b/yardstick/network_services/nfvi/resource.py @@ -15,16 +15,22 @@ from __future__ import absolute_import from __future__ import print_function -import tempfile + import logging +from itertools import chain + +import jinja2 import os import os.path import re import multiprocessing +import pkg_resources from oslo_config import cfg +from oslo_utils.encodeutils import safe_decode from yardstick import ssh +from yardstick.common.task_template import finalize_for_yaml from yardstick.common.utils import validate_non_string_sequence from yardstick.network_services.nfvi.collectd import AmqpConsumer from yardstick.network_services.utils import get_nsb_option @@ -34,26 +40,36 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF ZMQ_OVS_PORT = 5567 ZMQ_POLLING_TIME = 12000 -LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "intel_rdt", "memory", - "hugepages", "dpdkstat", "virt", "ovs_stats", "intel_pmu"] +LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory", + "hugepages"] class ResourceProfile(object): """ This profile adds a resource at the beginning of the test session """ + COLLECTD_CONF = "collectd.conf" + AMPQ_PORT = 5672 + DEFAULT_INTERVAL = 25 - def __init__(self, mgmt, interfaces=None, cores=None): + def __init__(self, mgmt, port_names=None, cores=None, plugins=None, interval=None): + if plugins is None: + self.plugins = {} + else: + self.plugins = plugins + if interval is None: + self.interval = self.DEFAULT_INTERVAL + else: + self.interval = interval self.enable = True self.cores = validate_non_string_sequence(cores, default=[]) self._queue = multiprocessing.Queue() self.amqp_client = None - self.interfaces = validate_non_string_sequence(interfaces, default=[]) + self.port_names = validate_non_string_sequence(port_names, default=[]) - # why the host or ip? - self.vnfip = mgmt.get("host", mgmt["ip"]) - self.connection = ssh.SSH.from_node(mgmt, overrides={"ip": self.vnfip}) - self.connection.wait() + # we need to save mgmt so we can connect to port 5672 + self.mgmt = mgmt + self.connection = ssh.AutoConnectSSH.from_node(mgmt) def check_if_sa_running(self, process): """ verify if system agent is running """ @@ -62,7 +78,7 @@ class ResourceProfile(object): def run_collectd_amqp(self): """ run amqp consumer to collect the NFVi data """ - amqp_url = 'amqp://admin:admin@{}:5672/%2F'.format(self.vnfip) + amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT) amqp = AmqpConsumer(amqp_url, self._queue) try: amqp.run() @@ -124,7 +140,9 @@ class ResourceProfile(object): } testcase = "" - for key, value in metrics.items(): + # unicode decode + decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items()) + for key, value in decoded: key_split = key.split("/") res_key_iter = (key for key in key_split if "nsb_stats" not in key) res_key0 = next(res_key_iter) @@ -176,35 +194,36 @@ class ResourceProfile(object): msg = self.parse_collectd_result(metric, self.cores) return msg - def _provide_config_file(self, bin_path, nfvi_cfg, kwargs): - with open(os.path.join(bin_path, nfvi_cfg), 'r') as cfg: - template = cfg.read() - cfg, cfg_content = tempfile.mkstemp() - with os.fdopen(cfg, "w+") as cfg: - cfg.write(template.format(**kwargs)) - cfg_file = os.path.join(bin_path, nfvi_cfg) - self.connection.put(cfg_content, cfg_file) - - def _prepare_collectd_conf(self, bin_path): + def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs): + template = pkg_resources.resource_string("yardstick.network_services.nfvi", + nfvi_cfg).decode('utf-8') + cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True, + finalize=finalize_for_yaml).render( + **template_kwargs) + # cfg_content = io.StringIO(template.format(**template_kwargs)) + cfg_file = os.path.join(config_file_path, nfvi_cfg) + # must write as root, so use sudo + self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content) + + def _prepare_collectd_conf(self, config_file_path): """ Prepare collectd conf """ - loadplugin = "\n".join("LoadPlugin {0}".format(plugin) - for plugin in LIST_PLUGINS_ENABLED) - - interfaces = "\n".join("PortName '{0[name]}'".format(interface) - for interface in self.interfaces) kwargs = { - "interval": '25', - "loadplugin": loadplugin, - "dpdk_interface": interfaces, + "interval": self.interval, + "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())), + # Optional fields PortName is descriptive only, use whatever is present + "port_names": self.port_names, + # "ovs_bridge_interfaces": ["br-int"], + "plugins": self.plugins, } - self._provide_config_file(bin_path, 'collectd.conf', kwargs) + self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs) def _start_collectd(self, connection, bin_path): LOG.debug("Starting collectd to collect NFVi stats") - connection.execute('sudo pkill -9 collectd') + connection.execute('sudo pkill -x -9 collectd') bin_path = get_nsb_option("bin_path") - collectd_path = os.path.join(bin_path, "collectd", "collectd") + collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd") + config_file_path = os.path.join(bin_path, "collectd", "etc") exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0] if exit_status != 0: LOG.warning("%s is not present disabling", collectd_path) @@ -217,7 +236,9 @@ class ResourceProfile(object): # collectd_installer, http_proxy, https_proxy)) return LOG.debug("Starting collectd to collect NFVi stats") - self._prepare_collectd_conf(bin_path) + # ensure collectd.conf.d exists to avoid error/warning + connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d") + self._prepare_collectd_conf(config_file_path) # Reset amqp queue LOG.debug("reset and setup amqp to collect data from collectd") @@ -228,7 +249,7 @@ class ResourceProfile(object): connection.execute("sudo rabbitmqctl start_app") connection.execute("sudo service rabbitmq-server restart") - LOG.debug("Creating amdin user for rabbitmq in order to collect data from collectd") + LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd") connection.execute("sudo rabbitmqctl delete_user guest") connection.execute("sudo rabbitmqctl add_user admin admin") connection.execute("sudo rabbitmqctl authenticate_user admin admin") @@ -241,7 +262,11 @@ class ResourceProfile(object): def initiate_systemagent(self, bin_path): """ Start system agent for NFVi collection on host """ if self.enable: - self._start_collectd(self.connection, bin_path) + try: + self._start_collectd(self.connection, bin_path) + except Exception: + LOG.exception("Exception during collectd start") + raise def start(self): """ start nfvi collection """ diff --git a/yardstick/network_services/traffic_profile/ixia_rfc2544.py b/yardstick/network_services/traffic_profile/ixia_rfc2544.py index cb8a34796..7881131a7 100644 --- a/yardstick/network_services/traffic_profile/ixia_rfc2544.py +++ b/yardstick/network_services/traffic_profile/ixia_rfc2544.py @@ -14,7 +14,6 @@ from __future__ import absolute_import import logging -import json from yardstick.network_services.traffic_profile.traffic_profile import \ TrexProfile @@ -24,55 +23,59 @@ LOG = logging.getLogger(__name__) class IXIARFC2544Profile(TrexProfile): + UPLINK = 'uplink' + DOWNLINK = 'downlink' + def _get_ixia_traffic_profile(self, profile_data, mac=None, xfile=None, static_traffic=None): if mac is None: mac = {} - if static_traffic is None: - static_traffic = {} - result = {} - if xfile: - with open(xfile) as stream: + for traffickey, values in profile_data.items(): + if not traffickey.startswith((self.UPLINK, self.DOWNLINK)): + continue + + try: + # values should be single-item dict, so just grab the first item try: - static_traffic = json.load(stream) - except Exception as exc: - LOG.debug(exc) - - for traffickey, trafficvalue in static_traffic.items(): - traffic = static_traffic[traffickey] - # outer_l2 - index = 0 - for key, value in profile_data[traffickey].items(): - framesize = value['outer_l2']['framesize'] - traffic['outer_l2']['framesize'] = framesize - traffic['framesPerSecond'] = True - traffic['bidir'] = False - traffic['outer_l2']['srcmac'] = \ - mac["src_mac_{}".format(traffic['id'])] - traffic['outer_l2']['dstmac'] = \ - mac["dst_mac_{}".format(traffic['id'])] - - # outer_l3 - if "outer_l3v6" in list(value.keys()): - traffic['outer_l3'] = value['outer_l3v6'] - srcip4 = value['outer_l3v6']['srcip6'] - traffic['outer_l3']['srcip4'] = srcip4.split("-")[0] - dstip4 = value['outer_l3v6']['dstip6'] - traffic['outer_l3']['dstip4'] = dstip4.split("-")[0] + key, value = next(iter(values.items())) + except StopIteration: + result[traffickey] = {} + continue + + port_id = value.get('id', 1) + port_index = port_id - 1 + try: + ip = value['outer_l3v6'] + except KeyError: + ip = value['outer_l3v4'] + src_key, dst_key = 'srcip4', 'dstip4' else: - traffic['outer_l3'] = value['outer_l3v4'] - srcip4 = value['outer_l3v4']['srcip4'] - traffic['outer_l3']['srcip4'] = srcip4.split("-")[0] - dstip4 = value['outer_l3v4']['dstip4'] - traffic['outer_l3']['dstip4'] = dstip4.split("-")[0] - - traffic['outer_l3']['type'] = key - traffic['outer_l3']['count'] = value['outer_l3v4']['count'] - # outer_l4 - traffic['outer_l4'] = value['outer_l4'] - index = index + 1 - result.update({traffickey: traffic}) + src_key, dst_key = 'srcip6', 'dstip6' + + result[traffickey] = { + 'bidir': False, + 'iload': '100', + 'id': port_id, + 'outer_l2': { + 'framesize': value['outer_l2']['framesize'], + 'framesPerSecond': True, + 'srcmac': mac['src_mac_{}'.format(port_index)], + 'dstmac': mac['dst_mac_{}'.format(port_index)], + }, + 'outer_l3': { + 'count': ip['count'], + 'dscp': ip['dscp'], + 'ttl': ip['ttl'], + src_key: ip[src_key].split("-")[0], + dst_key: ip[dst_key].split("-")[0], + 'type': key, + 'proto': ip['proto'], + }, + 'outer_l4': value['outer_l4'], + } + except Exception: + continue return result @@ -103,7 +106,9 @@ class IXIARFC2544Profile(TrexProfile): self.ports = [port for port in port_generator()] - def execute_traffic(self, traffic_generator, ixia_obj, mac={}, xfile=None): + def execute_traffic(self, traffic_generator, ixia_obj, mac=None, xfile=None): + if mac is None: + mac = {} if self.first_run: self.full_profile = {} self.pg_id = 0 @@ -121,15 +126,18 @@ class IXIARFC2544Profile(TrexProfile): return str(multiplier) def start_ixia_latency(self, traffic_generator, ixia_obj, - mac={}, xfile=None): + mac=None, xfile=None): + if mac is None: + mac = {} self.update_traffic_profile(traffic_generator) traffic = \ self._get_ixia_traffic_profile(self.full_profile, mac, xfile) - self._ixia_traffic_generate(traffic_generator, traffic, - ixia_obj, xfile) + self._ixia_traffic_generate(traffic_generator, traffic, ixia_obj) def get_drop_percentage(self, traffic_generator, samples, tol_min, - tolerance, ixia_obj, mac={}, xfile=None): + tolerance, ixia_obj, mac=None, xfile=None): + if mac is None: + mac = {} status = 'Running' drop_percent = 100 in_packets = sum([samples[iface]['in_packets'] for iface in samples]) diff --git a/yardstick/network_services/traffic_profile/prox_ACL.py b/yardstick/network_services/traffic_profile/prox_ACL.py index 7f2255d99..7a2280016 100644 --- a/yardstick/network_services/traffic_profile/prox_ACL.py +++ b/yardstick/network_services/traffic_profile/prox_ACL.py @@ -45,7 +45,8 @@ class ProxACLProfile(ProxProfile): test_value = self.upper_bound # throughput and packet loss from the last successful test for _ in range(self.prox_config["attempts"]): - result, port_samples = traffic_gen.run_test(pkt_size, duration, - test_value, self.tolerated_loss) + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, self.tolerated_loss) + samples = result.get_samples(pkt_size, result.pkt_loss, port_samples) self.queue.put(samples) diff --git a/yardstick/network_services/traffic_profile/prox_binsearch.py b/yardstick/network_services/traffic_profile/prox_binsearch.py index 385702b75..1fd6ec41a 100644 --- a/yardstick/network_services/traffic_profile/prox_binsearch.py +++ b/yardstick/network_services/traffic_profile/prox_binsearch.py @@ -57,8 +57,6 @@ class ProxBinSearchProfile(ProxProfile): def run_test_with_pkt_size(self, traffic_gen, pkt_size, duration): """Run the test for a single packet size. - :param queue: queue object we put samples into - :type queue: Queue :param traffic_gen: traffic generator instance :type traffic_gen: TrafficGen :param pkt_size: The packet size to test with. @@ -86,8 +84,8 @@ class ProxBinSearchProfile(ProxProfile): # throughput and packet loss from the most recent successful test successful_pkt_loss = 0.0 for test_value in self.bounds_iterator(LOG): - result, port_samples = traffic_gen.run_test(pkt_size, duration, - test_value, self.tolerated_loss) + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, self.tolerated_loss) if result.success: LOG.debug("Success! Increasing lower bound") diff --git a/yardstick/network_services/traffic_profile/prox_mpls_tag_untag.py b/yardstick/network_services/traffic_profile/prox_mpls_tag_untag.py index 7e3cfa852..0e1048b5d 100644 --- a/yardstick/network_services/traffic_profile/prox_mpls_tag_untag.py +++ b/yardstick/network_services/traffic_profile/prox_mpls_tag_untag.py @@ -57,8 +57,6 @@ class ProxMplsTagUntagProfile(ProxProfile): def run_test_with_pkt_size(self, traffic_gen, pkt_size, duration): """Run the test for a single packet size. - :param queue: queue object we put samples into - :type queue: Queue :param traffic_gen: traffic generator instance :type traffic_gen: TrafficGen :param pkt_size: The packet size to test with. @@ -86,8 +84,8 @@ class ProxMplsTagUntagProfile(ProxProfile): # throughput and packet loss from the most recent successful test successful_pkt_loss = 0.0 for test_value in self.bounds_iterator(LOG): - result, port_samples = traffic_gen.run_test(pkt_size, duration, - test_value, self.tolerated_loss) + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, self.tolerated_loss) if result.success: LOG.debug("Success! Increasing lower bound") diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py index 0a9de9bb6..170dfd96f 100644 --- a/yardstick/network_services/traffic_profile/prox_profile.py +++ b/yardstick/network_services/traffic_profile/prox_profile.py @@ -18,6 +18,7 @@ from __future__ import absolute_import import logging from yardstick.network_services.traffic_profile.base import TrafficProfile +from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxProfileHelper LOG = logging.getLogger(__name__) @@ -56,10 +57,17 @@ class ProxProfile(TrafficProfile): self.lower_bound = float(self.prox_config.get('lower_bound', 10.0)) self.upper_bound = float(self.prox_config.get('upper_bound', 100.0)) self.step_value = float(self.prox_config.get('step_value', 10.0)) + self._profile_helper = None + + def make_profile_helper(self, traffic_gen): + if self._profile_helper is None: + self._profile_helper = ProxProfileHelper.make_profile_helper(traffic_gen) + return self._profile_helper def init(self, queue): self.pkt_size_iterator = iter(self.pkt_sizes) self.queue = queue + self.queue.cancel_join_thread() def bounds_iterator(self, logger=None): if logger: @@ -89,6 +97,8 @@ class ProxProfile(TrafficProfile): raise NotImplementedError def execute_traffic(self, traffic_generator): + self.make_profile_helper(traffic_generator) + try: pkt_size = next(self.pkt_size_iterator) except StopIteration: diff --git a/yardstick/network_services/traffic_profile/prox_ramp.py b/yardstick/network_services/traffic_profile/prox_ramp.py index 0f7995c1d..2aeab9aff 100644 --- a/yardstick/network_services/traffic_profile/prox_ramp.py +++ b/yardstick/network_services/traffic_profile/prox_ramp.py @@ -39,8 +39,8 @@ class ProxRampProfile(ProxProfile): LOG.info("Testing with packet size %d", pkt_size) for test_value in self.bounds_iterator(LOG): - test_result = traffic_gen.resource_helper.run_test(pkt_size, duration, - test_value, self.tolerated_loss) + test_result = self._profile_helper.run_test(pkt_size, duration, + test_value, self.tolerated_loss) if not test_result.success: LOG.debug("Failure... stopping") diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py index d52e27c15..eac3c814f 100644 --- a/yardstick/network_services/utils.py +++ b/yardstick/network_services/utils.py @@ -16,6 +16,7 @@ from __future__ import absolute_import import logging import os +import re from oslo_config import cfg from oslo_config.cfg import NoSuchOptError @@ -38,6 +39,59 @@ OPTS = [ CONF.register_opts(OPTS, group="nsb") +HEXADECIMAL = "[0-9a-zA-Z]" + + +class PciAddress(object): + + PCI_PATTERN_STR = HEXADECIMAL.join([ + "(", + "{4}):(", # domain (4 bytes) + "{2}):(", # bus (2 bytes) + "{2}).(", # function (2 bytes) + ")", # slot (1 byte) + ]) + + PCI_PATTERN = re.compile(PCI_PATTERN_STR) + + @classmethod + def parse_address(cls, text, multi_line=False): + if multi_line: + text = text.replace(os.linesep, '') + match = cls.PCI_PATTERN.search(text) + return cls(match.group(0)) + + def __init__(self, address): + super(PciAddress, self).__init__() + match = self.PCI_PATTERN.match(address) + if not match: + raise ValueError('Invalid PCI address: {}'.format(address)) + self.address = address + self.match = match + + def __repr__(self): + return self.address + + @property + def domain(self): + return self.match.group(1) + + @property + def bus(self): + return self.match.group(2) + + @property + def slot(self): + return self.match.group(3) + + @property + def function(self): + return self.match.group(4) + + def values(self): + return [self.match.group(n) for n in range(1, 5)] + + def get_nsb_option(option, default=None): """return requested option for yardstick.conf""" @@ -55,6 +109,8 @@ def provision_tool(connection, tool_path, tool_file=None): :return - Tool path """ + if not tool_path: + tool_path = get_nsb_option('tool_path') if tool_file: tool_path = os.path.join(tool_path, tool_file) bin_path = get_nsb_option("bin_path") @@ -64,6 +120,7 @@ def provision_tool(connection, tool_path, tool_file=None): logging.warning("%s not found on %s, will try to copy from localhost", tool_path, connection.host) + bin_path = get_nsb_option("bin_path") connection.execute('mkdir -p "%s"' % bin_path) connection.put(tool_path, tool_path) return tool_path diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 42e3d2a48..67634a79c 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -106,20 +106,28 @@ class VnfdHelper(dict): if int(virtual_intf['dpdk_port_num']) == port: return interface - def port_num(self, name): + def port_num(self, port): # we need interface name -> DPDK port num (PMD ID) -> LINK ID # LINK ID -> PMD ID is governed by the port mask """ :rtype: int - :type name: str + :type port: str """ - intf = self.find_interface(name=name) + if isinstance(port, dict): + intf = port + else: + intf = self.find_interface(name=port) return int(intf["virtual-interface"]["dpdk_port_num"]) def port_nums(self, intfs): return [self.port_num(i) for i in intfs] + def ports_iter(self): + for port_name in self.port_pairs.all_ports: + port_num = self.port_num(port_name) + yield port_name, port_num + class VNFObject(object): diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index d24710132..ba4d44c41 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -26,19 +26,19 @@ from collections import OrderedDict, namedtuple import time from contextlib import contextmanager from itertools import repeat, chain +from multiprocessing import Queue import six -from multiprocessing import Queue from six.moves import zip, StringIO from six.moves import cStringIO from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file +from yardstick.common import utils from yardstick.common.utils import SocketTopology, ip_to_hex, join_non_strings, try_int from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper - PROX_PORT = 8474 SECTION_NAME = 0 @@ -82,7 +82,6 @@ CONFIGURATION_OPTIONS = ( class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread')): - CORE_RE = re.compile(r"core\s+(\d+)(?:s(\d+))?(h)?$") def __new__(cls, *args): @@ -115,7 +114,6 @@ class CoreSocketTuple(namedtuple('CoreTuple', 'core_id, socket_id, hyperthread') class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): - def __new__(cls, *args): try: assert args[0] is not str(args[0]) @@ -129,7 +127,6 @@ class TotStatsTuple(namedtuple('TotStats', 'rx,tx,tsc,hz')): class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_rx,' 'delta_tx,delta_tsc,' 'latency,rx_total,tx_total,pps')): - @property def pkt_loss(self): try: @@ -191,7 +188,6 @@ class ProxTestDataTuple(namedtuple('ProxTestDataTuple', 'tolerated,tsc_hz,delta_ class PacketDump(object): - @staticmethod def assert_func(func, value1, value2, template=None): assert func(value1, value2), template.format(value1, value2) @@ -268,6 +264,7 @@ class ProxSocketHelper(object): self._sock = sock self._pkt_dumps = [] + self.master_stats = None def connect(self, ip, port): """Connect to the prox instance on the remote system""" @@ -323,6 +320,7 @@ class ProxSocketHelper(object): def get_data(self, pkt_dump_only=False, timeout=1): """ read data from the socket """ + # This method behaves slightly differently depending on whether it is # called to read the response to a command (pkt_dump_only = 0) or if # it is called specifically to read a packet dump (pkt_dump_only = 1). @@ -434,10 +432,15 @@ class ProxSocketHelper(object): LOG.debug("Set value for core(s) %s", cores) self._run_template_over_cores("reset values {} 0\n", cores) - def set_speed(self, cores, speed): + def set_speed(self, cores, speed, tasks=None): """ set speed on the remote instance """ - LOG.debug("Set speed for core(s) %s to %g", cores, speed) - self._run_template_over_cores("speed {} 0 {}\n", cores, speed) + if tasks is None: + tasks = [0] * len(cores) + elif len(tasks) != len(cores): + LOG.error("set_speed: cores and tasks must have the same len") + LOG.debug("Set speed for core(s)/tasks(s) %s to %g", list(zip(cores, tasks)), speed) + for (core, task) in list(zip(cores, tasks)): + self.put_command("speed {} {} {}\n".format(core, task, speed)) def slope_speed(self, cores_speed, duration, n_steps=0): """will start to increase speed from 0 to N where N is taken from @@ -564,7 +567,7 @@ class ProxSocketHelper(object): """Activate dump on rx on the specified core""" LOG.debug("Activating dump on RX for core %d, task %d, count %d", core_id, task_id, count) self.put_command("dump_rx {} {} {}\n".format(core_id, task_id, count)) - time.sleep(1.5) # Give PROX time to set up packet dumping + time.sleep(1.5) # Give PROX time to set up packet dumping def quit(self): self.stop_all() @@ -584,6 +587,9 @@ class ProxSocketHelper(object): time.sleep(3) +_LOCAL_OBJECT = object() + + class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): # the actual app is lowercase APP_NAME = 'prox' @@ -594,6 +600,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): "sut": "gen", } + CONFIG_QUEUE_TIMEOUT = 120 + def __init__(self, vnfd_helper, ssh_helper, scenario_helper): self.remote_path = None super(ProxDpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper) @@ -601,6 +609,34 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): self._prox_config_data = None self.additional_files = {} self.config_queue = Queue() + self._global_section = None + + @property + def prox_config_data(self): + if self._prox_config_data is None: + # this will block, but it needs too + self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT) + return self._prox_config_data + + @property + def global_section(self): + if self._global_section is None and self.prox_config_data: + self._global_section = self.find_section("global") + return self._global_section + + def find_section(self, name, default=_LOCAL_OBJECT): + result = next((value for key, value in self.prox_config_data if key == name), default) + if result is _LOCAL_OBJECT: + raise KeyError('{} not found in Prox config'.format(name)) + return result + + def find_in_section(self, section_name, section_key, default=_LOCAL_OBJECT): + section = self.find_section(section_name, []) + result = next((value for key, value in section if key == section_key), default) + if result is _LOCAL_OBJECT: + template = '{} not found in {} section of Prox config' + raise KeyError(template.format(section_key, section_name)) + return result def _build_pipeline_kwargs(self): tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME) @@ -739,9 +775,9 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): lua = os.linesep.join(('{}:"{}"'.format(k, v) for k, v in p.items())) return lua - def upload_prox_lua(self, config_dir, prox_config_dict): + def upload_prox_lua(self, config_dir, prox_config_data): # we could have multiple lua directives - lau_dict = prox_config_dict.get('lua', {}) + lau_dict = prox_config_data.get('lua', {}) find_iter = (re.findall(r'\("([^"]+)"\)', k) for k in lau_dict) lua_file = next((found[0] for found in find_iter if found), None) if not lua_file: @@ -751,24 +787,15 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): remote_path = os.path.join(config_dir, lua_file) return self.put_string_to_file(out, remote_path) - def upload_prox_config(self, config_file, prox_config_dict): + def upload_prox_config(self, config_file, prox_config_data): # prox can't handle spaces around ' = ' so use custom method - out = StringIO(self.write_prox_config(prox_config_dict)) + out = StringIO(self.write_prox_config(prox_config_data)) out.seek(0) remote_path = os.path.join("/tmp", config_file) self.ssh_helper.put_file_obj(out, remote_path) return remote_path - CONFIG_QUEUE_TIMEOUT = 120 - - @property - def prox_config_data(self): - if self._prox_config_data is None: - # this will block, but it needs too - self._prox_config_data = self.config_queue.get(True, self.CONFIG_QUEUE_TIMEOUT) - return self._prox_config_data - def build_config_file(self): task_path = self.scenario_helper.task_path options = self.scenario_helper.options @@ -812,20 +839,12 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper): class ProxResourceHelper(ClientResourceHelper): RESOURCE_WORD = 'prox' - PROX_CORE_GEN_MODE = "gen" - PROX_CORE_LAT_MODE = "lat" - PROX_CORE_MPLS_TEST = "MPLS tag/untag" PROX_MODE = "" WAIT_TIME = 3 @staticmethod - def line_rate_to_pps(pkt_size, n_ports): - # FIXME Don't hardcode 10Gb/s - return n_ports * TEN_GIGABIT / BITS_PER_BYTE / (pkt_size + 20) - - @staticmethod def find_pci(pci, bound_pci): # we have to substring match PCI bus address from the end return any(b.endswith(pci) for b in bound_pci) @@ -837,16 +856,14 @@ class ProxResourceHelper(ClientResourceHelper): self._ip = self.mgmt_interface["ip"] self.done = False - self._cpu_topology = None self._vpci_to_if_name_map = None self.additional_file = {} self.remote_prox_file_name = None self.lower = None self.upper = None - self._test_cores = None - self._latency_cores = None - self._tagged_cores = None - self._plain_cores = None + self.step_delta = 1 + self.step_time = 0.5 + self._test_type = None @property def sut(self): @@ -855,40 +872,13 @@ class ProxResourceHelper(ClientResourceHelper): return self.client @property - def cpu_topology(self): - if not self._cpu_topology: - stdout = io.BytesIO() - self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout) - self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8')) - return self._cpu_topology - - @property - def test_cores(self): - if not self._test_cores: - self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE) - return self._test_cores - - @property - def mpls_cores(self): - if not self._tagged_cores: - self._tagged_cores, self._plain_cores = self.get_cores_mpls(self.PROX_CORE_GEN_MODE) - return self._tagged_cores, self._plain_cores - - @property - def tagged_cores(self): - return self.mpls_cores[0] - - @property - def plain_cores(self): - return self.mpls_cores[1] - - @property - def latency_cores(self): - if not self._latency_cores: - self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) - return self._latency_cores + def test_type(self): + if self._test_type is None: + self._test_type = self.setup_helper.find_in_section('global', 'name', None) + return self._test_type def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 @@ -931,75 +921,192 @@ class ProxResourceHelper(ClientResourceHelper): if func: return func(*args, **kwargs) - @contextmanager - def traffic_context(self, pkt_size, value): - self.sut.stop_all() - self.sut.reset_stats() - if self.get_test_type() == self.PROX_CORE_MPLS_TEST: - self.sut.set_pkt_size(self.tagged_cores, pkt_size) - self.sut.set_pkt_size(self.plain_cores, pkt_size - 4) - self.sut.set_speed(self.tagged_cores, value) - ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20) - self.sut.set_speed(self.plain_cores, value * ratio) - else: - self.sut.set_pkt_size(self.test_cores, pkt_size) - self.sut.set_speed(self.test_cores, value) + def _connect(self, client=None): + """Run and connect to prox on the remote system """ + # De-allocating a large amount of hugepages takes some time. If a new + # PROX instance is started immediately after killing the previous one, + # it might not be able to allocate hugepages, because they are still + # being freed. Hence the -w switch. + # self.connection.execute("sudo killall -w Prox 2>/dev/null") + # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t + # -f ./handle_none-4.cfg" + # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir + + # "; " \ + # + "export RTE_TARGET=" + self._dpdk_target + ";" \ + # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50; + # sudo " \ + # + "./build/Prox " + prox_args + # log.debug("Starting PROX with command [%s]", prox_cmd) + # thread.start_new_thread(self.ssh_check_quit, (self, self._user, + # self._ip, prox_cmd)) + if client is None: + client = ProxSocketHelper() - self.sut.start_all() - try: - yield - finally: - self.sut.stop_all() + # try connecting to Prox for 60s + for _ in range(RETRY_SECONDS): + time.sleep(RETRY_INTERVAL) + try: + client.connect(self._ip, PROX_PORT) + except (socket.gaierror, socket.error): + continue + else: + return client - def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): - # do this assert in init? unless we expect interface count to - # change from one run to another run... - ports = self.vnfd_helper.port_pairs.all_ports - port_count = len(ports) - assert port_count in {1, 2, 4}, \ - "Invalid number of ports: 1, 2 or 4 ports only supported at this time" + msg = "Failed to connect to prox, please check if system {} accepts connections on port {}" + raise Exception(msg.format(self._ip, PROX_PORT)) - with self.traffic_context(pkt_size, value): - # Getting statistics to calculate PPS at right speed.... - tsc_hz = float(self.sut.hz()) - time.sleep(2) - with self.sut.measure_tot_stats() as data: - time.sleep(duration) - # Get stats before stopping the cores. Stopping cores takes some time - # and might skew results otherwise. - latency = self.get_latency() +class ProxDataHelper(object): + + def __init__(self, vnfd_helper, sut, pkt_size, value, tolerated_loss): + super(ProxDataHelper, self).__init__() + self.vnfd_helper = vnfd_helper + self.sut = sut + self.pkt_size = pkt_size + self.value = value + self.tolerated_loss = tolerated_loss + self.port_count = len(self.vnfd_helper.port_pairs.all_ports) + self.tsc_hz = None + self.measured_stats = None + self.latency = None + self._totals_and_pps = None + self.result_tuple = None + + @property + def totals_and_pps(self): + if self._totals_and_pps is None: + rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8] + pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, pps + return self._totals_and_pps + + @property + def rx_total(self): + return self.totals_and_pps[0] + + @property + def tx_total(self): + return self.totals_and_pps[1] - deltas = data['delta'] - rx_total, tx_total = self.sut.port_stats(range(port_count))[6:8] - pps = value / 100.0 * self.line_rate_to_pps(pkt_size, port_count) + @property + def pps(self): + return self.totals_and_pps[2] + @property + def samples(self): samples = {} - # we are currently using enumeration to map logical port num to interface - for port_name in ports: - port = self.vnfd_helper.port_num(port_name) - port_rx_total, port_tx_total = self.sut.port_stats([port])[6:8] + for port_name, port_num in self.vnfd_helper.ports_iter(): + port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8] samples[port_name] = { "in_packets": port_rx_total, "out_packets": port_tx_total, } + return samples - result = ProxTestDataTuple(tolerated_loss, tsc_hz, deltas.rx, deltas.tx, - deltas.tsc, latency, rx_total, tx_total, pps) - result.log_data() - return result, samples + def __enter__(self): + self.check_interface_count() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.make_tuple() + + def make_tuple(self): + if self.result_tuple: + return + + self.result_tuple = ProxTestDataTuple( + self.tolerated_loss, + self.tsc_hz, + self.measured_stats['delta'].rx, + self.measured_stats['delta'].tx, + self.measured_stats['delta'].tsc, + self.latency, + self.rx_total, + self.tx_total, + self.pps, + ) + self.result_tuple.log_data() - def get_test_type(self): - test_type = None - for section_name, section in self.setup_helper.prox_config_data: - if section_name != "global": - continue + @contextmanager + def measure_tot_stats(self): + with self.sut.measure_tot_stats() as self.measured_stats: + yield - for key, value in section: - if key == "name" and value == self.PROX_CORE_MPLS_TEST: - test_type = self.PROX_CORE_MPLS_TEST + def check_interface_count(self): + # do this assert in init? unless we expect interface count to + # change from one run to another run... + assert self.port_count in {1, 2, 4}, \ + "Invalid number of ports: 1, 2 or 4 ports only supported at this time" + + def capture_tsc_hz(self): + self.tsc_hz = float(self.sut.hz()) + + def line_rate_to_pps(self): + # FIXME Don't hardcode 10Gb/s + return self.port_count * TEN_GIGABIT / BITS_PER_BYTE / (self.pkt_size + 20) + + +class ProxProfileHelper(object): + + __prox_profile_type__ = "Generic" + + PROX_CORE_GEN_MODE = "gen" + PROX_CORE_LAT_MODE = "lat" + + @classmethod + def get_cls(cls, helper_type): + """Return class of specified type.""" + if not helper_type: + return ProxProfileHelper + + for profile_helper_class in utils.itersubclasses(cls): + if helper_type == profile_helper_class.__prox_profile_type__: + return profile_helper_class + + return ProxProfileHelper + + @classmethod + def make_profile_helper(cls, resource_helper): + return cls.get_cls(resource_helper.test_type)(resource_helper) + + def __init__(self, resource_helper): + super(ProxProfileHelper, self).__init__() + self.resource_helper = resource_helper + self._cpu_topology = None + self._test_cores = None + self._latency_cores = None + + @property + def cpu_topology(self): + if not self._cpu_topology: + stdout = io.BytesIO() + self.ssh_helper.get_file_obj("/proc/cpuinfo", stdout) + self._cpu_topology = SocketTopology.parse_cpuinfo(stdout.getvalue().decode('utf-8')) + return self._cpu_topology + + @property + def test_cores(self): + if not self._test_cores: + self._test_cores = self.get_cores(self.PROX_CORE_GEN_MODE) + return self._test_cores + + @property + def latency_cores(self): + if not self._latency_cores: + self._latency_cores = self.get_cores(self.PROX_CORE_LAT_MODE) + return self._latency_cores - return test_type + @contextmanager + def traffic_context(self, pkt_size, value): + self.sut.stop_all() + self.sut.reset_stats() + try: + self.sut.set_pkt_size(self.test_cores, pkt_size) + self.sut.set_speed(self.test_cores, value) + self.sut.start_all() + yield + finally: + self.sut.stop_all() def get_cores(self, mode): cores = [] @@ -1016,68 +1123,264 @@ class ProxResourceHelper(ClientResourceHelper): return cores - def get_cores_mpls(self, mode=PROX_CORE_GEN_MODE): + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + + with data_helper, self.traffic_context(pkt_size, value): + with data_helper.measure_tot_stats(): + time.sleep(duration) + # Getting statistics to calculate PPS at right speed.... + data_helper.capture_tsc_hz() + data_helper.latency = self.get_latency() + + return data_helper.result_tuple, data_helper.samples + + def get_latency(self): + """ + :return: return lat_min, lat_max, lat_avg + :rtype: list + """ + if self._latency_cores: + return self.sut.lat_stats(self._latency_cores) + return [] + + def terminate(self): + pass + + def __getattr__(self, item): + return getattr(self.resource_helper, item) + + +class ProxMplsProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "MPLS tag/untag" + + def __init__(self, resource_helper): + super(ProxMplsProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + + @property + def mpls_cores(self): + if not self._cores_tuple: + self._cores_tuple = self.get_cores_mpls() + return self._cores_tuple + + @property + def tagged_cores(self): + return self.mpls_cores[0] + + @property + def plain_cores(self): + return self.mpls_cores[1] + + def get_cores_mpls(self): cores_tagged = [] cores_plain = [] - for section_name, section in self.setup_helper.prox_config_data: + for section_name, section in self.resource_helper.setup_helper.prox_config_data: if not section_name.startswith("core"): continue - if all(key != "mode" or value != mode for key, value in section): + if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section): continue for item_key, item_value in section: - if item_key == "name" and item_value.startswith("tag"): + if item_key != 'name': + continue + + if item_value.startswith("tag"): core_tuple = CoreSocketTuple(section_name) core_tag = core_tuple.find_in_topology(self.cpu_topology) cores_tagged.append(core_tag) - elif item_key == "name" and item_value.startswith("udp"): + elif item_value.startswith("udp"): core_tuple = CoreSocketTuple(section_name) core_udp = core_tuple.find_in_topology(self.cpu_topology) cores_plain.append(core_udp) return cores_tagged, cores_plain - def get_latency(self): - """ - :return: return lat_min, lat_max, lat_avg - :rtype: list - """ - if self._latency_cores: - return self.sut.lat_stats(self._latency_cores) - return [] + @contextmanager + def traffic_context(self, pkt_size, value): + self.sut.stop_all() + self.sut.reset_stats() + try: + self.sut.set_pkt_size(self.tagged_cores, pkt_size) + self.sut.set_pkt_size(self.plain_cores, pkt_size - 4) + self.sut.set_speed(self.tagged_cores, value) + ratio = 1.0 * (pkt_size - 4 + 20) / (pkt_size + 20) + self.sut.set_speed(self.plain_cores, value * ratio) + self.sut.start_all() + yield + finally: + self.sut.stop_all() - def _connect(self, client=None): - """Run and connect to prox on the remote system """ - # De-allocating a large amount of hugepages takes some time. If a new - # PROX instance is started immediately after killing the previous one, - # it might not be able to allocate hugepages, because they are still - # being freed. Hence the -w switch. - # self.connection.execute("sudo killall -w Prox 2>/dev/null") - # prox_cmd = "export TERM=xterm; cd "+ self.bin_path +"; ./Prox -t - # -f ./handle_none-4.cfg" - # prox_cmd = "export TERM=xterm; export RTE_SDK=" + self._dpdk_dir + - # "; " \ - # + "export RTE_TARGET=" + self._dpdk_target + ";" \ - # + " cd " + self._prox_dir + "; make HW_DIRECT_STATS=y -j50; - # sudo " \ - # + "./build/Prox " + prox_args - # log.debug("Starting PROX with command [%s]", prox_cmd) - # thread.start_new_thread(self.ssh_check_quit, (self, self._user, - # self._ip, prox_cmd)) - if client is None: - client = ProxSocketHelper() - # try connecting to Prox for 60s - for _ in range(RETRY_SECONDS): - time.sleep(RETRY_INTERVAL) - try: - client.connect(self._ip, PROX_PORT) - except (socket.gaierror, socket.error): +class ProxBngProfileHelper(ProxProfileHelper): + + __prox_profile_type__ = "BNG gen" + + def __init__(self, resource_helper): + super(ProxBngProfileHelper, self).__init__(resource_helper) + self._cores_tuple = None + + @property + def bng_cores(self): + if not self._cores_tuple: + self._cores_tuple = self.get_cores_gen_bng_qos() + return self._cores_tuple + + @property + def cpe_cores(self): + return self.bng_cores[0] + + @property + def inet_cores(self): + return self.bng_cores[1] + + @property + def arp_cores(self): + return self.bng_cores[2] + + @property + def arp_task_cores(self): + return self.bng_cores[3] + + @property + def all_rx_cores(self): + return self.latency_cores + + def get_cores_gen_bng_qos(self): + cpe_cores = [] + inet_cores = [] + arp_cores = [] + arp_tasks_core = [0] + for section_name, section in self.resource_helper.setup_helper.prox_config_data: + if not section_name.startswith("core"): continue - else: - return client - msg = "Failed to connect to prox, please check if system {} accepts connections on port {}" - raise Exception(msg.format(self._ip, PROX_PORT)) + if all(key != "mode" or value != self.PROX_CORE_GEN_MODE for key, value in section): + continue + + for item_key, item_value in section: + if item_key == "name" and item_value.startswith("cpe"): + core_tuple = CoreSocketTuple(section_name) + core_tag = core_tuple.find_in_topology(self.cpu_topology) + cpe_cores.append(core_tag) + + elif item_key == "name" and item_value.startswith("inet"): + core_tuple = CoreSocketTuple(section_name) + inet_core = core_tuple.find_in_topology(self.cpu_topology) + inet_cores.append(inet_core) + + elif item_key == "name" and item_value.startswith("arp"): + core_tuple = CoreSocketTuple(section_name) + arp_core = core_tuple.find_in_topology(self.cpu_topology) + arp_cores.append(arp_core) + + # We check the tasks/core separately + if item_key == "name" and item_value.startswith("arp_task"): + core_tuple = CoreSocketTuple(section_name) + arp_task_core = core_tuple.find_in_topology(self.cpu_topology) + arp_tasks_core.append(arp_task_core) + + return cpe_cores, inet_cores, arp_cores, arp_tasks_core + + @contextmanager + def traffic_context(self, pkt_size, value): + # Tester is sending packets at the required speed already after + # setup_test(). Just get the current statistics, sleep the required + # amount of time and calculate packet loss. + inet_pkt_size = pkt_size + cpe_pkt_size = pkt_size - 24 + ratio = 1.0 * (cpe_pkt_size + 20) / (inet_pkt_size + 20) + + curr_up_speed = curr_down_speed = 0 + max_up_speed = max_down_speed = value + if ratio < 1: + max_down_speed = value * ratio + else: + max_up_speed = value / ratio + + # Initialize cores + self.sut.stop_all() + time.sleep(0.5) + + # Flush any packets in the NIC RX buffers, otherwise the stats will be + # wrong. + self.sut.start(self.all_rx_cores) + time.sleep(0.5) + self.sut.stop(self.all_rx_cores) + time.sleep(0.5) + self.sut.reset_stats() + + self.sut.set_pkt_size(self.inet_cores, inet_pkt_size) + self.sut.set_pkt_size(self.cpe_cores, cpe_pkt_size) + + self.sut.reset_values(self.cpe_cores) + self.sut.reset_values(self.inet_cores) + + # Set correct IP and UDP lengths in packet headers + # CPE + # IP length (byte 24): 26 for MAC(12), EthType(2), QinQ(8), CRC(4) + self.sut.set_value(self.cpe_cores, 24, cpe_pkt_size - 26, 2) + # UDP length (byte 46): 46 for MAC(12), EthType(2), QinQ(8), IP(20), CRC(4) + self.sut.set_value(self.cpe_cores, 46, cpe_pkt_size - 46, 2) + + # INET + # IP length (byte 20): 22 for MAC(12), EthType(2), MPLS(4), CRC(4) + self.sut.set_value(self.inet_cores, 20, inet_pkt_size - 22, 2) + # IP length (byte 48): 50 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), CRC(4) + self.sut.set_value(self.inet_cores, 48, inet_pkt_size - 50, 2) + # UDP length (byte 70): 70 for MAC(12), EthType(2), MPLS(4), IP(20), GRE(8), IP(20), CRC(4) + self.sut.set_value(self.inet_cores, 70, inet_pkt_size - 70, 2) + + # Sending ARP to initialize tables - need a few seconds of generation + # to make sure all CPEs are initialized + LOG.info("Initializing SUT: sending ARP packets") + self.sut.set_speed(self.arp_cores, 1, self.arp_task_cores) + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + self.sut.start(self.arp_cores) + time.sleep(4) + + # Ramp up the transmission speed. First go to the common speed, then + # increase steps for the faster one. + self.sut.start(self.cpe_cores + self.inet_cores + self.latency_cores) + + LOG.info("Ramping up speed to %s up, %s down", max_up_speed, max_down_speed) + + while (curr_up_speed < max_up_speed) or (curr_down_speed < max_down_speed): + # The min(..., ...) takes care of 1) floating point rounding errors + # that could make curr_*_speed to be slightly greater than + # max_*_speed and 2) max_*_speed not being an exact multiple of + # self._step_delta. + if curr_up_speed < max_up_speed: + curr_up_speed = min(curr_up_speed + self.step_delta, max_up_speed) + if curr_down_speed < max_down_speed: + curr_down_speed = min(curr_down_speed + self.step_delta, max_down_speed) + + self.sut.set_speed(self.inet_cores, curr_up_speed) + self.sut.set_speed(self.cpe_cores, curr_down_speed) + time.sleep(self.step_time) + + LOG.info("Target speeds reached. Starting real test.") + + yield + + self.sut.stop(self.arp_cores + self.cpe_cores + self.inet_cores) + LOG.info("Test ended. Flushing NIC buffers") + self.sut.start(self.all_rx_cores) + time.sleep(3) + self.sut.stop(self.all_rx_cores) + + def run_test(self, pkt_size, duration, value, tolerated_loss=0.0): + data_helper = ProxDataHelper(self.vnfd_helper, self.sut, pkt_size, value, tolerated_loss) + + with data_helper, self.traffic_context(pkt_size, value): + with data_helper.measure_tot_stats(): + time.sleep(duration) + # Getting statistics to calculate PPS at right speed.... + data_helper.capture_tsc_hz() + data_helper.latency = self.get_latency() + + return data_helper.result_tuple, data_helper.samples diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 557009d30..5cf234514 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -282,9 +282,11 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): def setup_vnf_environment(self): self._setup_dpdk() - resource = self._setup_resources() + self.bound_pci = [v['virtual-interface']["vpci"] for v in self.vnfd_helper.interfaces] self.kill_vnf() + # bind before _setup_resources so we can use dpdk_port_num self._detect_and_bind_drivers() + resource = self._setup_resources() return resource def kill_vnf(self): @@ -307,10 +309,13 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): if exit_status != 0: self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup) - def _setup_resources(self): - interfaces = self.vnfd_helper.interfaces - self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces] + def get_collectd_options(self): + options = self.scenario_helper.all_options.get("collectd", {}) + # override with specific node settings + options.update(self.scenario_helper.options.get("collectd", {})) + return options + def _setup_resources(self): # what is this magic? how do we know which socket is for which port? # what about quad-socket? if any(v[5] == "0" for v in self.bound_pci): @@ -319,8 +324,14 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper): self.socket = 1 cores = self._validate_cpu_cfg() - return ResourceProfile(self.vnfd_helper.mgmt_interface, - interfaces=self.vnfd_helper.interfaces, cores=cores) + # implicit ordering, presumably by DPDK port num, so pre-sort by port_num + # this won't work because we don't have DPDK port numbers yet + ports = sorted(self.vnfd_helper.interfaces, key=self.vnfd_helper.port_num) + port_names = (intf["name"] for intf in ports) + collectd_options = self.get_collectd_options() + plugins = collectd_options.get("plugins", {}) + return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores, + plugins=plugins, interval=collectd_options.get("interval")) def _detect_and_bind_drivers(self): interfaces = self.vnfd_helper.interfaces @@ -465,6 +476,9 @@ class ClientResourceHelper(ResourceHelper): self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index a73c691b9..47c5a35d9 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import absolute_import + import time import os import logging @@ -79,31 +80,31 @@ class IxiaResourceHelper(ClientResourceHelper): latency = stats[0] samples = {} - for interface in self.vnfd_helper.interfaces: + # this is not DPDK port num, but this is whatever number we gave + # when we selected ports and programmed the profile + for port_num in ports: try: - name = interface["name"] - # this is not DPDK port num, but this is whatever number we gave - # when we selected ports and programmed the profile - port = self.vnfd_helper.port_num(name) - if port in ports: - samples[name] = { - "rx_throughput_kps": float(last_result["Rx_Rate_Kbps"][port]), - "tx_throughput_kps": float(last_result["Tx_Rate_Kbps"][port]), - "rx_throughput_mbps": float(last_result["Rx_Rate_Mbps"][port]), - "tx_throughput_mbps": float(last_result["Tx_Rate_Mbps"][port]), - "in_packets": int(last_result["Valid_Frames_Rx"][port]), - "out_packets": int(last_result["Frames_Tx"][port]), - "RxThroughput": int(last_result["Valid_Frames_Rx"][port]) / 30, - "TxThroughput": int(last_result["Frames_Tx"][port]) / 30, - } - if key: - avg_latency = latency["Store-Forward_Avg_latency_ns"][port] - min_latency = latency["Store-Forward_Min_latency_ns"][port] - max_latency = latency["Store-Forward_Max_latency_ns"][port] - samples[name][key] = \ - {"Store-Forward_Avg_latency_ns": avg_latency, - "Store-Forward_Min_latency_ns": min_latency, - "Store-Forward_Max_latency_ns": max_latency} + # reverse lookup port name from port_num so the stats dict is descriptive + intf = self.vnfd_helper.find_interface_by_port(port_num) + port_name = intf["name"] + samples[port_name] = { + "rx_throughput_kps": float(last_result["Rx_Rate_Kbps"][port_num]), + "tx_throughput_kps": float(last_result["Tx_Rate_Kbps"][port_num]), + "rx_throughput_mbps": float(last_result["Rx_Rate_Mbps"][port_num]), + "tx_throughput_mbps": float(last_result["Tx_Rate_Mbps"][port_num]), + "in_packets": int(last_result["Valid_Frames_Rx"][port_num]), + "out_packets": int(last_result["Frames_Tx"][port_num]), + "RxThroughput": int(last_result["Valid_Frames_Rx"][port_num]) / 30, + "TxThroughput": int(last_result["Frames_Tx"][port_num]) / 30, + } + if key: + avg_latency = latency["Store-Forward_Avg_latency_ns"][port_num] + min_latency = latency["Store-Forward_Min_latency_ns"][port_num] + max_latency = latency["Store-Forward_Max_latency_ns"][port_num] + samples[port_name][key] = \ + {"Store-Forward_Avg_latency_ns": avg_latency, + "Store-Forward_Min_latency_ns": min_latency, + "Store-Forward_Max_latency_ns": max_latency} except IndexError: pass @@ -129,30 +130,27 @@ class IxiaResourceHelper(ClientResourceHelper): self.client.ix_assign_ports() mac = {} - # TODO: shouldn't this index map to port number we used to generate the profile - for index, interface in enumerate(self.vnfd_helper.interfaces, 1): - virt_intf = interface["virtual-interface"] - mac.update({ - "src_mac_{}".format(index): virt_intf.get("local_mac", default), - "dst_mac_{}".format(index): virt_intf.get("dst_mac", default), - }) + for port_name in self.vnfd_helper.port_pairs.all_ports: + intf = self.vnfd_helper.find_interface(name=port_name) + virt_intf = intf["virtual-interface"] + # we only know static traffic id by reading the json + # this is used by _get_ixia_trafficrofile + port_num = self.vnfd_helper.port_num(intf) + mac["src_mac_{}".format(port_num)] = virt_intf.get("local_mac", default) + mac["dst_mac_{}".format(port_num)] = virt_intf.get("dst_mac", default) samples = {} - - ixia_file = find_relative_file("ixia_traffic.cfg", - self.scenario_helper.scenario_cfg["task_path"]) # Generate ixia traffic config... try: while not self._terminated.value: - traffic_profile.execute(self, self.client, mac, ixia_file) + traffic_profile.execute_traffic(self, self.client, mac) self.client_started.value = 1 time.sleep(WAIT_FOR_TRAFFIC) self.client.ix_stop_traffic() - samples = self.generate_samples() + samples = self.generate_samples(traffic_profile.ports) self._queue.put(samples) status, samples = traffic_profile.get_drop_percentage(self, samples, min_tol, - max_tol, self.client, mac, - ixia_file) + max_tol, self.client, mac) current = samples['CurrentDropPercentage'] if min_tol <= current <= max_tol or status == 'Completed': @@ -160,25 +158,25 @@ class IxiaResourceHelper(ClientResourceHelper): self.client.ix_stop_traffic() self._queue.put(samples) - except Exception: - LOG.info("Run Traffic terminated") - pass - if not self.rfc_helper.is_done(): - self._terminated.value = 1 - return + if not self.rfc_helper.is_done(): + self._terminated.value = 1 + return + + traffic_profile.execute_traffic(self, self.client, mac) + for _ in range(5): + time.sleep(self.LATENCY_TIME_SLEEP) + self.client.ix_stop_traffic() + samples = self.generate_samples(traffic_profile.ports, 'latency', {}) + self._queue.put(samples) + traffic_profile.start_ixia_latency(self, self.client, mac) + if self._terminated.value: + break - traffic_profile.execute_traffic(self, self.client, mac, ixia_file) - for _ in range(5): - time.sleep(self.LATENCY_TIME_SLEEP) self.client.ix_stop_traffic() - samples = self.generate_samples(traffic_profile.ports, 'latency', {}) - self._queue.put(samples) - traffic_profile.start_ixia_latency(self, self.client, mac, ixia_file) - if self._terminated.value: - break + except Exception: + LOG.exception("Run Traffic terminated") - self.client.ix_stop_traffic() self._terminated.value = 1 def collect_kpi(self): diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py index cd4a008ce..5f1c4d4d3 100644 --- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py @@ -50,13 +50,14 @@ class ConfigCreate(object): config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg') return config - def __init__(self, uplink_ports, downlink_ports, socket): + def __init__(self, vnfd_helper, socket): super(ConfigCreate, self).__init__() self.sw_q = -1 self.sink_q = -1 self.n_pipeline = 1 - self.uplink_ports = uplink_ports - self.downlink_ports = downlink_ports + self.vnfd_helper = vnfd_helper + self.uplink_ports = self.vnfd_helper.port_pairs.uplink_ports + self.downlink_ports = self.vnfd_helper.port_pairs.downlink_ports self.pipeline_per_port = 9 self.socket = socket @@ -77,7 +78,7 @@ class ConfigCreate(object): def vpe_rxq(self, config): for port in self.downlink_ports: - new_section = 'RXQ{0}.0'.format(port) + new_section = 'RXQ{0}.0'.format(self.vnfd_helper.port_num(port)) config.add_section(new_section) config.set(new_section, 'mempool', 'MEMPOOL1') @@ -102,7 +103,8 @@ class ConfigCreate(object): for k, v in parser.items(pipeline): if k == "pktq_in": if "RXQ" in v: - value = "RXQ{0}.0".format(self.uplink_ports[index]) + port = self.vnfd_helper.port_num(self.uplink_ports[index]) + value = "RXQ{0}.0".format(port) else: value = self.get_sink_swq(parser, pipeline, k, index) @@ -110,7 +112,8 @@ class ConfigCreate(object): elif k == "pktq_out": if "TXQ" in v: - value = "TXQ{0}.0".format(self.downlink_ports[index]) + port = self.vnfd_helper.port_num(self.downlink_ports[index]) + value = "TXQ{0}.0".format(port) else: self.sw_q += 1 value = self.get_sink_swq(parser, pipeline, k, index) @@ -131,23 +134,25 @@ class ConfigCreate(object): for k, v in parser.items(pipeline): if k == "pktq_in": + port = self.vnfd_helper.port_num(self.downlink_ports[index]) if "RXQ" not in v: value = self.get_sink_swq(parser, pipeline, k, index) elif "TM" in v: - value = "RXQ{0}.0 TM{1}".format(self.downlink_ports[index], index) + value = "RXQ{0}.0 TM{1}".format(port, index) else: - value = "RXQ{0}.0".format(self.downlink_ports[index]) + value = "RXQ{0}.0".format(port) parser.set(pipeline, k, value) if k == "pktq_out": + port = self.vnfd_helper.port_num(self.uplink_ports[index]) if "TXQ" not in v: self.sw_q += 1 value = self.get_sink_swq(parser, pipeline, k, index) elif "TM" in v: - value = "TXQ{0}.0 TM{1}".format(self.uplink_ports[index], index) + value = "TXQ{0}.0 TM{1}".format(port, index) else: - value = "TXQ{0}.0".format(self.uplink_ports[index]) + value = "TXQ{0}.0".format(port) parser.set(pipeline, k, value) @@ -174,14 +179,19 @@ class ConfigCreate(object): def generate_vpe_script(self, interfaces): rules = PipelineRules(pipeline_id=1) - for priv_port, pub_port in zip(self.uplink_ports, self.downlink_ports): - priv_intf = interfaces[priv_port]["virtual-interface"] - pub_intf = interfaces[pub_port]["virtual-interface"] + for uplink_port, downlink_port in zip(self.uplink_ports, self.downlink_ports): - dst_port0_ip = priv_intf["dst_ip"] - dst_port1_ip = pub_intf["dst_ip"] - dst_port0_mac = priv_intf["dst_mac"] - dst_port1_mac = pub_intf["dst_mac"] + uplink_intf = \ + next(intf["virtual-interface"] for intf in interfaces + if intf["name"] == uplink_port) + downlink_intf = \ + next(intf["virtual-interface"] for intf in interfaces + if intf["name"] == downlink_port) + + dst_port0_ip = uplink_intf["dst_ip"] + dst_port1_ip = downlink_intf["dst_ip"] + dst_port0_mac = uplink_intf["dst_mac"] + dst_port1_mac = downlink_intf["dst_mac"] rules.add_firewall_script(dst_port0_ip) rules.next_pipeline() @@ -226,8 +236,7 @@ class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper): } self._build_vnf_ports() - vpe_conf = ConfigCreate(self.vnfd_helper.port_pairs.uplink_ports, - self.vnfd_helper.port_pairs.downlink_ports, self.socket) + vpe_conf = ConfigCreate(self.vnfd_helper, self.socket) vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg) config_basename = posixpath.basename(self.CFG_CONFIG) diff --git a/yardstick/orchestrator/kubernetes.py b/yardstick/orchestrator/kubernetes.py index 6d7045f58..198eeac6d 100644 --- a/yardstick/orchestrator/kubernetes.py +++ b/yardstick/orchestrator/kubernetes.py @@ -23,6 +23,7 @@ class KubernetesObject(object): self.command = [kwargs.get('command', '/bin/bash')] self.args = kwargs.get('args', []) self.ssh_key = kwargs.get('ssh_key', 'yardstick_key') + self.node_selector = kwargs.get('nodeSelector', {}) self.volumes = [] @@ -37,12 +38,13 @@ class KubernetesObject(object): "template": { "metadata": { "labels": { - "app": "" + "app": name } }, "spec": { "containers": [], - "volumes": [] + "volumes": [], + "nodeSelector": {} } } } @@ -50,6 +52,7 @@ class KubernetesObject(object): self._change_value_according_name(name) self._add_containers() + self._add_node_selector() self._add_ssh_key_volume() self._add_volumes() @@ -88,6 +91,11 @@ class KubernetesObject(object): return container + def _add_node_selector(self): + utils.set_dict_value(self.template, + 'spec.template.spec.nodeSelector', + self.node_selector) + def _add_volumes(self): utils.set_dict_value(self.template, 'spec.template.spec.volumes', @@ -106,6 +114,35 @@ class KubernetesObject(object): self._add_volume(key_volume) +class ServiceObject(object): + + def __init__(self, name): + self.name = '{}-service'.format(name) + self.template = { + 'metadata': { + 'name': '{}-service'.format(name) + }, + 'spec': { + 'type': 'NodePort', + 'ports': [ + { + 'port': 22, + 'protocol': 'TCP' + } + ], + 'selector': { + 'app': name + } + } + } + + def create(self): + k8s_utils.create_service(self.template) + + def delete(self): + k8s_utils.delete_service(self.name) + + class KubernetesTemplate(object): def __init__(self, name, template_cfg): @@ -117,6 +154,8 @@ class KubernetesTemplate(object): ssh_key=self.ssh_key, **cfg) for rc, cfg in template_cfg.items()] + self.service_objs = [ServiceObject(s) for s in self.rcs] + self.pods = [] def _get_rc_name(self, rc_name): |