authorDeepak S <deepak.s@linux.intel.com>2017-06-20 14:31:19 -0700
committerRoss Brattain <ross.b.brattain@intel.com>2017-08-08 08:54:23 -0700
commit5ce3b6f8c8b3217091e51a6041455738603d90b8 (patch)
treeca34e15a85d69e2b23ce498fead47761624ae42c /yardstick
parent72778951d6b8968f562fb8fefa02a57159ea1b83 (diff)
NSB update
Refactored main NSB VNF classes according to class diagram
https://wiki.opnfv.org/display/yardstick/NSB+class+diagram

All the SampleVNFs have been separated and placed under the
SampleVNF class.

Added AutoConnectSSH to automatically create an SSH connection
on demand.

Added VnfdHelper class to wrap the VNFD dictionary in preparation
for class-based modeling.

Extracted DpdkVnfSetupEnvHelper for DPDK-based VNF setup.

Extracted Stats and other client config to ResourceHelper.

Had to replace dict_key_flatten with deepgetitem due to Python 2.7
Jinja2 infinite recursion.

Change-Id: Ia8840e9c44cdbdf39aab6b02e6d2176b31937dc9
Signed-off-by: Deepak S <deepak.s@linux.intel.com>
Signed-off-by: Edward MacGillivray <edward.s.macgillivray@intel.com>
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
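As a minimal sketch of the deepgetitem idea mentioned above (illustrative only, not the exact yardstick implementation): resolve a dotted key through nested dicts and lists one segment at a time, instead of flattening the whole structure first, which is what triggered the Jinja2 recursion under Python 2.7.

    def deepgetitem(obj, key, sep='.'):
        # walk one key segment at a time instead of flattening the dict
        for part in key.split(sep):
            try:
                obj = obj[part]
            except TypeError:
                # list-like containers are indexed by integer position
                obj = obj[int(part)]
        return obj

    data = {'vdu': [{'name': 'vpe-vnf'}]}
    assert deepgetitem(data, 'vdu.0.name') == 'vpe-vnf'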
Diffstat (limited to 'yardstick')
-rw-r--r--yardstick/benchmark/contexts/base.py29
-rw-r--r--yardstick/benchmark/contexts/heat.py127
-rw-r--r--yardstick/benchmark/contexts/node.py42
-rw-r--r--yardstick/benchmark/contexts/standalone.py41
-rw-r--r--yardstick/benchmark/core/task.py8
-rw-r--r--yardstick/benchmark/scenarios/lib/migrate.py11
-rw-r--r--yardstick/benchmark/scenarios/networking/vnf_generic.py292
-rw-r--r--yardstick/common/utils.py98
-rw-r--r--yardstick/network_services/helpers/__init__.py0
-rw-r--r--yardstick/network_services/helpers/cpu.py76
-rw-r--r--yardstick/network_services/helpers/samplevnf_helper.py639
-rw-r--r--yardstick/network_services/pipeline.py113
-rw-r--r--yardstick/network_services/traffic_profile/rfc2544.py210
-rw-r--r--yardstick/network_services/traffic_profile/traffic_profile.py13
-rw-r--r--yardstick/network_services/utils.py8
-rw-r--r--yardstick/network_services/vnf_generic/vnf/base.py234
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py994
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_ping.py155
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py303
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_trex.py345
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vpe_vnf.py545
-rw-r--r--yardstick/network_services/vnf_generic/vnfdgen.py64
-rw-r--r--yardstick/network_services/yang_model.py107
-rw-r--r--yardstick/orchestrator/heat.py4
-rw-r--r--yardstick/ssh.py147
25 files changed, 3211 insertions, 1394 deletions
diff --git a/yardstick/benchmark/contexts/base.py b/yardstick/benchmark/contexts/base.py
index e362c6a3d..c9b5b51c9 100644
--- a/yardstick/benchmark/contexts/base.py
+++ b/yardstick/benchmark/contexts/base.py
@@ -18,6 +18,15 @@ class Context(object):
"""Class that represents a context in the logical model"""
list = []
+ @staticmethod
+ def split_name(name, sep='.'):
+ try:
+ name_iter = iter(name.split(sep))
+ except AttributeError:
+ # name is not a string
+ return None, None
+ return next(name_iter), next(name_iter, None)
+
def __init__(self):
Context.list.append(self)
@@ -71,7 +80,23 @@ class Context(object):
try:
return next(s for s in servers if s)
except StopIteration:
- raise ValueError("context not found for server '%r'" %
+ raise ValueError("context not found for server %r" %
+ attr_name)
+
+ @staticmethod
+ def get_context_from_server(attr_name):
+ """lookup context info by name from node config
+ attr_name: either a name of the node created by yardstick or a dict
+ with attribute name mapping when using external templates
+
+ :returns Context instance
+ """
+ servers = ((context._get_server(attr_name), context)
+ for context in Context.list)
+ try:
+ return next(con for s, con in servers if s)
+ except StopIteration:
+ raise ValueError("context not found for name %r" %
attr_name)
@staticmethod
@@ -85,5 +110,5 @@ class Context(object):
try:
return next(n for n in networks if n)
except StopIteration:
- raise ValueError("context not found for server '%r'" %
+ raise ValueError("context not found for server %r" %
attr_name)
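The new split_name static method above replaces the ad-hoc attr_name.split(".") calls scattered through the contexts; reproduced standalone here to show its edge-case behavior:

    def split_name(name, sep='.'):
        try:
            name_iter = iter(name.split(sep))
        except AttributeError:
            # name is not a string
            return None, None
        return next(name_iter), next(name_iter, None)

    assert split_name('vnf_0.demo') == ('vnf_0', 'demo')  # node, context
    assert split_name('vnf_0') == ('vnf_0', None)         # no context part
    assert split_name({'name': 'x'}) == (None, None)      # non-string input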
diff --git a/yardstick/benchmark/contexts/heat.py b/yardstick/benchmark/contexts/heat.py
index d5349eab5..c8d53e324 100644
--- a/yardstick/benchmark/contexts/heat.py
+++ b/yardstick/benchmark/contexts/heat.py
@@ -17,7 +17,6 @@ import uuid
from collections import OrderedDict
import ipaddress
-import paramiko
import pkg_resources
from yardstick.benchmark.contexts.base import Context
@@ -28,12 +27,21 @@ from yardstick.benchmark.contexts.model import update_scheduler_hints
from yardstick.common.openstack_utils import get_neutron_client
from yardstick.orchestrator.heat import HeatTemplate, get_short_key_uuid
from yardstick.common.constants import YARDSTICK_ROOT_PATH
+from yardstick.ssh import SSH
LOG = logging.getLogger(__name__)
DEFAULT_HEAT_TIMEOUT = 3600
+def join_args(sep, *args):
+ return sep.join(args)
+
+
+def h_join(*args):
+ return '-'.join(args)
+
+
class HeatContext(Context):
"""Class that represents a context in the logical model"""
@@ -43,12 +51,14 @@ class HeatContext(Context):
self.name = None
self.stack = None
self.networks = OrderedDict()
+ self.heat_timeout = None
self.servers = []
self.placement_groups = []
self.server_groups = []
self.keypair_name = None
self.secgroup_name = None
self._server_map = {}
+ self.attrs = {}
self._image = None
self._flavor = None
self.flavors = set()
@@ -65,7 +75,8 @@ class HeatContext(Context):
get_short_key_uuid(self.key_uuid)])
super(HeatContext, self).__init__()
- def assign_external_network(self, networks):
+ @staticmethod
+ def assign_external_network(networks):
sorted_networks = sorted(networks.items())
external_network = os.environ.get("EXTERNAL_NETWORK", "net04_ext")
@@ -74,8 +85,7 @@ class HeatContext(Context):
# no external net defined, assign it to first network using os.environ
sorted_networks[0][1]["external_network"] = external_network
- self.networks = OrderedDict((name, Network(name, self, attrs))
- for name, attrs in sorted_networks)
+ return sorted_networks
def init(self, attrs):
"""initializes itself from the supplied arguments"""
@@ -88,8 +98,8 @@ class HeatContext(Context):
self.heat_parameters = attrs.get("heat_parameters")
return
- self.keypair_name = self.name + "-key"
- self.secgroup_name = self.name + "-secgroup"
+ self.keypair_name = h_join(self.name, "key")
+ self.secgroup_name = h_join(self.name, "secgroup")
self._image = attrs.get("image")
@@ -97,29 +107,29 @@ class HeatContext(Context):
self.heat_timeout = attrs.get("timeout", DEFAULT_HEAT_TIMEOUT)
- self.placement_groups = [PlacementGroup(name, self, pgattrs["policy"])
- for name, pgattrs in attrs.get(
+ self.placement_groups = [PlacementGroup(name, self, pg_attrs["policy"])
+ for name, pg_attrs in attrs.get(
"placement_groups", {}).items()]
- self.server_groups = [ServerGroup(name, self, sgattrs["policy"])
- for name, sgattrs in attrs.get(
+ self.server_groups = [ServerGroup(name, self, sg_attrs["policy"])
+ for name, sg_attrs in attrs.get(
"server_groups", {}).items()]
# we have to do this first, because we are injecting external_network
# into the dict
- self.assign_external_network(attrs["networks"])
+ sorted_networks = self.assign_external_network(attrs["networks"])
+
+ self.networks = OrderedDict(
+ (name, Network(name, self, net_attrs)) for name, net_attrs in
+ sorted_networks)
- for name, serverattrs in sorted(attrs["servers"].items()):
- server = Server(name, self, serverattrs)
+ for name, server_attrs in sorted(attrs["servers"].items()):
+ server = Server(name, self, server_attrs)
self.servers.append(server)
self._server_map[server.dn] = server
- rsa_key = paramiko.RSAKey.generate(bits=2048, progress_func=None)
- rsa_key.write_private_key_file(self.key_filename)
- print("Writing %s ..." % self.key_filename)
- with open(self.key_filename + ".pub", "w") as pubkey_file:
- pubkey_file.write(
- "%s %s\n" % (rsa_key.get_name(), rsa_key.get_base64()))
+ self.attrs = attrs
+ SSH.gen_keys(self.key_filename)
@property
def image(self):
@@ -188,7 +198,7 @@ class HeatContext(Context):
try:
self.flavors.add(server.flavor["name"])
except KeyError:
- self.flavors.add(server.stack_name + "-flavor")
+ self.flavors.add(h_join(server.stack_name, "flavor"))
# add servers with availability policy
added_servers = []
@@ -286,7 +296,7 @@ class HeatContext(Context):
# let the other failures happen, we want stack trace
raise
- # TODO: use Neutron to get segementation-id
+ # TODO: use Neutron to get segmentation-id
self.get_neutron_info()
# copy some vital stack output into server objects
@@ -311,24 +321,26 @@ class HeatContext(Context):
def make_interface_dict(self, network_name, stack_name, outputs):
private_ip = outputs[stack_name]
- mac_addr = outputs[stack_name + "-mac_address"]
- subnet_cidr_key = "-".join([self.name, network_name, 'subnet', 'cidr'])
- gateway_key = "-".join([self.name, network_name, 'subnet', 'gateway_ip'])
- subnet_cidr = outputs[subnet_cidr_key]
- subnet_ip = ipaddress.ip_network(subnet_cidr)
+ mac_address = outputs[h_join(stack_name, "mac_address")]
+ output_subnet_cidr = outputs[h_join(self.name, network_name,
+ 'subnet', 'cidr')]
+
+ output_subnet_gateway = outputs[h_join(self.name, network_name,
+ 'subnet', 'gateway_ip')]
+
return {
"private_ip": private_ip,
- "subnet_id": outputs[stack_name + "-subnet_id"],
- "subnet_cidr": subnet_cidr,
- "network": str(subnet_ip.network_address),
- "netmask": str(subnet_ip.netmask),
- "gateway_ip": outputs[gateway_key],
- "mac_address": mac_addr,
- "device_id": outputs[stack_name + "-device_id"],
- "network_id": outputs[stack_name + "-network_id"],
+ "subnet_id": outputs[h_join(stack_name, "subnet_id")],
+ "subnet_cidr": output_subnet_cidr,
+ "network": str(ipaddress.ip_network(output_subnet_cidr).network_address),
+ "netmask": str(ipaddress.ip_network(output_subnet_cidr).netmask),
+ "gateway_ip": output_subnet_gateway,
+ "mac_address": mac_address,
+ "device_id": outputs[h_join(stack_name, "device_id")],
+ "network_id": outputs[h_join(stack_name, "network_id")],
"network_name": network_name,
# to match vnf_generic
- "local_mac": mac_addr,
+ "local_mac": mac_address,
"local_ip": private_ip,
"vld_id": self.networks[network_name].vld_id,
}
@@ -357,7 +369,8 @@ class HeatContext(Context):
"network": intf["network"],
"netmask": intf["netmask"],
"if": name,
- "gateway": intf["gateway_ip"],
+ # We have to encode a None gateway as '' for Jinja2 to YAML conversion
+ "gateway": intf["gateway_ip"] if intf["gateway_ip"] else '',
}
for name, intf in server.interfaces.items()
]
@@ -370,31 +383,24 @@ class HeatContext(Context):
"""
key_filename = pkg_resources.resource_filename(
'yardstick.resources',
- 'files/yardstick_key-' + get_short_key_uuid(self.key_uuid))
-
- if not isinstance(attr_name, collections.Mapping):
- server = self._server_map.get(attr_name, None)
+ h_join('files/yardstick_key', get_short_key_uuid(self.key_uuid)))
- else:
- cname = attr_name["name"].split(".")[1]
- if cname != self.name:
+ if isinstance(attr_name, collections.Mapping):
+ node_name, cname = self.split_name(attr_name['name'])
+ if cname is None or cname != self.name:
return None
- public_ip = None
- private_ip = None
- if "public_ip_attr" in attr_name:
- public_ip = self.stack.outputs[attr_name["public_ip_attr"]]
- if "private_ip_attr" in attr_name:
- private_ip = self.stack.outputs[
- attr_name["private_ip_attr"]]
-
# Create a dummy server instance for holding the *_ip attributes
- server = Server(attr_name["name"].split(".")[0], self, {})
- server.public_ip = public_ip
- server.private_ip = private_ip
+ server = Server(node_name, self, {})
+ server.public_ip = self.stack.outputs.get(
+ attr_name.get("public_ip_attr", object()), None)
- if server is None:
- return None
+ server.private_ip = self.stack.outputs.get(
+ attr_name.get("private_ip_attr", object()), None)
+ else:
+ server = self._server_map.get(attr_name, None)
+ if server is None:
+ return None
result = {
"user": server.context.user,
@@ -417,12 +423,9 @@ class HeatContext(Context):
else:
# Don't generalize too much. Just support vld_id.
- vld_id = attr_name.get('vld_id')
- if vld_id is None:
- return None
-
- network = next((n for n in self.networks.values() if
- getattr(n, "vld_id", None) == vld_id), None)
+ vld_id = attr_name.get('vld_id', {})
+ network_iter = (n for n in self.networks.values() if n.vld_id == vld_id)
+ network = next(network_iter, None)
if network is None:
return None
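h_join, added above, simply hyphen-joins its arguments; the heat context uses it to compose stack output keys (the names below are illustrative):

    def h_join(*args):
        return '-'.join(args)

    assert h_join('vnf_0.demo', 'mac_address') == 'vnf_0.demo-mac_address'
    assert h_join('demo', 'mgmt', 'subnet', 'cidr') == 'demo-mgmt-subnet-cidr'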
diff --git a/yardstick/benchmark/contexts/node.py b/yardstick/benchmark/contexts/node.py
index b3f0aca0e..78a2d1f46 100644
--- a/yardstick/benchmark/contexts/node.py
+++ b/yardstick/benchmark/contexts/node.py
@@ -19,7 +19,7 @@ import pkg_resources
from yardstick import ssh
from yardstick.benchmark.contexts.base import Context
-from yardstick.common import constants as consts
+from yardstick.common.constants import ANSIBLE_DIR, YARDSTICK_ROOT_PATH
LOG = logging.getLogger(__name__)
@@ -38,6 +38,7 @@ class NodeContext(Context):
self.computes = []
self.baremetals = []
self.env = {}
+ self.attrs = {}
super(NodeContext, self).__init__()
def read_config_file(self):
@@ -45,24 +46,23 @@ class NodeContext(Context):
with open(self.file_path) as stream:
LOG.info("Parsing pod file: %s", self.file_path)
- cfg = yaml.load(stream)
+ cfg = yaml.safe_load(stream)
return cfg
def init(self, attrs):
"""initializes itself from the supplied arguments"""
self.name = attrs["name"]
- self.file_path = attrs.get("file", "pod.yaml")
+ self.file_path = file_path = attrs.get("file", "pod.yaml")
try:
cfg = self.read_config_file()
- except IOError as ioerror:
- if ioerror.errno == errno.ENOENT:
- self.file_path = \
- os.path.join(consts.YARDSTICK_ROOT_PATH, self.file_path)
- cfg = self.read_config_file()
- else:
+ except IOError as io_error:
+ if io_error.errno != errno.ENOENT:
raise
+ self.file_path = os.path.join(YARDSTICK_ROOT_PATH, file_path)
+ cfg = self.read_config_file()
+
self.nodes.extend(cfg["nodes"])
self.controllers.extend([node for node in cfg["nodes"]
if node["role"] == "Controller"])
@@ -76,6 +76,7 @@ class NodeContext(Context):
LOG.debug("BareMetals: %r", self.baremetals)
self.env = attrs.get('env', {})
+ self.attrs = attrs
LOG.debug("Env: %r", self.env)
# add optional static network definition
@@ -112,19 +113,17 @@ class NodeContext(Context):
def _do_ansible_job(self, path):
cmd = 'ansible-playbook -i inventory.ini %s' % path
- p = subprocess.Popen(cmd, shell=True, cwd=consts.ANSIBLE_DIR)
+ p = subprocess.Popen(cmd, shell=True, cwd=ANSIBLE_DIR)
p.communicate()
def _get_server(self, attr_name):
"""lookup server info by name from context
attr_name: a name for a server listed in nodes config file
"""
- if isinstance(attr_name, collections.Mapping):
+ node_name, name = self.split_name(attr_name)
+ if name is None or self.name != name:
return None
- if self.name != attr_name.split(".")[1]:
- return None
- node_name = attr_name.split(".")[0]
matching_nodes = (n for n in self.nodes if n["name"] == node_name)
try:
@@ -140,9 +139,10 @@ class NodeContext(Context):
pass
else:
raise ValueError("Duplicate nodes!!! Nodes: %s %s",
- (matching_nodes, duplicate))
+ (node, duplicate))
node["name"] = attr_name
+ node.setdefault("interfaces", {})
return node
def _get_network(self, attr_name):
@@ -151,12 +151,10 @@ class NodeContext(Context):
else:
# Don't generalize too much. Just support vld_id.
- vld_id = attr_name.get('vld_id')
- if vld_id is None:
- return None
-
- network = next((n for n in self.networks.values() if
- n.get("vld_id") == vld_id), None)
+ vld_id = attr_name.get('vld_id', {})
+ # for node context networks are dicts
+ iter1 = (n for n in self.networks.values() if n.get('vld_id') == vld_id)
+ network = next(iter1, None)
if network is None:
return None
@@ -193,7 +191,7 @@ class NodeContext(Context):
def _execute_local_script(self, info):
script, options = self._get_script(info)
- script = os.path.join(consts.YARDSTICK_ROOT_PATH, script)
+ script = os.path.join(YARDSTICK_ROOT_PATH, script)
cmd = ['bash', script, options]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
diff --git a/yardstick/benchmark/contexts/standalone.py b/yardstick/benchmark/contexts/standalone.py
index 2bc1f3755..ae1046974 100644
--- a/yardstick/benchmark/contexts/standalone.py
+++ b/yardstick/benchmark/contexts/standalone.py
@@ -15,6 +15,7 @@
from __future__ import absolute_import
import logging
+import os
import errno
import collections
import yaml
@@ -41,14 +42,15 @@ class StandaloneContext(Context):
self.networks = {}
self.nfvi_node = []
self.nfvi_obj = None
- super(self.__class__, self).__init__()
+ self.attrs = {}
+ super(StandaloneContext, self).__init__()
def read_config_file(self):
"""Read from config file"""
with open(self.file_path) as stream:
LOG.info("Parsing pod file: %s", self.file_path)
- cfg = yaml.load(stream)
+ cfg = yaml.safe_load(stream)
return cfg
def get_nfvi_obj(self):
@@ -63,17 +65,15 @@ class StandaloneContext(Context):
"""initializes itself from the supplied arguments"""
self.name = attrs["name"]
- self.file_path = attrs.get("file", "pod.yaml")
- LOG.info("Parsing pod file: %s", self.file_path)
+ self.file_path = file_path = attrs.get("file", "pod.yaml")
try:
cfg = self.read_config_file()
- except IOError as ioerror:
- if ioerror.errno == errno.ENOENT:
- self.file_path = YARDSTICK_ROOT_PATH + self.file_path
- cfg = self.read_config_file()
- else:
+ except IOError as io_error:
+ if io_error.errno != errno.ENOENT:
raise
+ self.file_path = os.path.join(YARDSTICK_ROOT_PATH, file_path)
+ cfg = self.read_config_file()
self.vm_deploy = attrs.get("vm_deploy", True)
self.nodes.extend([node for node in cfg["nodes"]
@@ -90,6 +90,7 @@ class StandaloneContext(Context):
else:
LOG.debug("Node role is other than SRIOV and OVS")
self.nfvi_obj = self.get_nfvi_obj()
+ self.attrs = attrs
# add optional static network definition
self.networks.update(cfg.get("networks", {}))
self.nfvi_obj = self.get_nfvi_obj()
@@ -146,11 +147,10 @@ class StandaloneContext(Context):
Keyword arguments:
attr_name -- A name for a server listed in nodes config file
"""
- if isinstance(attr_name, collections.Mapping):
- return None
- if self.name != attr_name.split(".")[1]:
+ node_name, name = self.split_name(attr_name)
+ if name is None or self.name != name:
return None
- node_name = attr_name.split(".")[0]
+
matching_nodes = (n for n in self.nodes if n["name"] == node_name)
try:
# A clone is created in order to avoid affecting the
@@ -165,7 +165,8 @@ class StandaloneContext(Context):
pass
else:
raise ValueError("Duplicate nodes!!! Nodes: %s %s",
- (matching_nodes, duplicate))
+ (node, duplicate))
+
node["name"] = attr_name
return node
@@ -175,14 +176,10 @@ class StandaloneContext(Context):
else:
# Don't generalize too much. Just support vld_id.
- vld_id = attr_name.get('vld_id')
- if vld_id is None:
- return None
- try:
- network = next(n for n in self.networks.values() if
- n.get("vld_id") == vld_id)
- except StopIteration:
- return None
+ vld_id = attr_name.get('vld_id', {})
+ # for standalone context networks are dicts
+ iter1 = (n for n in self.networks.values() if n.get('vld_id') == vld_id)
+ network = next(iter1, None)
if network is None:
return None
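The vld_id lookup in _get_network now defaults to an empty dict instead of returning early on None; since {} never compares equal to a string vld_id, a missing key simply falls through to the "network is None" check. A standalone sketch with illustrative data:

    networks = {'net0': {'vld_id': 'private_0'}, 'net1': {}}

    def find_network(attr_name):
        vld_id = attr_name.get('vld_id', {})
        return next((n for n in networks.values()
                     if n.get('vld_id') == vld_id), None)

    assert find_network({'vld_id': 'private_0'}) == {'vld_id': 'private_0'}
    assert find_network({}) is None  # {} never equals a string vld_id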
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index af508496f..2b10c61b3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -13,6 +13,8 @@ from __future__ import absolute_import
from __future__ import print_function
import sys
import os
+from collections import OrderedDict
+
import yaml
import atexit
import ipaddress
@@ -121,6 +123,7 @@ class Task(object): # pragma: no cover
except KeyboardInterrupt:
raise
except Exception:
+ LOG.exception('')
testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
else:
testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
@@ -591,8 +594,9 @@ def _is_background_scenario(scenario):
def parse_nodes_with_context(scenario_cfg):
"""parse the 'nodes' fields in scenario """
- nodes = scenario_cfg["nodes"]
- return {nodename: Context.get_server(node) for nodename, node in nodes.items()}
+ # ensure consistency in node instantiation order
+ return OrderedDict((nodename, Context.get_server(scenario_cfg["nodes"][nodename]))
+ for nodename in sorted(scenario_cfg["nodes"]))
def get_networks_from_nodes(nodes):
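The OrderedDict in parse_nodes_with_context pins node instantiation to sorted node-name order regardless of the dict ordering in the scenario file. A standalone sketch, with Context.get_server stubbed out by a hypothetical get_server parameter:

    from collections import OrderedDict

    def parse_nodes_with_context(scenario_cfg, get_server=lambda node: node):
        # get_server stands in for Context.get_server here
        return OrderedDict(
            (nodename, get_server(scenario_cfg["nodes"][nodename]))
            for nodename in sorted(scenario_cfg["nodes"]))

    cfg = {"nodes": {"vnf__1": "b", "tg__0": "a"}}
    assert list(parse_nodes_with_context(cfg)) == ["tg__0", "vnf__1"]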
diff --git a/yardstick/benchmark/scenarios/lib/migrate.py b/yardstick/benchmark/scenarios/lib/migrate.py
index 116bae69e..dd244c7ce 100644
--- a/yardstick/benchmark/scenarios/lib/migrate.py
+++ b/yardstick/benchmark/scenarios/lib/migrate.py
@@ -16,7 +16,7 @@ import threading
import time
from datetime import datetime
-import ping
+
from yardstick.common import openstack_utils
from yardstick.common.utils import change_obj_to_dict
@@ -28,6 +28,15 @@ TIMEOUT = 0.05
PACKAGE_SIZE = 64
+try:
+ import ping
+except ImportError:
+ # temp fix for ping module import error on Python3
+ # we need to replace the ping module anyway
+ import mock
+ ping = mock.MagicMock()
+
+
class Migrate(base.Scenario): # pragma: no cover
"""
Execute a live migration for two hosts
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index 9607e3005..af17a3150 100644
--- a/yardstick/benchmark/scenarios/networking/vnf_generic.py
+++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py
@@ -20,11 +20,11 @@ import errno
import os
import re
+from itertools import chain
+import yaml
from operator import itemgetter
from collections import defaultdict
-import yaml
-
from yardstick.benchmark.scenarios import base
from yardstick.common.utils import import_modules_from_package, itersubclasses
from yardstick.network_services.collector.subscriber import Collector
@@ -80,6 +80,22 @@ class SshManager(object):
self.conn.close()
+def find_relative_file(path, task_path):
+ # fixme: create schema to validate all fields have been provided
+ try:
+ with open(path):
+ pass
+ return path
+ except IOError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ rel_path = os.path.join(task_path, path)
+ with open(rel_path):
+ pass
+ return rel_path
+
+
def open_relative_file(path, task_path):
try:
return open(path)
@@ -103,166 +119,176 @@ class NetworkServiceTestCase(base.Scenario):
# fixme: create schema to validate all fields have been provided
with open_relative_file(scenario_cfg["topology"],
scenario_cfg['task_path']) as stream:
- topology_yaml = yaml.load(stream)
+ topology_yaml = yaml.safe_load(stream)
self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0]
self.vnfs = []
self.collector = None
self.traffic_profile = None
- @classmethod
- def _get_traffic_flow(cls, scenario_cfg):
+ def _get_traffic_flow(self):
try:
- with open(scenario_cfg["traffic_options"]["flow"]) as fflow:
- flow = yaml.load(fflow)
+ with open(self.scenario_cfg["traffic_options"]["flow"]) as fflow:
+ flow = yaml.safe_load(fflow)
except (KeyError, IOError, OSError):
flow = {}
return flow
- @classmethod
- def _get_traffic_imix(cls, scenario_cfg):
+ def _get_traffic_imix(self):
try:
- with open(scenario_cfg["traffic_options"]["imix"]) as fimix:
- imix = yaml.load(fimix)
+ with open(self.scenario_cfg["traffic_options"]["imix"]) as fimix:
+ imix = yaml.safe_load(fimix)
except (KeyError, IOError, OSError):
imix = {}
return imix
- @classmethod
- def _get_traffic_profile(cls, scenario_cfg, context_cfg):
- traffic_profile_tpl = ""
- private = {}
- public = {}
- try:
- with open_relative_file(scenario_cfg["traffic_profile"],
- scenario_cfg["task_path"]) as infile:
- traffic_profile_tpl = infile.read()
-
- except (KeyError, IOError, OSError):
- raise
-
- return [traffic_profile_tpl, private, public]
-
- def _fill_traffic_profile(self, scenario_cfg, context_cfg):
- flow = self._get_traffic_flow(scenario_cfg)
-
- imix = self._get_traffic_imix(scenario_cfg)
-
- traffic_mapping, private, public = \
- self._get_traffic_profile(scenario_cfg, context_cfg)
-
- traffic_profile = vnfdgen.generate_vnfd(traffic_mapping,
- {"imix": imix, "flow": flow,
- "private": private,
- "public": public})
-
- return TrafficProfile.get(traffic_profile)
-
- @classmethod
- def _find_vnf_name_from_id(cls, topology, vnf_id):
+ def _get_traffic_profile(self):
+ profile = self.scenario_cfg["traffic_profile"]
+ path = self.scenario_cfg["task_path"]
+ with open_relative_file(profile, path) as infile:
+ return infile.read()
+
+ def _fill_traffic_profile(self):
+ traffic_mapping = self._get_traffic_profile()
+ traffic_map_data = {
+ 'flow': self._get_traffic_flow(),
+ 'imix': self._get_traffic_imix(),
+ 'private': {},
+ 'public': {},
+ }
+
+ traffic_vnfd = vnfdgen.generate_vnfd(traffic_mapping, traffic_map_data)
+ self.traffic_profile = TrafficProfile.get(traffic_vnfd)
+ return self.traffic_profile
+
+ def _find_vnf_name_from_id(self, vnf_id):
return next((vnfd["vnfd-id-ref"]
- for vnfd in topology["constituent-vnfd"]
+ for vnfd in self.topology["constituent-vnfd"]
if vnf_id == vnfd["member-vnf-index"]), None)
@staticmethod
def get_vld_networks(networks):
return {n['vld_id']: n for n in networks.values()}
- def _resolve_topology(self, context_cfg, topology):
- for vld in topology["vld"]:
+ def _resolve_topology(self):
+ for vld in self.topology["vld"]:
try:
- node_0, node_1 = vld["vnfd-connection-point-ref"]
- except (TypeError, ValueError):
+ node0_data, node1_data = vld["vnfd-connection-point-ref"]
+ except (ValueError, TypeError):
raise IncorrectConfig("Topology file corrupted, "
- "wrong number of endpoints for connection")
+ "wrong endpoint count for connection")
- node_0_name = self._find_vnf_name_from_id(topology,
- node_0["member-vnf-index-ref"])
- node_1_name = self._find_vnf_name_from_id(topology,
- node_1["member-vnf-index-ref"])
+ node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
+ node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
- node_0_ifname = node_0["vnfd-connection-point-ref"]
- node_1_ifname = node_1["vnfd-connection-point-ref"]
+ node0_if_name = node0_data["vnfd-connection-point-ref"]
+ node1_if_name = node1_data["vnfd-connection-point-ref"]
- node_0_if = context_cfg["nodes"][node_0_name]["interfaces"][node_0_ifname]
- node_1_if = context_cfg["nodes"][node_1_name]["interfaces"][node_1_ifname]
try:
- vld_networks = self.get_vld_networks(context_cfg["networks"])
+ nodes = self.context_cfg["nodes"]
+ node0_if = nodes[node0_name]["interfaces"][node0_if_name]
+ node1_if = nodes[node1_name]["interfaces"][node1_if_name]
- node_0_if["vld_id"] = vld["id"]
- node_1_if["vld_id"] = vld["id"]
+ # names so we can do reverse lookups
+ node0_if["ifname"] = node0_if_name
+ node1_if["ifname"] = node1_if_name
+
+ node0_if["node_name"] = node0_name
+ node1_if["node_name"] = node1_name
+
+ vld_networks = self.get_vld_networks(self.context_cfg["networks"])
+ node0_if["vld_id"] = vld["id"]
+ node1_if["vld_id"] = vld["id"]
# set peer name
- node_0_if["peer_name"] = node_1_name
- node_1_if["peer_name"] = node_0_name
+ node0_if["peer_name"] = node1_name
+ node1_if["peer_name"] = node0_name
# set peer interface name
- node_0_if["peer_ifname"] = node_1_ifname
- node_1_if["peer_ifname"] = node_0_ifname
+ node0_if["peer_ifname"] = node1_if_name
+ node1_if["peer_ifname"] = node0_if_name
- # just load the whole network dict
- node_0_if["network"] = vld_networks.get(vld["id"], {})
- node_1_if["network"] = vld_networks.get(vld["id"], {})
+ # just load the network
+ node0_if["network"] = vld_networks.get(vld["id"], {})
+ node1_if["network"] = vld_networks.get(vld["id"], {})
- node_0_if["dst_mac"] = node_1_if["local_mac"]
- node_0_if["dst_ip"] = node_1_if["local_ip"]
+ node0_if["dst_mac"] = node1_if["local_mac"]
+ node0_if["dst_ip"] = node1_if["local_ip"]
- node_1_if["dst_mac"] = node_0_if["local_mac"]
- node_1_if["dst_ip"] = node_0_if["local_ip"]
+ node1_if["dst_mac"] = node0_if["local_mac"]
+ node1_if["dst_ip"] = node0_if["local_ip"]
- # add peer interface dict, but remove circular link
- # TODO: don't waste memory
- node_0_copy = node_0_if.copy()
- node_1_copy = node_1_if.copy()
- node_0_if["peer_intf"] = node_1_copy
- node_1_if["peer_intf"] = node_0_copy
except KeyError:
+ LOG.exception("")
raise IncorrectConfig("Required interface not found, "
"topology file corrupted")
- @classmethod
- def _find_list_index_from_vnf_idx(cls, topology, vnf_idx):
- return next((topology["constituent-vnfd"].index(vnfd)
- for vnfd in topology["constituent-vnfd"]
+ for vld in self.topology['vld']:
+ try:
+ node0_data, node1_data = vld["vnfd-connection-point-ref"]
+ except (ValueError, TypeError):
+ raise IncorrectConfig("Topology file corrupted, "
+ "wrong endpoint count for connection")
+
+ node0_name = self._find_vnf_name_from_id(node0_data["member-vnf-index-ref"])
+ node1_name = self._find_vnf_name_from_id(node1_data["member-vnf-index-ref"])
+
+ node0_if_name = node0_data["vnfd-connection-point-ref"]
+ node1_if_name = node1_data["vnfd-connection-point-ref"]
+
+ nodes = self.context_cfg["nodes"]
+ node0_if = nodes[node0_name]["interfaces"][node0_if_name]
+ node1_if = nodes[node1_name]["interfaces"][node1_if_name]
+
+ # add peer interface dict, but remove circular link
+ # TODO: don't waste memory
+ node0_copy = node0_if.copy()
+ node1_copy = node1_if.copy()
+ node0_if["peer_intf"] = node1_copy
+ node1_if["peer_intf"] = node0_copy
+
+ def _find_vnfd_from_vnf_idx(self, vnf_idx):
+ return next((vnfd for vnfd in self.topology["constituent-vnfd"]
if vnf_idx == vnfd["member-vnf-index"]), None)
- def _update_context_with_topology(self, context_cfg, topology):
- for idx in topology["constituent-vnfd"]:
- vnf_idx = idx["member-vnf-index"]
- nodes = context_cfg["nodes"]
- node = self._find_vnf_name_from_id(topology, vnf_idx)
- list_idx = self._find_list_index_from_vnf_idx(topology, vnf_idx)
- nodes[node].update(topology["constituent-vnfd"][list_idx])
+ def _update_context_with_topology(self):
+ for vnfd in self.topology["constituent-vnfd"]:
+ vnf_idx = vnfd["member-vnf-index"]
+ vnf_name = self._find_vnf_name_from_id(vnf_idx)
+ vnfd = self._find_vnfd_from_vnf_idx(vnf_idx)
+ self.context_cfg["nodes"][vnf_name].update(vnfd)
@staticmethod
def _sort_dpdk_port_num(netdevs):
# dpdk_port_num is PCI BUS ID ordering, lowest first
s = sorted(netdevs.values(), key=itemgetter('pci_bus_id'))
- for dpdk_port_num, netdev in enumerate(s, 1):
+ for dpdk_port_num, netdev in enumerate(s):
netdev['dpdk_port_num'] = dpdk_port_num
@classmethod
def _probe_missing_values(cls, netdevs, network, missing):
- mac = network['local_mac']
+ mac_lower = network['local_mac'].lower()
for netdev in netdevs.values():
- if netdev['address'].lower() == mac.lower():
- network['driver'] = netdev['driver']
- network['vpci'] = netdev['pci_bus_id']
- network['dpdk_port_num'] = netdev['dpdk_port_num']
- network['ifindex'] = netdev['ifindex']
+ if netdev['address'].lower() != mac_lower:
+ continue
+ network.update({
+ 'driver': netdev['driver'],
+ 'vpci': netdev['pci_bus_id'],
+ 'ifindex': netdev['ifindex'],
+ })
TOPOLOGY_REQUIRED_KEYS = frozenset({
- "vpci", "local_ip", "netmask", "local_mac", "driver", "dpdk_port_num"})
+ "vpci", "local_ip", "netmask", "local_mac", "driver"})
- def map_topology_to_infrastructure(self, context_cfg, topology):
+ def map_topology_to_infrastructure(self):
""" This method should verify if the available resources defined in pod.yaml
match the topology.yaml file.
+ :param context_cfg:
:param topology:
:return: None. Side effect: context_cfg is updated
"""
-
- for node, node_dict in context_cfg["nodes"].items():
+ for node, node_dict in self.context_cfg["nodes"].items():
cmd = "PATH=$PATH:/sbin:/usr/sbin ip addr show"
with SshManager(node_dict) as conn:
@@ -276,28 +302,28 @@ class NetworkServiceTestCase(base.Scenario):
"Cannot find netdev info in sysfs" % node)
netdevs = node_dict['netdevs'] = self.parse_netdev_info(
stdout)
- self._sort_dpdk_port_num(netdevs)
for network in node_dict["interfaces"].values():
missing = self.TOPOLOGY_REQUIRED_KEYS.difference(network)
+ if not missing:
+ continue
+
+ try:
+ self._probe_missing_values(netdevs, network,
+ missing)
+ except KeyError:
+ pass
+ else:
+ missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
+ network)
if missing:
- try:
- self._probe_missing_values(netdevs, network,
- missing)
- except KeyError:
- pass
- else:
- missing = self.TOPOLOGY_REQUIRED_KEYS.difference(
- network)
- if missing:
- raise IncorrectConfig(
- "Require interface fields '%s' "
- "not found, topology file "
- "corrupted" % ', '.join(missing))
+ raise IncorrectConfig(
+ "Require interface fields '%s' not found, topology file "
+ "corrupted" % ', '.join(missing))
# 3. Use topology file to find connections & resolve dest address
- self._resolve_topology(context_cfg, topology)
- self._update_context_with_topology(context_cfg, topology)
+ self._resolve_topology()
+ self._update_context_with_topology()
FIND_NETDEVICE_STRING = r"""find /sys/devices/pci* -type d -name net -exec sh -c '{ grep -sH ^ \
$1/ifindex $1/address $1/operstate $1/device/vendor $1/device/device \
@@ -361,7 +387,7 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
node_intf = node['interfaces'][intf['name']]
intf['virtual-interface'].update(node_intf)
- def load_vnf_models(self, scenario_cfg, context_cfg):
+ def load_vnf_models(self, scenario_cfg=None, context_cfg=None):
""" Create VNF objects based on YAML descriptors
:param scenario_cfg:
@@ -369,21 +395,29 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
:param context_cfg:
:return:
"""
+ if scenario_cfg is None:
+ scenario_cfg = self.scenario_cfg
+
+ if context_cfg is None:
+ context_cfg = self.context_cfg
+
vnfs = []
+ # we assume OrderedDict for consistency in instantiation
for node_name, node in context_cfg["nodes"].items():
LOG.debug(node)
- with open_relative_file(node["VNF model"],
- scenario_cfg['task_path']) as stream:
+ file_name = node["VNF model"]
+ file_path = scenario_cfg['task_path']
+ with open_relative_file(file_name, file_path) as stream:
vnf_model = stream.read()
vnfd = vnfdgen.generate_vnfd(vnf_model, node)
# TODO: here add extra context_cfg["nodes"] regardless of template
vnfd = vnfd["vnfd:vnfd-catalog"]["vnfd"][0]
self.update_interfaces_from_node(vnfd, node)
vnf_impl = self.get_vnf_impl(vnfd['id'])
- vnf_instance = vnf_impl(vnfd)
- vnf_instance.name = node_name
+ vnf_instance = vnf_impl(node_name, vnfd)
vnfs.append(vnf_instance)
+ self.vnfs = vnfs
return vnfs
def setup(self):
@@ -392,18 +426,25 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
:return:
"""
# 1. Verify if infrastructure mapping can meet topology
- self.map_topology_to_infrastructure(self.context_cfg, self.topology)
+ self.map_topology_to_infrastructure()
# 1a. Load VNF models
- self.vnfs = self.load_vnf_models(self.scenario_cfg, self.context_cfg)
+ self.load_vnf_models()
# 1b. Fill traffic profile with information from topology
- self.traffic_profile = self._fill_traffic_profile(self.scenario_cfg,
- self.context_cfg)
+ self._fill_traffic_profile()
# 2. Provision VNFs
+
+ # link events will cause VNF application to exit
+ # so we should start traffic runners before VNFs
+ traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
+ non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic]
try:
- for vnf in self.vnfs:
+ for vnf in chain(traffic_runners, non_traffic_runners):
LOG.info("Instantiating %s", vnf.name)
vnf.instantiate(self.scenario_cfg, self.context_cfg)
+ for vnf in chain(traffic_runners, non_traffic_runners):
+ LOG.info("Waiting for %s to instantiate", vnf.name)
+ vnf.wait_for_instantiate()
except RuntimeError:
for vnf in self.vnfs:
vnf.terminate()
@@ -411,7 +452,6 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
# 3. Run experiment
# Start listeners first to avoid losing packets
- traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic]
for traffic_gen in traffic_runners:
traffic_gen.listen_traffic(self.traffic_profile)
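The setup() change above starts traffic generators before the other VNFs because link events can make a VNF application exit; the ordering reduces to a chain() over two filtered lists, as this small reproduction shows (FakeVNF is a stand-in for the real VNF classes):

    from itertools import chain

    class FakeVNF(object):
        def __init__(self, name, runs_traffic):
            self.name = name
            self.runs_traffic = runs_traffic

    vnfs = [FakeVNF('vnf_0', False), FakeVNF('tg_0', True)]
    traffic_runners = [v for v in vnfs if v.runs_traffic]
    non_traffic_runners = [v for v in vnfs if not v.runs_traffic]
    # traffic generators come first, matching the order used in setup()
    order = [v.name for v in chain(traffic_runners, non_traffic_runners)]
    assert order == ['tg_0', 'vnf_0']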
diff --git a/yardstick/common/utils.py b/yardstick/common/utils.py
index 7a64b8ca2..1059e1ce4 100644
--- a/yardstick/common/utils.py
+++ b/yardstick/common/utils.py
@@ -26,6 +26,7 @@ import sys
import collections
import socket
import random
+import ipaddress
from functools import reduce
from contextlib import closing
@@ -148,7 +149,7 @@ def source_env(env_file):
p = subprocess.Popen(". %s; env" % env_file, stdout=subprocess.PIPE,
shell=True)
output = p.communicate()[0]
- env = dict((line.split('=', 1) for line in output.splitlines()))
+ env = dict(line.split('=', 1) for line in output.splitlines() if '=' in line)
os.environ.update(env)
return env
@@ -221,12 +222,12 @@ def flatten_dict_key(data):
for v in data.values()):
return data
- for k, v in six.iteritems(data):
+ for k, v in data.items():
if isinstance(v, collections.Mapping):
- for n_k, n_v in six.iteritems(v):
+ for n_k, n_v in v.items():
next_data["%s.%s" % (k, n_k)] = n_v
# use list because iterable is too generic
- elif isinstance(v, list):
+ elif isinstance(v, collections.Iterable) and not isinstance(v, six.string_types):
for index, item in enumerate(v):
next_data["%s%d" % (k, index)] = item
else:
@@ -267,7 +268,6 @@ def set_dict_value(dic, keys, value):
return_dic = dic
for key in keys.split('.'):
-
return_dic.setdefault(key, {})
if key == keys.split('.')[-1]:
return_dic[key] = value
@@ -282,3 +282,91 @@ def get_free_port(ip):
port = random.randint(5000, 10000)
if s.connect_ex((ip, port)) != 0:
return port
+
+
+def mac_address_to_hex_list(mac):
+ octets = ["0x{:02x}".format(int(elem, 16)) for elem in mac.split(':')]
+ assert len(octets) == 6 and all(len(octet) == 4 for octet in octets)
+ return octets
+
+
+def safe_ip_address(ip_addr):
+ """ get ip address version v6 or v4 """
+ try:
+ return ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return None
+
+
+def get_ip_version(ip_addr):
+ """ get ip address version v6 or v4 """
+ try:
+ address = ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return None
+ else:
+ return address.version
+
+
+def ip_to_hex(ip_addr):
+ try:
+ address = ipaddress.ip_address(six.text_type(ip_addr))
+ except ValueError:
+ logging.error("%s is not valid", ip_addr)
+ return ip_addr
+
+ if address.version != 4:
+ return ip_addr
+ return '{:08x}'.format(int(address))
+
+
+def try_int(s, *args):
+ """Convert to integer if possible."""
+ try:
+ return int(s)
+ except (TypeError, ValueError):
+ return args[0] if args else s
+
+
+class SocketTopology(dict):
+
+ def sockets(self):
+ return sorted(self.keys())
+
+ def cores(self):
+ return sorted(core for cores in self.values() for core in cores)
+
+ def processors(self):
+ return sorted(
+ proc for cores in self.values() for procs in cores.values() for
+ proc in procs)
+
+
+def parse_cpuinfo(cpuinfo):
+ socket_map = {}
+
+ lines = cpuinfo.splitlines()
+
+ core_details = []
+ core_lines = {}
+ for line in lines:
+ if line.strip():
+ name, value = line.split(":", 1)
+ core_lines[name.strip()] = try_int(value.strip())
+ else:
+ core_details.append(core_lines)
+ core_lines = {}
+
+ for core in core_details:
+ socket_map.setdefault(core["physical id"], {}).setdefault(
+ core["core id"], {})[core["processor"]] = (
+ core["processor"], core["core id"], core["physical id"])
+
+ return SocketTopology(socket_map)
+
+
+def config_to_dict(config):
+ return {section: dict(config.items(section)) for section in
+ config.sections()}
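A quick check of the new ip_to_hex helper added to utils.py above (body reproduced from the diff, minus the error logging); IPv6 addresses and invalid input are passed through unchanged:

    import ipaddress
    import six

    def ip_to_hex(ip_addr):
        try:
            address = ipaddress.ip_address(six.text_type(ip_addr))
        except ValueError:
            return ip_addr
        if address.version != 4:
            return ip_addr
        return '{:08x}'.format(int(address))

    assert ip_to_hex('10.0.0.1') == '0a000001'
    assert ip_to_hex('::1') == '::1'              # IPv6 unchanged
    assert ip_to_hex('not-an-ip') == 'not-an-ip'  # invalid unchanged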
diff --git a/yardstick/network_services/helpers/__init__.py b/yardstick/network_services/helpers/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/network_services/helpers/__init__.py
diff --git a/yardstick/network_services/helpers/cpu.py b/yardstick/network_services/helpers/cpu.py
new file mode 100644
index 000000000..a5ba6c31e
--- /dev/null
+++ b/yardstick/network_services/helpers/cpu.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class CpuSysCores(object):
+
+ def __init__(self, connection=""):
+ self.core_map = {}
+ self.connection = connection
+
+ def _open_cpuinfo(self):
+ lines = []
+ lines = self.connection.execute("cat /proc/cpuinfo")[1].split(u'\n')
+ return lines
+
+ def _get_core_details(self, lines):
+ core_details = []
+ core_lines = {}
+ for line in lines:
+ if line.strip():
+ name, value = line.split(":", 1)
+ core_lines[name.strip()] = value.strip()
+ else:
+ core_details.append(core_lines)
+ core_lines = {}
+
+ return core_details
+
+ def get_core_socket(self):
+ lines = self.connection.execute("lscpu")[1].split(u'\n')
+ num_cores = self._get_core_details(lines)
+ for num in num_cores:
+ self.core_map["cores_per_socket"] = num["Core(s) per socket"]
+ self.core_map["thread_per_core"] = num["Thread(s) per core"]
+
+ lines = self._open_cpuinfo()
+ core_details = self._get_core_details(lines)
+ for core in core_details:
+ for k, v in core.items():
+ if k == "physical id":
+ if core["physical id"] not in self.core_map:
+ self.core_map[core['physical id']] = []
+ self.core_map[core['physical id']].append(
+ core["processor"])
+
+ return self.core_map
+
+ def validate_cpu_cfg(self, vnf_cfg=None):
+ if vnf_cfg is None:
+ vnf_cfg = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1
+ }
+ if self.core_map["thread_per_core"] == 1 and \
+ vnf_cfg["worker_config"] == "1C/2T":
+ return -1
+
+ if vnf_cfg['lb_config'] == 'SW':
+ num_cpu = int(vnf_cfg["worker_threads"]) + 5
+ if int(self.core_map["cores_per_socket"]) < num_cpu:
+ return -1
+
+ return 0
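CpuSysCores only needs a connection object whose execute() returns the command output at index 1 (normally a yardstick SSH connection), so a stub is enough to drive it. A hedged usage sketch; the lscpu/cpuinfo text is a fabricated two-core sample:

    from yardstick.network_services.helpers.cpu import CpuSysCores

    class FakeConnection(object):
        def execute(self, cmd):
            if cmd == 'lscpu':
                out = u'Core(s) per socket: 2\nThread(s) per core: 1\n'
            else:  # "cat /proc/cpuinfo"
                out = (u'processor : 0\nphysical id : 0\n\n'
                       u'processor : 1\nphysical id : 0\n')
            return 0, out

    cores = CpuSysCores(FakeConnection())
    core_map = cores.get_core_socket()
    # {'cores_per_socket': '2', 'thread_per_core': '1', '0': ['0', '1']}
    assert core_map['0'] == ['0', '1']
    # default SW profile needs worker_threads + 5 = 6 cores, only 2 here
    assert cores.validate_cpu_cfg() == -1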
diff --git a/yardstick/network_services/helpers/samplevnf_helper.py b/yardstick/network_services/helpers/samplevnf_helper.py
new file mode 100644
index 000000000..1eefc5ffa
--- /dev/null
+++ b/yardstick/network_services/helpers/samplevnf_helper.py
@@ -0,0 +1,639 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import ipaddress
+import logging
+import os
+import sys
+from collections import OrderedDict, defaultdict
+from itertools import chain
+
+import six
+from six.moves.configparser import ConfigParser
+
+from yardstick.common.utils import ip_to_hex
+
+LOG = logging.getLogger(__name__)
+
+LINK_CONFIG_TEMPLATE = """\
+link {0} down
+link {0} config {1} {2}
+link {0} up
+"""
+
+ACTION_TEMPLATE = """\
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+"""
+
+FW_ACTION_TEMPLATE = """\
+p action add {0} accept
+p action add {0} fwd
+p action add {0} count
+p action add {0} conntrack
+"""
+
+# This sets up a basic passthrough with no rules
+SCRIPT_TPL = """
+{link_config}
+
+{arp_config}
+
+{arp_config6}
+
+{actions}
+
+{rules}
+
+"""
+
+
+class MultiPortConfig(object):
+
+ HW_LB = "HW"
+
+ @staticmethod
+ def float_x_plus_one_tenth_of_y(x, y):
+ return float(x) + float(y) / 10.0
+
+ @staticmethod
+ def make_str(base, iterator):
+ return ' '.join((base.format(x) for x in iterator))
+
+ @classmethod
+ def make_range_str(cls, base, start, stop=0, offset=0):
+ if offset and not stop:
+ stop = start + offset
+ return cls.make_str(base, range(start, stop))
+
+ @staticmethod
+ def parser_get(parser, section, key, default=None):
+ if parser.has_option(section, key):
+ return parser.get(section, key)
+ return default
+
+ @staticmethod
+ def make_ip_addr(ip, mask_len):
+ try:
+ return ipaddress.ip_interface(six.text_type('/'.join([ip, mask_len])))
+ except ValueError:
+ # None so we can skip later
+ return None
+
+ @classmethod
+ def validate_ip_and_prefixlen(cls, ip_addr, prefixlen):
+ ip_addr = cls.make_ip_addr(ip_addr, prefixlen)
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+
+ def __init__(self, topology_file, config_tpl, tmp_file, interfaces=None,
+ vnf_type='CGNAT', lb_count=2, worker_threads=3,
+ worker_config='1C/1T', lb_config='SW', socket=0):
+
+ super(MultiPortConfig, self).__init__()
+ self.topology_file = topology_file
+ self.worker_config = worker_config.split('/')[1].lower()
+ self.worker_threads = self.get_worker_threads(worker_threads)
+ self.vnf_type = vnf_type
+ self.pipe_line = 0
+ self.interfaces = interfaces if interfaces else {}
+ self.networks = {}
+ self.write_parser = ConfigParser()
+ self.read_parser = ConfigParser()
+ self.read_parser.read(config_tpl)
+ self.master_core = self.read_parser.get("PIPELINE0", "core")
+ self.master_tpl = self.get_config_tpl_data('MASTER')
+ self.arpicmp_tpl = self.get_config_tpl_data('ARPICMP')
+ self.txrx_tpl = self.get_config_tpl_data('TXRX')
+ self.loadb_tpl = self.get_config_tpl_data('LOADB')
+ self.vnf_tpl = self.get_config_tpl_data(vnf_type)
+ self.swq = 0
+ self.lb_count = int(lb_count)
+ self.lb_config = lb_config
+ self.tmp_file = os.path.join("/tmp", tmp_file)
+ self.pktq_out_os = []
+ self.socket = socket
+ self.start_core = ""
+ self.pipeline_counter = ""
+ self.txrx_pipeline = ""
+ self.port_pair_list = []
+ self.lb_to_port_pair_mapping = {}
+ self.init_eal()
+
+ self.lb_index = None
+ self.mul = 0
+ self.port_pairs = []
+ self.port_pair_list = []
+ self.ports_len = 0
+ self.prv_que_handler = None
+ self.vnfd = None
+ self.rules = None
+ self.pktq_out = ''
+
+ @staticmethod
+ def gen_core(core):
+ # return "s{}c{}".format(self.socket, core)
+ # don't use sockets for VNFs, because we don't want to have to
+ # adjust VM CPU topology. It is virtual anyway
+ return str(core)
+
+ def make_port_pairs_iter(self, operand, iterable):
+ return (operand(x[-1], y) for y in iterable for x in chain(*self.port_pairs))
+
+ def make_range_port_pairs_iter(self, operand, start, end):
+ return self.make_port_pairs_iter(operand, range(start, end))
+
+ def init_eal(self):
+ vpci = [v['virtual-interface']["vpci"] for v in self.interfaces]
+ with open(self.tmp_file, 'w') as fh:
+ fh.write('[EAL]\n')
+ for item in vpci:
+ fh.write('w = {0}\n'.format(item))
+ fh.write('\n')
+
+ def update_timer(self):
+ timer_tpl = self.get_config_tpl_data('TIMER')
+ timer_tpl['core'] = self.gen_core(self.start_core)
+ self.update_write_parser(timer_tpl)
+ self.start_core += 1
+
+ def get_config_tpl_data(self, type_value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'type'):
+ if type_value == self.read_parser.get(section, 'type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+
+ def get_txrx_tpl_data(self, value):
+ for section in self.read_parser.sections():
+ if self.read_parser.has_option(section, 'pipeline_txrx_type'):
+ if value == self.read_parser.get(section, 'pipeline_txrx_type'):
+ tpl = OrderedDict(self.read_parser.items(section))
+ return tpl
+
+ def init_write_parser_template(self, type_value='ARPICMP'):
+ for section in self.read_parser.sections():
+ if type_value == self.parser_get(self.read_parser, section, 'type', object()):
+ self.start_core = self.read_parser.getint(section, 'core')
+ self.pipeline_counter = self.read_parser.getint(section, 'core')
+ self.txrx_pipeline = self.read_parser.getint(section, 'core')
+ return
+ self.write_parser.add_section(section)
+ for name, value in self.read_parser.items(section):
+ self.write_parser.set(section, name, value)
+
+ def update_write_parser(self, data):
+ section = "PIPELINE{0}".format(self.pipeline_counter)
+ self.write_parser.add_section(section)
+ for name, value in data.items():
+ self.write_parser.set(section, name, value)
+
+ def get_worker_threads(self, worker_threads):
+ if self.worker_config == '1t':
+ return worker_threads
+ else:
+ return worker_threads - worker_threads % 2
+
+ def generate_next_core_id(self):
+ if self.worker_config == '1t':
+ self.start_core += 1
+ return
+
+ try:
+ self.start_core = 'h{}'.format(int(self.start_core))
+ except ValueError:
+ self.start_core = int(self.start_core[:-1]) + 1
+
+ @staticmethod
+ def get_port_pairs(interfaces):
+ port_pair_list = []
+ networks = defaultdict(list)
+ for private_intf in interfaces:
+ vintf = private_intf['virtual-interface']
+ networks[vintf['vld_id']].append(vintf)
+
+ for name, net in networks.items():
+ # partition returns a tuple
+ parts = list(name.partition('private'))
+ if parts[0]:
+ # 'private' was not in the string, or was not leftmost
+ continue
+ parts[1] = 'public'
+ public_id = ''.join(parts)
+ for private_intf in net:
+ try:
+ public_peer_intfs = networks[public_id]
+ except KeyError:
+ LOG.warning("private network without peer %s, %s not found", name, public_id)
+ continue
+
+ for public_intf in public_peer_intfs:
+ port_pair = private_intf["ifname"], public_intf["ifname"]
+ port_pair_list.append(port_pair)
+
+ return port_pair_list, networks
+
+ def get_lb_count(self):
+ self.lb_count = int(min(len(self.port_pair_list), self.lb_count))
+
+ def generate_lb_to_port_pair_mapping(self):
+ self.lb_to_port_pair_mapping = defaultdict(int)
+ port_pair_count = len(self.port_pair_list)
+ lb_pair_count = int(port_pair_count / self.lb_count)
+ for i in range(self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] = lb_pair_count
+ for i in range(port_pair_count % self.lb_count):
+ self.lb_to_port_pair_mapping[i + 1] += 1
+
+ def set_priv_to_pub_mapping(self):
+ return "".join(str(y) for y in [(int(x[0][-1]), int(x[1][-1])) for x in
+ self.port_pair_list])
+
+ def set_priv_que_handler(self):
+ # iterated twice, can't be generator
+ priv_to_pub_map = [(int(x[0][-1]), int(x[1][-1])) for x in self.port_pairs]
+ # must be list to use .index()
+ port_list = list(chain.from_iterable(priv_to_pub_map))
+ priv_ports = (x[0] for x in priv_to_pub_map)
+ self.prv_que_handler = '({})'.format(
+ ",".join((str(port_list.index(x)) for x in priv_ports)))
+
+ def generate_arp_route_tbl(self):
+ arp_config = []
+ arp_route_tbl_tmpl = "({port0_dst_ip_hex},{port0_netmask_hex},{port_num}," \
+ "{next_hop_ip_hex})"
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_num = int(port[-1])
+ interface = self.interfaces[port_num]
+ # port0_ip = ipaddress.ip_interface(six.text_type(
+ # "%s/%s" % (interface["virtual-interface"]["local_ip"],
+ # interface["virtual-interface"]["netmask"])))
+ dst_port0_ip = \
+ ipaddress.ip_interface(six.text_type(
+ "%s/%s" % (interface["virtual-interface"]["dst_ip"],
+ interface["virtual-interface"]["netmask"])))
+ arp_vars = {
+ "port0_dst_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ "port0_netmask_hex": ip_to_hex(dst_port0_ip.network.netmask.exploded),
+ "port_num": port_num,
+ # next hop is dst in this case
+ "next_hop_ip_hex": ip_to_hex(dst_port0_ip.ip.exploded),
+ }
+ arp_config.append(arp_route_tbl_tmpl.format(**arp_vars))
+
+ return ' '.join(arp_config)
+
+ def generate_arpicmp_data(self):
+ swq_in_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ swq_out_str = self.make_range_str('SWQ{}', self.swq, offset=self.lb_count)
+ self.swq += self.lb_count
+ mac_iter = (self.interfaces[int(x[-1])]['virtual-interface']['local_mac']
+ for port_pair in self.port_pair_list for x in port_pair)
+ pktq_in_iter = ('RXQ{}'.format(float(x[0][-1])) for x in self.port_pair_list)
+
+ arpicmp_data = {
+ 'core': self.gen_core(self.start_core),
+ 'pktq_in': swq_in_str,
+ 'pktq_out': swq_out_str,
+ 'ports_mac_list': ' '.join(mac_iter),
+ 'pktq_in_prv': ' '.join(pktq_in_iter),
+ 'prv_to_pub_map': self.set_priv_to_pub_mapping(),
+ 'arp_route_tbl': self.generate_arp_route_tbl(),
+ # can't use empty string, default to ()
+ 'nd_route_tbl': "()",
+ }
+ self.pktq_out_os = swq_out_str.split(' ')
+ # why?
+ if self.lb_config == self.HW_LB:
+ arpicmp_data['pktq_in'] = swq_in_str
+ self.swq = 0
+ return arpicmp_data
+
+ def generate_final_txrx_data(self):
+ swq_start = self.swq - self.ports_len * self.worker_threads
+
+ txq_start = 0
+ txq_end = self.worker_threads
+
+ pktq_out_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ txq_start, txq_end)
+
+ swq_str = self.make_range_str('SWQ{}', swq_start, self.swq)
+ txq_str = self.make_str('TXQ{}', pktq_out_iter)
+ rxtx_data = {
+ 'pktq_in': swq_str,
+ 'pktq_out': txq_str,
+ 'pipeline_txrx_type': 'TXTX',
+ 'core': self.gen_core(self.start_core),
+ }
+ pktq_in = rxtx_data['pktq_in']
+ pktq_in = '{0} {1}'.format(pktq_in, self.pktq_out_os[self.lb_index - 1])
+ rxtx_data['pktq_in'] = pktq_in
+ self.pipeline_counter += 1
+ return rxtx_data
+
+ def generate_initial_txrx_data(self):
+ pktq_iter = self.make_range_port_pairs_iter(self.float_x_plus_one_tenth_of_y,
+ 0, self.worker_threads)
+
+ rxq_str = self.make_str('RXQ{}', pktq_iter)
+ swq_str = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ txrx_data = {
+ 'pktq_in': rxq_str,
+ 'pktq_out': swq_str + ' SWQ{0}'.format(self.lb_index - 1),
+ 'pipeline_txrx_type': 'RXRX',
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return txrx_data
+
+ def generate_lb_data(self):
+ pktq_in = self.make_range_str('SWQ{}', self.swq, offset=self.ports_len)
+ self.swq += self.ports_len
+
+ offset = self.ports_len * self.worker_threads
+ pktq_out = self.make_range_str('SWQ{}', self.swq, offset=offset)
+ self.pktq_out = pktq_out.split()
+
+ self.swq += (self.ports_len * self.worker_threads)
+ lb_data = {
+ 'prv_que_handler': self.prv_que_handler,
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out,
+ 'n_vnf_threads': str(self.worker_threads),
+ 'core': self.gen_core(self.start_core),
+ }
+ self.pipeline_counter += 1
+ return lb_data
+
+ def generate_vnf_data(self):
+ if self.lb_config == self.HW_LB:
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_in = self.make_str('RXQ{}', port_iter)
+
+ self.mul += 1
+ port_iter = self.make_port_pairs_iter(self.float_x_plus_one_tenth_of_y, [self.mul])
+ pktq_out = self.make_str('TXQ{}', port_iter)
+
+ pipe_line_data = {
+ 'pktq_in': pktq_in,
+ 'pktq_out': pktq_out + ' SWQ{0}'.format(self.swq),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += 1
+ else:
+ pipe_line_data = {
+ 'pktq_in': ' '.join((self.pktq_out.pop(0) for _ in range(self.ports_len))),
+ 'pktq_out': self.make_range_str('SWQ{}', self.swq, offset=self.ports_len),
+ 'prv_que_handler': self.prv_que_handler,
+ 'core': self.gen_core(self.start_core),
+ }
+ self.swq += self.ports_len
+
+ if self.vnf_type in ('ACL', 'VFW'):
+ pipe_line_data.pop('prv_que_handler')
+
+ if self.vnf_tpl.get('vnf_set'):
+ public_ip_port_range_list = self.vnf_tpl['public_ip_port_range'].split(':')
+ ip_in_hex = '{:x}'.format(int(public_ip_port_range_list[0], 16) + self.lb_index - 1)
+ public_ip_port_range_list[0] = ip_in_hex
+ self.vnf_tpl['public_ip_port_range'] = ':'.join(public_ip_port_range_list)
+
+ self.pipeline_counter += 1
+ return pipe_line_data
+
+ def generate_config_data(self):
+ self.init_write_parser_template()
+
+ # use master core for master, don't use self.start_core
+ self.write_parser.set('PIPELINE0', 'core', self.gen_core(self.master_core))
+ arpicmp_data = self.generate_arpicmp_data()
+ self.arpicmp_tpl.update(arpicmp_data)
+ self.update_write_parser(self.arpicmp_tpl)
+
+ self.start_core += 1
+ if self.vnf_type == 'CGNAPT':
+ self.pipeline_counter += 1
+ self.update_timer()
+
+ for lb in self.lb_to_port_pair_mapping:
+ self.lb_index = lb
+ self.mul = 0
+ port_pair_count = self.lb_to_port_pair_mapping[lb]
+ if not self.port_pair_list:
+ continue
+
+ self.port_pairs = self.port_pair_list[:port_pair_count]
+ self.port_pair_list = self.port_pair_list[port_pair_count:]
+ self.ports_len = port_pair_count * 2
+ self.set_priv_que_handler()
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_initial_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ lb_data = self.generate_lb_data()
+ self.loadb_tpl.update(lb_data)
+ self.update_write_parser(self.loadb_tpl)
+ self.start_core += 1
+
+ for i in range(self.worker_threads):
+ vnf_data = self.generate_vnf_data()
+ if not self.vnf_tpl:
+ self.vnf_tpl = {}
+ self.vnf_tpl.update(vnf_data)
+ self.update_write_parser(self.vnf_tpl)
+ try:
+ self.vnf_tpl.pop('vnf_set')
+ except KeyError:
+ pass
+ else:
+ self.vnf_tpl.pop('public_ip_port_range')
+ self.generate_next_core_id()
+
+ if self.lb_config == 'SW':
+ txrx_data = self.generate_final_txrx_data()
+ self.txrx_tpl.update(txrx_data)
+ self.update_write_parser(self.txrx_tpl)
+ self.start_core += 1
+ self.vnf_tpl = self.get_config_tpl_data(self.vnf_type)
+
+ def generate_config(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ self.generate_lb_to_port_pair_mapping()
+ self.generate_config_data()
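+        # emit the generated config to stdout as well as appending it to the temp file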
+ self.write_parser.write(sys.stdout)
+ with open(self.tmp_file, 'a') as tfh:
+ self.write_parser.write(tfh)
+
+ def generate_link_config(self):
+
+ link_configs = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port = port[-1]
+ virtual_interface = self.interfaces[int(port)]["virtual-interface"]
+ local_ip = virtual_interface["local_ip"]
+ netmask = virtual_interface["netmask"]
+ port_ip, prefix_len = self.validate_ip_and_prefixlen(local_ip, netmask)
+ link_configs.append(LINK_CONFIG_TEMPLATE.format(port, port_ip, prefix_len))
+
+ return ''.join(link_configs)
+
+ def get_route_data(self, src_key, data_key, port):
+ route_list = self.vnfd['vdu'][0].get(src_key, [])
+ return next((route[data_key] for route in route_list if route['if'] == port), None)
+
+ def get_ports_gateway(self, port):
+ return self.get_route_data('routing_table', 'gateway', port)
+
+ def get_ports_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'gateway', port)
+
+ def get_netmask_gateway(self, port):
+ return self.get_route_data('routing_table', 'netmask', port)
+
+ def get_netmask_gateway6(self, port):
+ return self.get_route_data('nd_route_tbl', 'netmask', port)
+
+ def generate_arp_config(self):
+ arp_config = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway = self.get_ports_gateway(port)
+ # omit entries with no gateway
+ if not gateway:
+ continue
+ dst_mac = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config.append((port[-1], gateway, dst_mac, self.txrx_pipeline))
+
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config))
+
+ def generate_arp_config6(self):
+ arp_config6 = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ gateway6 = self.get_ports_gateway6(port)
+ # omit entries with no gateway
+ if not gateway6:
+ continue
+ dst_mac6 = self.interfaces[int(port[-1])]["virtual-interface"]["dst_mac"]
+ arp_config6.append((port[-1], gateway6, dst_mac6, self.txrx_pipeline))
+
+ return '\n'.join(('p {3} arpadd {0} {1} {2}'.format(*values) for values in arp_config6))
+
+ def generate_action_config(self):
+ port_list = []
+ for port_pair in self.port_pair_list:
+ for port in port_pair:
+ port_list.append(port[-1])
+
+ if self.vnf_type == "VFW":
+ template = FW_ACTION_TEMPLATE
+ else:
+ template = ACTION_TEMPLATE
+
+ return ''.join((template.format(port) for port in port_list))
+
+ def get_ip_from_port(self, port):
+ return self.make_ip_addr(self.get_ports_gateway(port), self.get_netmask_gateway(port))
+
+ def get_ip_and_prefixlen_from_ip_of_port(self, port):
+ ip_addr = self.get_ip_from_port(port)
+ # handle cases with no gateway
+ if ip_addr:
+ return ip_addr.ip.exploded, ip_addr.network.prefixlen
+ else:
+ return None, None
+
+ def generate_rule_config(self):
+ cmd = 'acl' if self.vnf_type == "ACL" else "vfw"
+ rules_config = self.rules if self.rules else ''
+ new_rules = []
+ new_ipv6_rules = []
+ pattern = 'p {0} add {1} {2} {3} {4} {5} 0 65535 0 65535 0 0 {6}'
+ for port_pair in self.port_pair_list:
+ src_port = int(port_pair[0][-1])
+ dst_port = int(port_pair[1][-1])
+
+ src_ip, src_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[0])
+ dst_ip, dst_prefix_len = self.get_ip_and_prefixlen_from_ip_of_port(port_pair[1])
+            # ignore entries with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+
+ src_ip = self.get_ports_gateway6(port_pair[0])
+ src_prefix_len = self.get_netmask_gateway6(port_pair[0])
+ dst_ip = self.get_ports_gateway6(port_pair[1])
+            dst_prefix_len = self.get_netmask_gateway6(port_pair[1])
+            # ignore entries with empty values
+ if all((src_ip, src_prefix_len, dst_ip, dst_prefix_len)):
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, src_ip, src_prefix_len,
+ dst_ip, dst_prefix_len, dst_port))
+ new_ipv6_rules.append((cmd, self.txrx_pipeline, dst_ip, dst_prefix_len,
+ src_ip, src_prefix_len, src_port))
+
+ acl_apply = "\np %s applyruleset" % cmd
+ new_rules_config = '\n'.join(pattern.format(*values) for values
+ in chain(new_rules, new_ipv6_rules))
+ return ''.join([rules_config, new_rules_config, acl_apply])
+
+ def generate_script_data(self):
+ self.port_pair_list, self.networks = self.get_port_pairs(self.interfaces)
+ self.get_lb_count()
+ script_data = {
+ 'link_config': self.generate_link_config(),
+ 'arp_config': self.generate_arp_config(),
+ 'arp_config6': self.generate_arp_config6(),
+ 'actions': '',
+ 'rules': '',
+ }
+
+ if self.vnf_type in ('ACL', 'VFW'):
+ script_data.update({
+ 'actions': self.generate_action_config(),
+ 'rules': self.generate_rule_config(),
+ })
+
+ return script_data
+
+ def generate_script(self, vnfd, rules=None):
+ self.vnfd = vnfd
+ self.rules = rules
+ script_data = self.generate_script_data()
+ script = SCRIPT_TPL.format(**script_data)
+ if self.lb_config == self.HW_LB:
+ script += 'set fwd rxonly'
+ hwlb_tpl = """
+set_sym_hash_ena_per_port {0} enable
+set_hash_global_config {0} simple_xor ipv4-udp enable
+set_sym_hash_ena_per_port {1} enable
+set_hash_global_config {1} simple_xor ipv4-udp enable
+set_hash_input_set {0} ipv4-udp src-ipv4 udp-src-port add
+set_hash_input_set {1} ipv4-udp dst-ipv4 udp-dst-port add
+set_hash_input_set {0} ipv6-udp src-ipv6 udp-src-port add
+set_hash_input_set {1} ipv6-udp dst-ipv6 udp-dst-port add
+"""
+ for port_pair in self.port_pair_list:
+ script += hwlb_tpl.format(port_pair[0][-1], port_pair[1][-1])
+ return script
diff --git a/yardstick/network_services/pipeline.py b/yardstick/network_services/pipeline.py
new file mode 100644
index 000000000..d781ba0cd
--- /dev/null
+++ b/yardstick/network_services/pipeline.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import itertools
+
+from six.moves import zip
+
+FIREWALL_ADD_DEFAULT = "p {0} firewall add default 1"
+FIREWALL_ADD_PRIO = """\
+p {0} firewall add priority 1 ipv4 {1} 24 0.0.0.0 0 0 65535 0 65535 6 0xFF port 0"""
+
+FLOW_ADD_QINQ_RULES = """\
+p {0} flow add qinq 128 512 port 0 id 1
+p {0} flow add default 1"""
+
+ACTION_FLOW_BULK = "p {0} action flow bulk /tmp/action_bulk_512.txt"
+ACTION_DSCP_CLASS_COLOR = "p {0} action dscp {1} class {2} color {3}"
+ROUTE_ADD_DEFAULT = "p {0} route add default 1"
+ROUTE_ADD_ETHER_QINQ = 'p {0} route add {1} {2} port 0 ether {3} qinq 0 {4}'
+ROUTE_ADD_ETHER_MPLS = "p {0} route add {1} 21 port 0 ether {2} mpls 0:{3}"
+
+
+class PipelineRules(object):
+
+ def __init__(self, pipeline_id=0):
+ super(PipelineRules, self).__init__()
+ self.rule_list = []
+ self.pipeline_id = pipeline_id
+
+ def __str__(self):
+ return '\n'.join(self.rule_list)
+
+ def get_string(self):
+ return str(self)
+
+ def next_pipeline(self, num=1):
+ self.pipeline_id += num
+
+ def add_newline(self):
+ self.rule_list.append('')
+
+ def add_rule(self, base, *args):
+ self.rule_list.append(base.format(self.pipeline_id, *args))
+
+ def add_firewall_prio(self, ip):
+ self.add_rule(FIREWALL_ADD_PRIO, ip)
+
+ def add_firewall_script(self, ip):
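+        # sweep the third octet 0..255, emitting one /24 priority rule per subnet,
+        # then a single default rule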
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for i in range(256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_firewall_prio(ip)
+ self.add_rule(FIREWALL_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_flow_classification_script(self):
+ self.add_rule(FLOW_ADD_QINQ_RULES)
+
+ def add_flow_action(self):
+ self.add_rule(ACTION_FLOW_BULK)
+
+ def add_dscp_class_color(self, dscp, color):
+ self.add_rule(ACTION_DSCP_CLASS_COLOR, dscp, dscp % 4, color)
+
+ def add_flow_action2(self):
+ self.add_rule(ACTION_FLOW_BULK)
+ for dscp, color in zip(range(64), itertools.cycle('GYR')):
+ self.add_dscp_class_color(dscp, color)
+
+ def add_route_ether_mpls(self, ip, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_MPLS, ip, mac_addr, index)
+
+ def add_route_script(self, ip, mac_addr):
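+        # step the third octet by 8, adding one MPLS route per /21 block
+        # (the template hard-codes the 21-bit mask)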
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ for index in range(0, 256, 8):
+ ip_addr[-2] = str(index)
+ ip = '.'.join(ip_addr)
+ self.add_route_ether_mpls(ip, mac_addr, index)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
+
+ def add_ether_qinq(self, ip, mask, mac_addr, index):
+ self.add_rule(ROUTE_ADD_ETHER_QINQ, ip, mask, mac_addr, index)
+
+ def add_route_script2(self, ip, mac_addr):
+ ip_addr = ip.split('.')
+ assert len(ip_addr) == 4
+ ip_addr[-1] = '0'
+ mask = 24
+ for i in range(0, 256):
+ ip_addr[-2] = str(i)
+ ip = '.'.join(ip_addr)
+ self.add_ether_qinq(ip, mask, mac_addr, i)
+ self.add_rule(ROUTE_ADD_DEFAULT)
+ self.add_newline()
diff --git a/yardstick/network_services/traffic_profile/rfc2544.py b/yardstick/network_services/traffic_profile/rfc2544.py
index 99964d329..b07bc9d5a 100644
--- a/yardstick/network_services/traffic_profile/rfc2544.py
+++ b/yardstick/network_services/traffic_profile/rfc2544.py
@@ -17,6 +17,10 @@ from __future__ import absolute_import
from __future__ import division
import logging
+from stl.trex_stl_lib.trex_stl_client import STLStream
+from stl.trex_stl_lib.trex_stl_streams import STLFlowLatencyStats
+from stl.trex_stl_lib.trex_stl_streams import STLTXCont
+
from yardstick.network_services.traffic_profile.traffic_profile \
import TrexProfile
@@ -28,79 +32,175 @@ class RFC2544Profile(TrexProfile):
def __init__(self, traffic_generator):
super(RFC2544Profile, self).__init__(traffic_generator)
+ self.generator = None
self.max_rate = None
self.min_rate = None
+ self.ports = None
self.rate = 100
- self.tmp_drop = None
- self.tmp_throughput = None
- self.profile_data = None
-
- def execute(self, traffic_generator):
- ''' Generate the stream and run traffic on the given ports '''
- if self.first_run:
- self.profile_data = self.params.get('private', '')
- ports = [traffic_generator.my_ports[0]]
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[0])
- profile_data = self.params.get('public', '')
- if profile_data:
- self.profile_data = profile_data
- ports.append(traffic_generator.my_ports[1])
- traffic_generator.client.add_streams(self.get_streams(),
- ports=ports[1])
+ self.drop_percent_at_max_tx = None
+ self.throughput_max = None
- self.max_rate = self.rate
- self.min_rate = 0
- traffic_generator.client.start(ports=ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
- self.tmp_drop = 0
- self.tmp_throughput = 0
+ def register_generator(self, generator):
+ self.generator = generator
+
+ def execute(self, traffic_generator=None):
+ """ Generate the stream and run traffic on the given ports """
+ if traffic_generator is not None and self.generator is None:
+ self.generator = traffic_generator
+
+ if self.ports is not None:
+ return
+
+ self.ports = []
+ priv_ports = self.generator.priv_ports
+ pub_ports = self.generator.pub_ports
+ # start from 1 for private_1, public_1, etc.
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ # pass profile_data directly, don't use self.profile_data
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=priv_port)
+ profile_data = self.params.get('public_{}'.format(index), '')
+ # correlated traffic doesn't use public traffic?
+ if not profile_data or self.generator.rfc2544_helper.correlated_traffic:
+ continue
+ # just get the pub_port
+ self.ports.append(pub_port)
+ self.generator.client.add_streams(self.get_streams(profile_data), ports=pub_port)
+
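+        # initial bounds for the RFC2544 binary search over the transmit rate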
+ self.max_rate = self.rate
+ self.min_rate = 0
+ self.generator.client.start(ports=self.ports, mult=self.get_multiplier(),
+ duration=30, force=True)
+ self.drop_percent_at_max_tx = 0
+ self.throughput_max = 0
def get_multiplier(self):
- ''' Get the rate at which next iternation to run '''
+ """ Get the rate at which next iteration to run """
self.rate = round((self.max_rate + self.min_rate) / 2.0, 2)
multiplier = round(self.rate / self.pps, 2)
return str(multiplier)
- def get_drop_percentage(self, traffic_generator,
- samples, tol_min, tolerance):
- ''' Calculate the drop percentage and run the traffic '''
- in_packets = sum([samples[iface]['in_packets'] for iface in samples])
- out_packets = sum([samples[iface]['out_packets'] for iface in samples])
+ def get_drop_percentage(self, generator=None):
+ """ Calculate the drop percentage and run the traffic """
+ if generator is None:
+ generator = self.generator
+        run_duration = generator.RUN_DURATION
+        samples = generator.generate_samples()
+
+ in_packets = sum([value['in_packets'] for value in samples.values()])
+ out_packets = sum([value['out_packets'] for value in samples.values()])
+
packet_drop = abs(out_packets - in_packets)
drop_percent = 100.0
try:
- drop_percent = round((packet_drop / float(out_packets)) * 100, 2)
+ drop_percent = round((packet_drop / float(out_packets)) * 100, 5)
except ZeroDivisionError:
LOGGING.info('No traffic is flowing')
- samples['TxThroughput'] = out_packets / 30
- samples['RxThroughput'] = in_packets / 30
- samples['CurrentDropPercentage'] = drop_percent
- samples['Throughput'] = self.tmp_throughput
- samples['DropPercentage'] = self.tmp_drop
- if drop_percent > tolerance and self.tmp_throughput == 0:
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
- if self.first_run:
- max_supported_rate = out_packets / 30
- self.rate = max_supported_rate
+
+ # TODO(esm): RFC2544 doesn't tolerate packet loss, why do we?
+ tolerance_low = generator.rfc2544_helper.tolerance_low
+ tolerance_high = generator.rfc2544_helper.tolerance_high
+
+ tx_rate = out_packets / run_duration
+ rx_rate = in_packets / run_duration
+
+ throughput_max = self.throughput_max
+ drop_percent_at_max_tx = self.drop_percent_at_max_tx
+
+ if self.drop_percent_at_max_tx is None:
+ self.rate = tx_rate
self.first_run = False
- if drop_percent > tolerance:
+
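+        # binary-search step: drops above tolerance_high shrink max_rate; in-band
+        # results only record the throughput; lower drops raise min_rate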
+ if drop_percent > tolerance_high:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.max_rate = self.rate
- elif drop_percent < tol_min:
+ if throughput_max == 0:
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+
+ elif drop_percent >= tolerance_low:
+ # TODO(esm): why do we update the samples dict in this case
+ # and not update our tracking values?
+ throughput_max = rx_rate
+ drop_percent_at_max_tx = drop_percent
+
+ elif drop_percent >= self.drop_percent_at_max_tx:
+ # TODO(esm): why don't we discard results that are out of tolerance?
self.min_rate = self.rate
- if drop_percent >= self.tmp_drop:
- self.tmp_drop = drop_percent
- self.tmp_throughput = (in_packets / 30)
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ self.drop_percent_at_max_tx = drop_percent_at_max_tx = drop_percent
+ self.throughput_max = throughput_max = rx_rate
+
else:
- samples['Throughput'] = (in_packets / 30)
- samples['DropPercentage'] = drop_percent
+ # TODO(esm): why don't we discard results that are out of tolerance?
+ self.min_rate = self.rate
+
+ generator.clear_client_stats()
+ generator.start_client(mult=self.get_multiplier(),
+ duration=run_duration, force=True)
+
+ # if correlated traffic update the Throughput
+ if generator.rfc2544_helper.correlated_traffic:
+ throughput_max *= 2
+
+ samples.update({
+ 'TxThroughput': tx_rate,
+ 'RxThroughput': rx_rate,
+ 'CurrentDropPercentage': drop_percent,
+ 'Throughput': throughput_max,
+ 'DropPercentage': drop_percent_at_max_tx,
+ })
- traffic_generator.client.clear_stats(ports=traffic_generator.my_ports)
- traffic_generator.client.start(ports=traffic_generator.my_ports,
- mult=self.get_multiplier(),
- duration=30, force=True)
return samples
+
+ def execute_latency(self, generator=None, samples=None):
+ if generator is None:
+ generator = self.generator
+
+ if samples is None:
+ samples = generator.generate_samples()
+
+ self.pps, multiplier = self.calculate_pps(samples)
+ self.ports = []
+ self.pg_id = self.params['traffic_profile'].get('pg_id', 1)
+ priv_ports = generator.priv_ports
+ pub_ports = generator.pub_ports
+ for index, (priv_port, pub_port) in enumerate(zip(priv_ports, pub_ports), 1):
+ profile_data = self.params.get('private_{}'.format(index), '')
+ self.ports.append(priv_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=priv_port)
+
+ profile_data = self.params.get('public_{}'.format(index), '')
+            if not profile_data or generator.rfc2544_helper.correlated_traffic:
+ continue
+
+ self.ports.append(pub_port)
+ generator.client.add_streams(self.get_streams(profile_data),
+ ports=pub_port)
+
+ generator.start_client(ports=self.ports, mult=str(multiplier),
+ duration=120, force=True)
+ self.first_run = False
+
+ def calculate_pps(self, samples):
+ pps = round(samples['Throughput'] / 2, 2)
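+        # note: the multiplier is computed from the previous self.pps; the caller
+        # installs the newly returned pps afterwards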
+ multiplier = round(self.rate / self.pps, 2)
+ return pps, multiplier
+
+ def create_single_stream(self, packet_size, pps, isg=0):
+ packet = self._create_single_packet(packet_size)
+ if pps:
+ stl_mode = STLTXCont(pps=pps)
+ else:
+ stl_mode = STLTXCont(pps=self.pps)
+ if self.pg_id:
+ LOGGING.debug("pg_id: %s", self.pg_id)
+ stl_flow_stats = STLFlowLatencyStats(pg_id=self.pg_id)
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode,
+ flow_stats=stl_flow_stats)
+ self.pg_id += 1
+ else:
+ stream = STLStream(isg=isg, packet=packet, mode=stl_mode)
+ return stream
diff --git a/yardstick/network_services/traffic_profile/traffic_profile.py b/yardstick/network_services/traffic_profile/traffic_profile.py
index 156cc6644..3e1f8d89f 100644
--- a/yardstick/network_services/traffic_profile/traffic_profile.py
+++ b/yardstick/network_services/traffic_profile/traffic_profile.py
@@ -399,16 +399,19 @@ class TrexProfile(TrafficProfile):
logging.debug("Imax: %s rate: %s", imix_count, self.rate)
return imix_count
- def get_streams(self):
- """ generate trex stream """
+ def get_streams(self, profile_data):
+ """ generate trex stream
+ :param profile_data:
+ :type profile_data:
+ """
self.streams = []
self.pps = self.params['traffic_profile'].get('frame_rate', 100)
- for packet_name in self.profile_data:
- outer_l2 = self.profile_data[packet_name].get('outer_l2')
+ for packet_name in profile_data:
+ outer_l2 = profile_data[packet_name].get('outer_l2')
imix_data = self.generate_imix_data(outer_l2)
if not imix_data:
imix_data = {64: self.pps}
- self.generate_vm(self.profile_data[packet_name])
+ self.generate_vm(profile_data[packet_name])
for size in imix_data:
self._generate_streams(size, imix_data[size])
self._generate_profile()
diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py
index cb71a6029..38fbda47f 100644
--- a/yardstick/network_services/utils.py
+++ b/yardstick/network_services/utils.py
@@ -45,17 +45,19 @@ def get_nsb_option(option, default=None):
return default
-def provision_tool(connection, tool_path):
+def provision_tool(connection, tool_path, tool_file=None):
"""
    verify if the tool path exists on the node,
if not push the local binary to remote node
:return - Tool path
"""
+ if tool_file:
+ tool_path = os.path.join(tool_path, tool_file)
bin_path = get_nsb_option("bin_path")
- exit_status, stdout = connection.execute("which %s" % tool_path)[:2]
+ exit_status, stdout = connection.execute("which %s > /dev/null 2>&1" % tool_path)[:2]
if exit_status == 0:
- return encodeutils.safe_decode(stdout, incoming='utf-8').rstrip()
+ return encodeutils.safe_decode(tool_path, incoming='utf-8').rstrip()
logging.warning("%s not found on %s, will try to copy from localhost",
tool_path, connection.host)
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 2df6037f3..955f9f03d 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -15,10 +15,6 @@
from __future__ import absolute_import
import logging
-import ipaddress
-import six
-
-from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
@@ -61,192 +57,69 @@ class QueueFileWrapper(object):
self.q_out.get()
-class GenericVNF(object):
+class VnfdHelper(dict):
+
+ @property
+ def mgmt_interface(self):
+ return self["mgmt-interface"]
+
+ @property
+ def vdu(self):
+ return self['vdu']
+
+ @property
+ def vdu0(self):
+ return self.vdu[0]
+
+ @property
+ def interfaces(self):
+ return self.vdu0['external-interface']
+
+ @property
+ def kpi(self):
+ return self['benchmark']['kpi']
+
+ def find_virtual_interface(self, **kwargs):
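+        # only the first keyword argument is used as the lookup key/value pair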
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ virtual_intf = interface["virtual-interface"]
+ if virtual_intf[key] == value:
+ return interface
+
+ def find_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ if interface[key] == value:
+ return interface
+
+
+class VNFObject(object):
+
+ def __init__(self, name, vnfd):
+ super(VNFObject, self).__init__()
+ self.name = name
+ self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
+
+
+class GenericVNF(VNFObject):
+
""" Class providing file-like API for generic VNF implementation """
- def __init__(self, vnfd):
- super(GenericVNF, self).__init__()
- self.vnfd = vnfd # fixme: parse this into a structure
+ def __init__(self, name, vnfd):
+ super(GenericVNF, self).__init__(name, vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition(vnfd)
+ self.kpi = self._get_kpi_definition()
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- self.name = "vnf__1" # name in topology file
- self.bin_path = get_nsb_option("bin_path", "")
- @classmethod
- def _get_kpi_definition(cls, vnfd):
+ def _get_kpi_definition(self):
""" Get list of KPIs defined in VNFD
:param vnfd:
:return: list of KPIs, e.g. ['throughput', 'latency']
"""
- return vnfd['benchmark']['kpi']
-
- @classmethod
- def get_ip_version(cls, ip_addr):
- """ get ip address version v6 or v4 """
- try:
- address = ipaddress.ip_address(six.text_type(ip_addr))
- except ValueError:
- LOG.error(ip_addr, " is not valid")
- return
- else:
- return address.version
-
- def _ip_to_hex(self, ip_addr):
- ip_x = ip_addr
- if self.get_ip_version(ip_addr) == 4:
- ip_to_convert = ip_addr.split(".")
- ip_octect = [int(octect) for octect in ip_to_convert]
- ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect)
- return ip_x
-
- def _get_dpdk_port_num(self, name):
- for intf in self.vnfd['vdu'][0]['external-interface']:
- if name == intf['name']:
- return intf['virtual-interface']['dpdk_port_num']
-
- def _append_routes(self, ip_pipeline_cfg):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- where = ip_pipeline_cfg.find("arp_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_add = ip_pipeline_cfg[where:]
-
- tmp = route_add.find('\n')
- route_add = route_add[tmp:]
-
- cmds = "arp_route_tbl ="
-
- for route in routing_table:
- net = self._ip_to_hex(route['network'])
- net_nm = self._ip_to_hex(route['netmask'])
- net_gw = self._ip_to_hex(route['gateway'])
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_add
-
- return ip_pipeline_cfg
-
- def _append_nd_routes(self, ip_pipeline_cfg):
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- where = ip_pipeline_cfg.find("nd_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_nd = ip_pipeline_cfg[where:]
-
- tmp = route_nd.find('\n')
- route_nd = route_nd[tmp:]
-
- cmds = "nd_route_tbl ="
-
- for route in routing_table:
- net = route['network']
- net_nm = route['netmask']
- net_gw = route['gateway']
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_nd
-
- return ip_pipeline_cfg
-
- def _get_port0localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
-
- def _get_port1localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
-
- def _get_port0prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['netmask']
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port1prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['netmask']
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port0gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
-
- def _get_port1gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
+ return self.vnfd_helper.kpi
def instantiate(self, scenario_cfg, context_cfg):
""" Prepare VNF for operation and start the VNF process/VM
@@ -284,11 +157,10 @@ class GenericVNF(object):
class GenericTrafficGen(GenericVNF):
""" Class providing file-like API for generic traffic generator """
- def __init__(self, vnfd):
- super(GenericTrafficGen, self).__init__(vnfd)
+ def __init__(self, name, vnfd):
+ super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
- self.name = "tgen__1" # name in topology file
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..89c086d97
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+
+from __future__ import absolute_import
+
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+
+from multiprocessing import Queue, Value, Process
+
+from six.moves import cStringIO
+
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+
+from yardstick.ssh import AutoConnectSSH
+
+DPDK_VERSION = "dpdk-16.07"
+
+LOG = logging.getLogger(__name__)
+
+
+REMOTE_TMP = "/tmp"
+
+
+class VnfSshHelper(AutoConnectSSH):
+
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+
+
+class SetupEnvHelper(object):
+
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ PIPELINE_COMMAND = ''
+ VNF_TYPE = "SAMPLE"
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+
+ def build_config(self):
+ raise NotImplementedError
+
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+
+ def tear_down(self):
+ raise NotImplementedError
+
+
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+
+ HW_DEFAULT_CORE = 3
+ SW_DEFAULT_CORE = 2
+
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+
+        if traffic_options['vnf_type'] != cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+
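+        # 2MB hugepages need 16384 pages (~32GB); any other size (typically 1GB)
+        # gets 16 pages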
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
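+        # hex bitmask with len(all_ports) + 1 bits set, made available to
+        # PIPELINE_COMMAND as ports_len_hex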
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+
+ return cpu_topology
+ except Exception:
+ return []
+
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper, cores)
+
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+
+
+class ResourceHelper(object):
+
+ COLLECT_KPI = ''
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+
+ COLLECT_MAP = {}
+
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+
+ def generate_cfg(self):
+ pass
+
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+
+ result = {"core": result}
+ return result
+
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+
+
+class ClientResourceHelper(ResourceHelper):
+
+ RUN_DURATION = 60
+ QUEUE_WAIT_TIME = 5
+ SYNC_PORT = 1
+ ASYNC_PORT = 2
+
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+
+ def _build_ports(self):
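+        # default to the first two TRex ports; subclasses may derive the ports
+        # from the VNFD instead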
+ self.my_ports = [0, 1]
+
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+
+ def generate_samples(self, key=None, default=None):
+        last_result = self.get_stats(self.my_ports)
+        if not isinstance(last_result, Mapping):  # added for mock unit test
+            self._terminated.value = 1
+            return {}
+
+        key_value = last_result.get(key, default)
+
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+            # fixme: VNFD KPI values need to be mapped to the TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+
+
+class Rfc2544ResourceHelper(object):
+
+ DEFAULT_CORRELATED_TRAFFIC = False
+ DEFAULT_LATENCY = False
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+
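+    # the rfc2544 options and values derived from them are resolved lazily
+    # on first access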
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+
+ return self._correlated_traffic
+
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
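+        # 'allowed_drop_rate' is a "low - high" range string; sort the pair and
+        # default the high bound to the low one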
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+
+
+class SampleVNFDeployHelper(object):
+
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+
+    DISABLE_DEPLOY = True
+
+    def __init__(self, vnfd_helper, ssh_helper):
+        super(SampleVNFDeployHelper, self).__init__()
+        self.ssh_helper = ssh_helper
+        self.vnfd_helper = vnfd_helper
+
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ if self.DISABLE_DEPLOY:
+ return
+
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+
+
+class ScenarioHelper(object):
+
+ DEFAULT_VNF_CFG = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+
+ @property
+ def options(self):
+ return self.all_options[self.name]
+
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+
+
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+
+ VNF_PROMPT = "pipeline>"
+ WAIT_TIME = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+
+ def _get_route_data(self, route_index, route_type):
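+        # advance through nd_route_tbl to the route at route_index; missing
+        # entries yield ''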
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+
+ def _vnf_up_post(self):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+
+ def _build_config(self):
+ return self.setup_helper.build_config()
+
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+
+ def _tear_down(self):
+ pass
+
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+
+ def _check_status(self):
+ raise NotImplementedError
+
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+
+ return self._traffic_process.is_alive()
+
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()
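The client_started handshake used by run_traffic() above recurs throughout NSB; a self-contained sketch of the pattern (function names invented):

    import time
    from multiprocessing import Process, Value

    client_started = Value('i', 0)

    def traffic_runner():
        # ... connect the client and build the streams here ...
        client_started.value = 1   # signal the parent that traffic is flowing
        # ... keep generating traffic ...

    proc = Process(target=traffic_runner)
    proc.start()
    while client_started.value == 0:   # parent blocks until the child signals
        time.sleep(1)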
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index 000a91db4..e65296287 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -16,14 +16,13 @@
from __future__ import absolute_import
from __future__ import print_function
import logging
-import multiprocessing
import re
-import time
-import os
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import provision_tool
+from multiprocessing import Queue
+from ipaddress import IPv4Interface
+
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
@@ -42,77 +41,59 @@ class PingParser(object):
if match:
# IMPORTANT: in order for the data to be properly taken
# in by InfluxDB, it needs to be converted to numeric types
- self.queue.put({"packets_received": float(match.group(1)),
- "rtt": float(match.group(2))})
+ self.queue.put({
+ "packets_received": float(match.group(1)),
+ "rtt": float(match.group(2)),
+ })
def close(self):
- ''' close the ssh connection '''
- pass
+ """ close the ssh connection """
+ self.closed = True
def clear(self):
- ''' clear queue till Empty '''
+ """ clear queue till Empty """
while self.queue.qsize() > 0:
self.queue.get()
-class PingTrafficGen(GenericTrafficGen):
+class PingSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ def setup_vnf_environment(self):
+ self._bind_kernel_devices()
+
+
+class PingTrafficGen(SampleVNFTrafficGen):
"""
     This traffic generator can ping a single IP with the packet size
     and target given in the traffic profile
"""
- def __init__(self, vnfd):
- super(PingTrafficGen, self).__init__(vnfd)
+ TG_NAME = 'Ping'
+ RUN_WAIT = 4
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = PingSetupEnvHelper
+
+ super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._queue = Queue()
+ self._parser = PingParser(self._queue)
self._result = {}
- self._parser = None
- self._queue = None
- self._traffic_process = None
-
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- def _bind_device_kernel(self, connection):
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- drivers = {intf["virtual-interface"]["vpci"]:
- intf["virtual-interface"]["driver"]
- for intf in self.vnfd["vdu"][0]["external-interface"]}
-
- commands = \
- ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key)
- for key, value in drivers.items()]
- for command in commands:
- connection.execute(command)
-
- for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]):
- vpci = out["virtual-interface"]["vpci"]
- net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci)
- out = connection.execute(net)[1]
- ifname = out.split('/')[-1].strip('\n')
- self.vnfd["vdu"][0]["external-interface"][index][
- "virtual-interface"]["local_iface_name"] = ifname
def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(PingTrafficGen, self).scale(flavor)
+ """ scale vnf-based on flavor input """
+ pass
- def instantiate(self, scenario_cfg, context_cfg):
- self._result = {"packets_received": 0, "rtt": 0}
- self._bind_device_kernel(self.connection)
+ def _check_status(self):
+ return self._tg_process.is_alive()
- def run_traffic(self, traffic_profile):
- self._queue = multiprocessing.Queue()
- self._parser = PingParser(self._queue)
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._parser))
- self._traffic_process.start()
- # Wait for traffic process to start
- time.sleep(4)
- return self._traffic_process.is_alive()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self._result = {
+ "packets_received": 0,
+ "rtt": 0,
+ }
+ self.setup_helper.setup_vnf_environment()
def listen_traffic(self, traffic_profile):
""" Not needed for ping
@@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen):
"""
pass
- def _traffic_runner(self, traffic_profile, filewrapper):
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- external_interface = self.vnfd["vdu"][0]["external-interface"]
- virtual_interface = external_interface[0]["virtual-interface"]
- target_ip = virtual_interface["dst_ip"].split('/')[0]
- local_ip = virtual_interface["local_ip"].split('/')[0]
- local_if_name = \
- virtual_interface["local_iface_name"].split('/')[0]
- packet_size = traffic_profile.params["traffic_profile"]["frame_size"]
-
- run_cmd = []
-
- run_cmd.append("ip addr flush %s" % local_if_name)
- run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name))
- run_cmd.append("ip link set %s up" % local_if_name)
-
- for cmd in run_cmd:
- self.connection.execute(cmd)
-
- ping_cmd = ("ping -s %s %s" % (packet_size, target_ip))
- self.connection.run(ping_cmd, stdout=filewrapper,
+ def _traffic_runner(self, traffic_profile):
+ intf = self.vnfd_helper.interfaces[0]["virtual-interface"]
+ profile = traffic_profile.params["traffic_profile"]
+ cmd_kwargs = {
+ 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded,
+ 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded,
+ 'local_if_name': intf["local_iface_name"].split('/')[0],
+ 'packet_size': profile["frame_size"],
+ }
+
+ cmd_list = [
+ "sudo ip addr flush {local_if_name}",
+ "sudo ip addr add {local_ip}/24 dev {local_if_name}",
+ "sudo ip link set {local_if_name} up",
+ ]
+
+ for cmd in cmd_list:
+ self.ssh_helper.execute(cmd.format(**cmd_kwargs))
+
+ ping_cmd = "ping -s {packet_size} {target_ip}"
+ self.ssh_helper.run(ping_cmd.format(**cmd_kwargs),
+ stdout=self._parser,
keep_stdin_open=True, pty=True)
-
- def collect_kpi(self):
- if not self._queue.empty():
- kpi = self._queue.get()
- self._result.update(kpi)
- return self._result
-
- def terminate(self):
- if self._traffic_process is not None:
- self._traffic_process.terminate()
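To see what the ping path produces, feed the parser one line of typical ping output by hand; a sketch assuming ping's usual reply format and the file-like write() that ssh_helper.run() relies on above:

    from multiprocessing import Queue

    queue = Queue()
    parser = PingParser(queue)
    parser.write("64 bytes from 10.0.0.2: icmp_seq=1 ttl=64 time=0.135 ms\n")
    print(queue.get())   # roughly: {'packets_received': 1.0, 'rtt': 0.135}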
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
index 7da4b31e9..79e42e0a8 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
@@ -15,267 +15,98 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
import time
import logging
-import os
-import yaml
+from collections import Mapping
+from itertools import chain
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import get_nsb_option
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper
LOGGING = logging.getLogger(__name__)
-DURATION = 30
-WAIT_TIME = 3
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
+class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper):
-class TrexTrafficGenRFC(GenericTrafficGen):
- """
- This class handles mapping traffic profile and generating
- traffic for rfc2544 testcase.
- """
-
- def __init__(self, vnfd):
- super(TrexTrafficGenRFC, self).__init__(vnfd)
- self._result = {}
- self._terminated = multiprocessing.Value('i', 0)
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.tc_file_name = None
- self.client = None
- self.my_ports = None
-
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- ext_intf = vnfd["vdu"][0]["external-interface"]
- for interface in ext_intf:
- virt_intf = interface["virtual-interface"]
- vpci.append(virt_intf["vpci"])
-
- port["src_mac"] = \
- self._split_mac_address_into_list(virt_intf["local_mac"])
-
- time.sleep(WAIT_TIME)
- port["dest_mac"] = \
- self._split_mac_address_into_list(virt_intf["dst_mac"])
- if virt_intf["dst_mac"]:
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGenRFC, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- trex = os.path.join(self.bin_path, "trex")
- err, _, _ = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))
- if err != 0:
- self.connection.put(trex, trex, True)
+ def is_done(self):
+ return self.latency and self.iteration.value > 10
- LOGGING.debug("Starting TRex server...")
- _tg_server = \
- multiprocessing.Process(target=self._start_server)
- _tg_server.start()
- while True:
- LOGGING.info("Waiting for TG Server to start.. ")
- time.sleep(WAIT_TIME)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOGGING.info("TG server is up and running.")
- return _tg_server.exitcode
- if not _tg_server.is_alive():
- raise RuntimeError("Traffic Generator process died.")
+class TrexRfcResourceHelper(TrexResourceHelper):
- def listen_traffic(self, traffic_profile):
- pass
+ LATENCY_TIME_SLEEP = 120
+ RUN_DURATION = 30
+ WAIT_TIME = 3
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(TrexRfcResourceHelper, self).__init__(setup_helper)
- def run_traffic(self, traffic_profile,
- client_started=multiprocessing.Value('i', 0)):
+ if rfc_helper_type is None:
+ rfc_helper_type = TrexRfc2544ResourceHelper
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- client_started, self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while client_started.value == 0:
- time.sleep(1)
+ self.rfc2544_helper = rfc_helper_type(self.scenario_helper)
+ # self.tg_port_pairs = []
- return self._traffic_process.is_alive()
+ def _build_ports(self):
+ self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs(
+ self.vnfd_helper.interfaces)
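+        # interface names encode the TRex port id in their final character
+        # (e.g. 'xe0' -> port 0), which is what the [-1] slices below extract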
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(chain(self.priv_ports, self.pub_ports)))
- def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
-
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- _server.execute("pkill -9 rex > /dev/null 2>&1")
-
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOGGING.info("Unable to connect to Trex. Attempt %s", idx)
- time.sleep(WAIT_TIME)
- return client
-
- @classmethod
- def _get_rfc_tolerance(cls, tc_yaml):
- tolerance = '0.8 - 1.0'
- if 'tc_options' in tc_yaml['scenarios'][0]:
- tc_options = tc_yaml['scenarios'][0]['tc_options']
- if 'rfc2544' in tc_options:
- tolerance = \
- tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0')
-
- tolerance = tolerance.split('-')
- min_tol = float(tolerance[0])
- if len(tolerance) == 2:
- max_tol = float(tolerance[1])
- else:
- max_tol = float(tolerance[0])
-
- return [min_tol, max_tol]
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOGGING.info("Starting TRex client...")
- tc_yaml = {}
-
- with open(self.tc_file_name) as tc_file:
- tc_yaml = yaml.load(tc_file.read())
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ self.client.stop(self.my_ports)
+ time.sleep(self.WAIT_TIME)
+ samples = traffic_profile.get_drop_percentage(self)
+ self._queue.put(samples)
- tolerance = self._get_rfc_tolerance(tc_yaml)
+ if not self.rfc2544_helper.is_done():
+ return
- # fixme: fix passing correct trex config file,
- # instead of searching the default path
- self.my_ports = [0, 1]
- self.client = self._connect_client()
+ self.client.stop(self.my_ports)
self.client.reset(ports=self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- time.sleep(DURATION)
+ self.client.remove_all_streams(self.my_ports)
+ traffic_profile.execute_latency(samples=samples)
+ multiplier = traffic_profile.calculate_pps(samples)[1]
+ for _ in range(5):
+ time.sleep(self.LATENCY_TIME_SLEEP)
self.client.stop(self.my_ports)
- time.sleep(WAIT_TIME)
+ time.sleep(self.WAIT_TIME)
last_res = self.client.get_stats(self.my_ports)
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- if not isinstance(last_res, dict):
- terminated.value = 1
- last_res = {}
+ if not isinstance(last_res, Mapping):
+ self._terminated.value = 1
+ continue
+ self.generate_samples('latency', {})
+ self._queue.put(samples)
+ self.client.start(mult=str(multiplier),
+ ports=self.my_ports,
+ duration=120, force=True)
- samples[name] = \
- {"rx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)),
- "tx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)),
- "rx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)),
- "tx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)),
- "in_packets":
- last_res.get(vpci_idx, {}).get("ipackets", 0),
- "out_packets":
- last_res.get(vpci_idx, {}).get("opackets", 0)}
+ def start_client(self, mult, duration, force=True):
+ self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force)
- samples = \
- traffic_profile.get_drop_percentage(self, samples,
- tolerance[0], tolerance[1])
- queue.put(samples)
- self.client.stop(self.my_ports)
- self.client.disconnect()
- queue.put(samples)
+ def clear_client_stats(self):
+ self.client.clear_stats(ports=self.my_ports)
def collect_kpi(self):
- if not self._queue.empty():
- result = self._queue.get()
- self._result.update(result)
- LOGGING.debug("trex collect Kpis %s", self._result)
- return self._result
+ self.rfc2544_helper.iteration.value += 1
+ super(TrexRfcResourceHelper, self).collect_kpi()
+
- def terminate(self):
- self._terminated.value = 1 # stop Trex clinet
+class TrexTrafficGenRFC(TrexTrafficGen):
+ """
+    This class handles mapping the traffic profile and generating
+    traffic for the RFC 2544 test case.
+ """
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexRfcResourceHelper
- if self._traffic_process:
- self._traffic_process.terminate()
+ super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
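Condensing the control flow of _run_traffic_once() and is_done() above into a standalone sketch (the callables stand in for the profile and client plumbing):

    RUN_DURATION = 30    # seconds of traffic per throughput iteration
    LATENCY_AFTER = 10   # is_done(): move to latency once past this many iterations

    def rfc2544_loop(run_once, get_drop_percentage, measure_latency, latency_enabled):
        iteration = 0
        while True:
            run_once(RUN_DURATION)            # start streams, sleep, stop ports
            samples = get_drop_percentage()   # the profile adjusts its rate from the drops
            iteration += 1
            if latency_enabled and iteration > LATENCY_AFTER:
                measure_latency(samples)      # five 120 s windows at the found rate
                return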
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
index 058b715fe..616b331ba 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
@@ -15,261 +15,136 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
-import time
+
import logging
import os
+
import yaml
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list
from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
LOG = logging.getLogger(__name__)
-DURATION = 30
-WAIT_QUEUE = 1
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
-class TrexTrafficGen(GenericTrafficGen):
+class TrexResourceHelper(ClientResourceHelper):
+
+ CONF_FILE = '/tmp/trex_cfg.yaml'
+ QUEUE_WAIT_TIME = 1
+ RESOURCE_WORD = 'trex'
+ RUN_DURATION = 0
+
+ SYNC_PORT = 4500
+ ASYNC_PORT = 4501
+
+ def generate_cfg(self):
+ ext_intf = self.vnfd_helper.interfaces
+ vpci_list = []
+ port_list = []
+        trex_cfg = {
+            'port_limit': len(ext_intf),
+            'version': '2',
+            'interfaces': vpci_list,
+            'port_info': port_list,
+        }
+ cfg_file = [trex_cfg]
+
+ for interface in ext_intf:
+ virtual_interface = interface['virtual-interface']
+ vpci_list.append(virtual_interface["vpci"])
+ dst_mac = virtual_interface["dst_mac"]
+
+ if not dst_mac:
+ continue
+
+ local_mac = virtual_interface["local_mac"]
+ port_list.append({
+ "src_mac": mac_address_to_hex_list(local_mac),
+ "dest_mac": mac_address_to_hex_list(dst_mac),
+ })
+
+ cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+ self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+ self._vpci_ascending = sorted(vpci_list)
+
+ def check_status(self):
+ status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+ return status
+
+    # deployment is temporarily disabled
+ DISABLE_DEPLOY = True
+
+ def setup(self):
+ if self.DISABLE_DEPLOY:
+ return
+
+ trex_path = self.ssh_helper.join_bin_path('trex')
+
+ err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+ if err == 0:
+ return
+
+ LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+ self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+ self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+ self.ssh_helper.put(trex_path, trex_path, True)
+ ko_src = os.path.join(trex_path, "scripts/ko/src/")
+ self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+
+ def start(self, ports=None, *args, **kwargs):
+ cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd.format(self))
+
+ self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+
+ trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+ path = get_nsb_option("trex_path", trex_path)
+
+ # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE
+ cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE)
+
+        # discard stdout but keep stderr so any startup errors are visible
+        # cd has to run under sudo because the path might be owned by root
+ trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd)
+ self.ssh_helper.execute(trex_cmd)
+
+ def terminate(self):
+ super(TrexResourceHelper, self).terminate()
+ cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+
+
+class TrexTrafficGen(SampleVNFTrafficGen):
"""
     This class handles mapping the traffic profile and generating
     traffic for the given test case
"""
- def __init__(self, vnfd):
- super(TrexTrafficGen, self).__init__(vnfd)
- self._result = {}
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.client = None
- self.my_ports = None
- self.client_started = multiprocessing.Value('i', 0)
-
- mgmt_interface = vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- for interface in range(len(vnfd["vdu"][0]["external-interface"])):
- ext_intrf = vnfd["vdu"][0]["external-interface"]
- virtual_interface = ext_intrf[interface]["virtual-interface"]
- vpci.append(virtual_interface["vpci"])
-
- port["src_mac"] = self._split_mac_address_into_list(
- virtual_interface["local_mac"])
- port["dest_mac"] = self._split_mac_address_into_list(
- virtual_interface["dst_mac"])
-
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
+ APP_NAME = 'TRex'
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexResourceHelper
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGen, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.setup_vnf_environment(self.connection)
-
- trex = os.path.join(self.bin_path, "trex")
- err = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
- if err != 0:
- LOG.info("Copying trex to destination...")
- self.connection.put("/root/.bash_profile", "/root/.bash_profile")
- self.connection.put(trex, trex, True)
- ko_src = os.path.join(trex, "scripts/ko/src/")
- self.connection.execute("cd %s && make && make install" % ko_src)
-
- LOG.info("Starting TRex server...")
- _tg_process = \
- multiprocessing.Process(target=self._start_server)
- _tg_process.start()
- while True:
- if not _tg_process.is_alive():
- raise RuntimeError("Traffic Generator process died.")
- LOG.info("Waiting for TG Server to start.. ")
- time.sleep(1)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOG.info("TG server is up and running.")
- return _tg_process.exitcode
-
- def listen_traffic(self, traffic_profile):
- pass
+ super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
-
- def run_traffic(self, traffic_profile):
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- self.client_started,
- self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while self.client_started.value == 0:
- time.sleep(1)
-
- return self._traffic_process.is_alive()
+ def _check_status(self):
+ return self.resource_helper.check_status()
def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
+ super(TrexTrafficGen, self)._start_server()
+ self.resource_helper.start()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def scale(self, flavor=""):
+ pass
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- # try to connect with 5s intervals, 30s max
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
- time.sleep(5)
- return client
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOG.info("Starting TRex client...")
-
- self.my_ports = [0, 1]
- self.client = self._connect_client()
- self.client.reset(ports=self.my_ports)
-
- self.client.remove_all_streams(self.my_ports) # remove all streams
-
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- last_res = self.client.get_stats(self.my_ports)
- if not isinstance(last_res, dict): # added for mock unit test
- terminated.value = 1
- last_res = {}
-
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- xe_value = last_res.get(vpci_idx, {})
- samples[name] = \
- {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": xe_value.get("ipackets", 0),
- "out_packets": xe_value.get("opackets", 0)}
- time.sleep(WAIT_QUEUE)
- queue.put(samples)
-
- self.client.disconnect()
- terminated.value = 0
-
- def collect_kpi(self):
- if not self._queue.empty():
- self._result.update(self._queue.get())
- LOG.debug("trex collect Kpis %s", self._result)
- return self._result
+ def listen_traffic(self, traffic_profile):
+ pass
def terminate(self):
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- self.traffic_finished = True
- if self._traffic_process:
- self._traffic_process.terminate()
+ self.resource_helper.terminate()
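mac_address_to_hex_list() from yardstick.common.utils replaces the removed _split_mac_address_into_list(); judging from the removed code, this minimal equivalent captures its behaviour:

    def mac_address_to_hex_list(mac):
        # '00:00:00:00:00:01' -> ['0x00', '0x00', '0x00', '0x00', '0x00', '0x01']
        return ["0x{}".format(octet) for octet in mac.split(':')]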
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+VPE_COLLECT_KPI = """\
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\
+\tPkts dropped by other:\s(\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+
+ return config
+
+ def get_sink_swq(self, parser, pipeline, k, index):
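+        # map a template queue name onto a concrete one: every entry becomes a
+        # software queue (SWQ), optionally feeding a SINK or a traffic manager (TM)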
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+
+ return rules.get_string()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
-
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ COLLECT_KPI = VPE_COLLECT_KPI
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
-
- result = {"core": result}
-
- return result
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
-
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
- return [cpu.strip() for cpu in cpu_topo]
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
-
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
- self.setup_vnf_environment(self.connection)
-
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
-
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
-
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
-
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
-
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
-
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
-
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
-
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
-
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
-
- self._provide_config_file(cfg, vpe_config, vpe_vars)
-
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
-
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
-
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
-
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
-
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
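A worked check of the VPE_COLLECT_KPI pattern against an invented stats blob, showing which groups collect_kpi() above sums (indexes_in = [1], indexes_drop = [2, 3]):

    import re

    PATTERN = (
        r"Pkts in:\s(\d+)\r\n"
        r"\tPkts dropped by Pkts in:\s(\d+)\r\n"
        r"\tPkts dropped by AH:\s(\d+)\r\n"
        r"\tPkts dropped by other:\s(\d+)"
    )
    stats = ("Pkts in: 100\r\n\tPkts dropped by Pkts in: 1\r\n"
             "\tPkts dropped by AH: 2\r\n\tPkts dropped by other: 3")
    match = re.search(PATTERN, stats, re.MULTILINE)
    print(match.groups())   # ('100', '1', '2', '3') -> in += 100, drop += 1 + 2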
diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py
index b56a91915..0120b493e 100644
--- a/yardstick/network_services/vnf_generic/vnfdgen.py
+++ b/yardstick/network_services/vnf_generic/vnfdgen.py
@@ -14,11 +14,16 @@
""" Generic file to map and build vnf discriptor """
from __future__ import absolute_import
-import collections
+from functools import reduce
import jinja2
+import logging
import yaml
+from yardstick.common.utils import try_int
+
+LOG = logging.getLogger(__name__)
+
def render(vnf_model, **kwargs):
"""Render jinja2 VNF template
@@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node):
as input for GenericVNF.__init__
"""
# get is unused as global method inside template
- node["get"] = get
+ # node["get"] = key_flatten_get
+ node["get"] = deepgetitem
# Set Node details to default if not defined in pod file
# we CANNOT use TaskTemplate.render because it does not allow
# for missing variables, we need to allow password for key_filename
@@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node):
return filled_vnfd
-def dict_key_flatten(data):
- """ Convert nested dict structure to dotted key
- (e.g. {"a":{"b":1}} -> {"a.b":1}
-
- :param data: nested dictionary
- :return: flat dicrionary
- """
- next_data = {}
-
- # check for non-string iterables
- if not any((isinstance(v, collections.Iterable) and not isinstance(v, str))
- for v in data.values()):
- return data
+# dict_key_flatten was causing recursion errors with Jinja2, so it was removed and
+# replaced with this function from Stack Overflow, which doesn't require generating
+# entire dictionaries each time we query a key
+def deepgetitem(obj, item, default=None):
+ """Steps through an item chain to get the ultimate value.
- for key, val in data.items():
- if isinstance(val, collections.Mapping):
- for n_k, n_v in val.items():
- next_data["%s.%s" % (key, n_k)] = n_v
- elif isinstance(val, collections.Iterable) and not isinstance(val,
- str):
- for index, item in enumerate(val):
- next_data["%s%d" % (key, index)] = item
- else:
- next_data[key] = val
+    If the ultimate value or the path to it does not exist, no exception
+    is raised; `default` is returned instead.
- return dict_key_flatten(next_data)
+ Based on
+ https://stackoverflow.com/a/38623359
+ https://stackoverflow.com/users/1820042/donny-winston
+    extended with try_int so that integer path components also index into sequences
-def get(obj, key, *args):
- """ Get template key from dictionary, get default value or raise an exception
+ >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}}
+ >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id')
+ 1
+ >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id')
+ >>>
+ >>> deepgetitem(d, 'snl_final.about._icsd.fr.1')
+ 3
"""
- data = dict_key_flatten(obj)
- return data.get(key, *args)
+ def getitem(obj, name):
+ # if integer then list index
+ name = try_int(name)
+ try:
+ return obj[name]
+ except (KeyError, TypeError, IndexError):
+ return default
+ return reduce(getitem, item.split('.'), obj)
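With deepgetitem installed as the template-side get, a dotted path walks dicts and, thanks to try_int, list indexes as well (VNFD fragment invented):

    node = {'vdu': [{'external-interface': [{'name': 'xe0'}]}]}
    deepgetitem(node, 'vdu.0.external-interface.0.name')        # -> 'xe0'
    deepgetitem(node, 'vdu.0.missing.key', default='fallback')  # -> 'fallback'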
diff --git a/yardstick/network_services/yang_model.py b/yardstick/network_services/yang_model.py
new file mode 100644
index 000000000..fbf224bd8
--- /dev/null
+++ b/yardstick/network_services/yang_model.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import print_function
+import logging
+import ipaddress
+import yaml
+import six
+
+LOG = logging.getLogger(__name__)
+
+
+class YangModel(object):
+
+ RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}"
+
+ def __init__(self, config_file):
+ super(YangModel, self).__init__()
+ self._config_file = config_file
+ self._options = {}
+ self._rules = ''
+
+ @property
+ def config_file(self):
+ return self._config_file
+
+ @config_file.setter
+ def config_file(self, value):
+ self._config_file = value
+ self._options = {}
+ self._rules = ''
+
+ def _read_config(self):
+ # TODO: add some error handling in case of empty or non-existing file
+ try:
+ with open(self._config_file) as f:
+ self._options = yaml.safe_load(f)
+ except Exception as e:
+ LOG.exception("Failed to load the yaml %s", e)
+ raise
+
+ def _get_entries(self):
+ if not self._options:
+ return ''
+
+ rule_list = []
+ for ace in self._options['access-list1']['acl']['access-list-entries']:
+ # TODO: resolve ports using topology file and nodes'
+ # ids: public or private.
+ matches = ace['ace']['matches']
+ dst_ipv4_net = matches['destination-ipv4-network']
+ dst_ipv4_net_ip = ipaddress.ip_interface(six.text_type(dst_ipv4_net))
+ port0_local_network = dst_ipv4_net_ip.network.network_address.exploded
+ port0_prefix = dst_ipv4_net_ip.network.prefixlen
+
+ src_ipv4_net = matches['source-ipv4-network']
+ src_ipv4_net_ip = ipaddress.ip_interface(six.text_type(src_ipv4_net))
+ port1_local_network = src_ipv4_net_ip.network.network_address.exploded
+ port1_prefix = src_ipv4_net_ip.network.prefixlen
+
+ lower_dport = matches['destination-port-range']['lower-port']
+ upper_dport = matches['destination-port-range']['upper-port']
+
+ lower_sport = matches['source-port-range']['lower-port']
+ upper_sport = matches['source-port-range']['upper-port']
+
+ # TODO: proto should be read from file also.
+ # Now all rules in sample ACL file are TCP.
+ rule_list.append('') # get an extra new line
+ rule_list.append(self.RULE_TEMPLATE.format(port0_local_network,
+ port0_prefix,
+ port1_local_network,
+ port1_prefix,
+ lower_dport,
+ upper_dport,
+ lower_sport,
+ upper_sport,
+ 0))
+ rule_list.append(self.RULE_TEMPLATE.format(port1_local_network,
+ port1_prefix,
+ port0_local_network,
+ port0_prefix,
+ lower_sport,
+ upper_sport,
+ lower_dport,
+ upper_dport,
+ 1))
+
+ self._rules = '\n'.join(rule_list)
+
+ def get_rules(self):
+ if not self._rules:
+ self._read_config()
+ self._get_entries()
+ return self._rules
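Each ACE yields two pipeline rules, one per direction. With invented networks and the full port range, RULE_TEMPLATE renders as:

    RULE_TEMPLATE = "p acl add 1 {0} {1} {2} {3} {4} {5} {6} {7} 0 0 {8}"
    print(RULE_TEMPLATE.format("152.16.0.0", 24, "152.16.40.0", 24,
                               0, 65535, 0, 65535, 0))
    # p acl add 1 152.16.0.0 24 152.16.40.0 24 0 65535 0 65535 0 0 0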
diff --git a/yardstick/orchestrator/heat.py b/yardstick/orchestrator/heat.py
index bf6593d45..c21a47473 100644
--- a/yardstick/orchestrator/heat.py
+++ b/yardstick/orchestrator/heat.py
@@ -316,7 +316,9 @@ name (i.e. %s).\
'enable_dhcp': enable_dhcp,
}
}
- if gateway_ip is not None:
+ if gateway_ip == 'null':
+ self.resources[name]['properties']['gateway_ip'] = None
+ elif gateway_ip is not None:
self.resources[name]['properties']['gateway_ip'] = gateway_ip
self._template['outputs'][name] = {
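The gateway_ip change gives subnets a tri-state, summarized below (the value comes from the context YAML):

    # gateway_ip is None    -> property omitted, Heat assigns the default gateway
    # gateway_ip == 'null'  -> property rendered as null, gateway disabled
    # anything else         -> passed through to the template unchanged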
diff --git a/yardstick/ssh.py b/yardstick/ssh.py
index cf9adf0dc..8ac3eaa3a 100644
--- a/yardstick/ssh.py
+++ b/yardstick/ssh.py
@@ -77,8 +77,8 @@ from oslo_utils import encodeutils
from scp import SCPClient
import six
-
-SSH_PORT = paramiko.config.SSH_PORT
+from yardstick.common.utils import try_int
+from yardstick.network_services.utils import provision_tool
class SSHError(Exception):
@@ -92,7 +92,26 @@ class SSHTimeout(SSHError):
class SSH(object):
"""Represent ssh connection."""
- def __init__(self, user, host, port=SSH_PORT, pkey=None,
+ SSH_PORT = paramiko.config.SSH_PORT
+
+ @staticmethod
+ def gen_keys(key_filename, bit_count=2048):
+ rsa_key = paramiko.RSAKey.generate(bits=bit_count, progress_func=None)
+ rsa_key.write_private_key_file(key_filename)
+ print("Writing %s ..." % key_filename)
+ with open('.'.join([key_filename, "pub"]), "w") as pubkey_file:
+ pubkey_file.write(rsa_key.get_name())
+ pubkey_file.write(' ')
+ pubkey_file.write(rsa_key.get_base64())
+ pubkey_file.write('\n')
+
+ @staticmethod
+ def get_class():
+        # must return the static class itself; anything dynamic would resolve to
+        # the calling class, i.e. the subclass rather than the superclass
+ return SSH
+
+ def __init__(self, user, host, port=None, pkey=None,
key_filename=None, password=None, name=None):
"""Initialize SSH client.
@@ -115,7 +134,7 @@ class SSH(object):
self.log.debug("user:%s host:%s", user, host)
# we may get text port from YAML, convert to int
- self.port = int(port)
+ self.port = try_int(port, self.SSH_PORT)
self.pkey = self._get_pkey(pkey) if pkey else None
self.password = password
self.key_filename = key_filename
@@ -129,21 +148,25 @@ class SSH(object):
logging.getLogger("paramiko").setLevel(logging.WARN)
@classmethod
- def from_node(cls, node, overrides=None, defaults=None):
+ def args_from_node(cls, node, overrides=None, defaults=None):
if overrides is None:
overrides = {}
if defaults is None:
defaults = {}
params = ChainMap(overrides, node, defaults)
- return cls(
- user=params['user'],
- host=params['ip'],
- # paramiko doesn't like None default, requires SSH_PORT default
- port=params.get('ssh_port', SSH_PORT),
- pkey=params.get('pkey'),
- key_filename=params.get('key_filename'),
- password=params.get('password'),
- name=params.get('name'))
+ return {
+ 'user': params['user'],
+ 'host': params['ip'],
+ 'port': params.get('ssh_port', cls.SSH_PORT),
+ 'pkey': params.get('pkey'),
+ 'key_filename': params.get('key_filename'),
+ 'password': params.get('password'),
+ 'name': params.get('name'),
+ }
+
+ @classmethod
+ def from_node(cls, node, overrides=None, defaults=None):
+ return cls(**cls.args_from_node(node, overrides, defaults))
def _get_pkey(self, key):
if isinstance(key, six.string_types):
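
args_from_node resolves connection parameters with ChainMap precedence:
overrides beat the node dict, which beats defaults. A small usage sketch
(node contents hypothetical):

    node = {'user': 'heat-admin', 'ip': '10.0.0.5', 'ssh_port': '2222'}
    args = SSH.args_from_node(node,
                              overrides={'name': 'vnf0'},
                              defaults={'password': 'opnfv'})
    # args == {'user': 'heat-admin', 'host': '10.0.0.5', 'port': '2222',
    #          'pkey': None, 'key_filename': None, 'password': 'opnfv',
    #          'name': 'vnf0'}
    conn = SSH.from_node(node, overrides={'name': 'vnf0'})

Splitting the argument extraction out of from_node lets subclasses such as
AutoConnectSSH reuse it while adding their own constructor arguments.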
@@ -156,8 +179,12 @@ class SSH(object):
errors.append(e)
raise SSHError("Invalid pkey: %s" % (errors))
+ @property
+ def is_connected(self):
+ return bool(self._client)
+
def _get_client(self):
- if self._client:
+ if self.is_connected:
return self._client
try:
self._client = paramiko.SSHClient()
@@ -176,9 +203,24 @@ class SSH(object):
raise SSHError(message % {"exception": e,
"exception_type": type(e)})
+ def _make_dict(self):
+ return {
+ 'user': self.user,
+ 'host': self.host,
+ 'port': self.port,
+ 'pkey': self.pkey,
+ 'key_filename': self.key_filename,
+ 'password': self.password,
+ 'name': self.name,
+ }
+
+ def copy(self):
+ return self.get_class()(**self._make_dict())
+
def close(self):
- self._client.close()
- self._client = False
+ if self._client:
+ self._client.close()
+ self._client = False
def run(self, cmd, stdin=None, stdout=None, stderr=None,
raise_on_error=True, timeout=3600,
@@ -308,7 +350,7 @@ class SSH(object):
timeout=timeout, raise_on_error=False)
stdout.seek(0)
stderr.seek(0)
- return (exit_status, stdout.read(), stderr.read())
+ return exit_status, stdout.read(), stderr.read()
def wait(self, timeout=120, interval=1):
"""Wait for the host will be available via ssh."""
@@ -369,3 +411,72 @@ class SSH(object):
self._put_file_sftp(localpath, remotepath, mode=mode)
except (paramiko.SSHException, socket.error):
self._put_file_shell(localpath, remotepath, mode=mode)
+
+ def provision_tool(self, tool_path, tool_file=None):
+ return provision_tool(self, tool_path, tool_file)
+
+ def put_file_obj(self, file_obj, remotepath, mode=None):
+ client = self._get_client()
+
+ with client.open_sftp() as sftp:
+ sftp.putfo(file_obj, remotepath)
+ if mode is not None:
+ sftp.chmod(remotepath, mode)
+
+
+class AutoConnectSSH(SSH):
+
+ def __init__(self, user, host, port=None, pkey=None,
+ key_filename=None, password=None, name=None, wait=False):
+ super(AutoConnectSSH, self).__init__(user, host, port, pkey, key_filename, password, name)
+ self._wait = wait
+
+ def _make_dict(self):
+ data = super(AutoConnectSSH, self)._make_dict()
+ data.update({
+ 'wait': self._wait
+ })
+ return data
+
+ def _connect(self):
+ if not self.is_connected:
+ self._get_client()
+ if self._wait:
+ self.wait()
+
+ def drop_connection(self):
+        """Don't close anything; just force creation of a new client."""
+ self._client = False
+
+ def execute(self, cmd, stdin=None, timeout=3600):
+ self._connect()
+ return super(AutoConnectSSH, self).execute(cmd, stdin, timeout)
+
+ def run(self, cmd, stdin=None, stdout=None, stderr=None,
+ raise_on_error=True, timeout=3600,
+ keep_stdin_open=False, pty=False):
+ self._connect()
+ return super(AutoConnectSSH, self).run(cmd, stdin, stdout, stderr, raise_on_error,
+ timeout, keep_stdin_open, pty)
+
+ def put(self, files, remote_path=b'.', recursive=False):
+ self._connect()
+ return super(AutoConnectSSH, self).put(files, remote_path, recursive)
+
+ def put_file(self, local_path, remote_path, mode=None):
+ self._connect()
+ return super(AutoConnectSSH, self).put_file(local_path, remote_path, mode)
+
+ def put_file_obj(self, file_obj, remote_path, mode=None):
+ self._connect()
+ return super(AutoConnectSSH, self).put_file_obj(file_obj, remote_path, mode)
+
+ def provision_tool(self, tool_path, tool_file=None):
+ self._connect()
+ return super(AutoConnectSSH, self).provision_tool(tool_path, tool_file)
+
+ @staticmethod
+ def get_class():
+        # must return the static class name; anything else would refer to the
+        # calling class, i.e. the subclass rather than the superclass
+ return AutoConnectSSH
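
AutoConnectSSH defers the actual connection until first use: every public
operation goes through _connect(), which creates the paramiko client on
demand and, when wait=True, blocks in wait() until the host answers. A usage
sketch (host and credentials hypothetical):

    conn = AutoConnectSSH('root', '10.10.10.10', port=22, wait=True)
    status, out, err = conn.execute('uname -a')  # connects here, not above

    clone = conn.copy()      # _make_dict() + get_class() preserve the subclass
    conn.drop_connection()   # forget the client; the next call reconnects

Hard-coding get_class() as a staticmethod in both classes makes copy()
deterministic: SSH.copy() always yields an SSH and AutoConnectSSH.copy()
always yields an AutoConnectSSH, regardless of any further subclassing.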