aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick
diff options
context:
space:
mode:
authorAbhijit Sinha <abhijit.sinha@intel.com>2018-09-20 16:40:11 +0000
committerGerrit Code Review <gerrit@opnfv.org>2018-09-20 16:40:11 +0000
commit4faa5ea500f65cca2b606a27fd2c22ed19358941 (patch)
treef3950e8ce19f89fc79a2bb3264217fc71ed4901d /yardstick
parent7ab7287fc0332b9a592d2bbf86c646e3b1b3058b (diff)
parentc32cfdf59d67d15053e5c843684fe7093de50650 (diff)
Merge changes from topics 'YARDSTICK-1354', 'YARDSTICK-1348', 'YARDSTICK-1359', 'YARDSTICK-1356'
* changes: Add vEPC service request test cases Add network initiated dedicated bearers creation Add UE initiated dedicated bearer creation test Add vEPC infrastructure for Landslide TG Add vEPC default bearers relocation test case Add vEPC default bearers create/delete test case Add Spirent Landslide traffic profile templates Update LSResourceHelper unittests Add Spirent Landslide TG API Added Landslide Resource Helper implementation Add TclClients for Spirent Landslide TG
Diffstat (limited to 'yardstick')
-rw-r--r--yardstick/common/exceptions.py12
-rw-r--r--yardstick/network_services/traffic_profile/__init__.py1
-rw-r--r--yardstick/network_services/traffic_profile/landslide_profile.py47
-rw-r--r--yardstick/network_services/utils.py3
-rw-r--r--yardstick/network_services/vnf_generic/vnf/epc_vnf.py53
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_landslide.py1203
-rw-r--r--yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py136
-rw-r--r--yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py94
-rw-r--r--yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py1911
9 files changed, 3460 insertions, 0 deletions
diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py
index 10c1f3f27..539e0fec3 100644
--- a/yardstick/common/exceptions.py
+++ b/yardstick/common/exceptions.py
@@ -420,3 +420,15 @@ class InvalidMacAddress(YardstickException):
class ValueCheckError(YardstickException):
message = 'Constraint "%(value1)s %(operator)s %(value2)s" does not hold'
+
+
+class RestApiError(RuntimeError):
+ def __init__(self, message):
+ self._message = message
+ super(RestApiError, self).__init__(message)
+
+
+class LandslideTclException(RuntimeError):
+ def __init__(self, message):
+ self._message = message
+ super(LandslideTclException, self).__init__(message)
diff --git a/yardstick/network_services/traffic_profile/__init__.py b/yardstick/network_services/traffic_profile/__init__.py
index a1b26a24d..91d8a665f 100644
--- a/yardstick/network_services/traffic_profile/__init__.py
+++ b/yardstick/network_services/traffic_profile/__init__.py
@@ -28,6 +28,7 @@ def register_modules():
'yardstick.network_services.traffic_profile.prox_ramp',
'yardstick.network_services.traffic_profile.rfc2544',
'yardstick.network_services.traffic_profile.pktgen',
+ 'yardstick.network_services.traffic_profile.landslide_profile',
]
for module in modules:
diff --git a/yardstick/network_services/traffic_profile/landslide_profile.py b/yardstick/network_services/traffic_profile/landslide_profile.py
new file mode 100644
index 000000000..f79226fb4
--- /dev/null
+++ b/yardstick/network_services/traffic_profile/landslide_profile.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Spirent Landslide traffic profile definitions """
+
+from yardstick.network_services.traffic_profile import base
+
+
+class LandslideProfile(base.TrafficProfile):
+ """
+ This traffic profile handles attributes of Landslide data stream
+ """
+
+ def __init__(self, tp_config):
+ super(LandslideProfile, self).__init__(tp_config)
+
+ # for backward compatibility support dict and list of dicts
+ if isinstance(tp_config["dmf_config"], dict):
+ self.dmf_config = [tp_config["dmf_config"]]
+ else:
+ self.dmf_config = tp_config["dmf_config"]
+
+ def execute(self, traffic_generator):
+ pass
+
+ def update_dmf(self, options):
+ if 'dmf' in options:
+ if isinstance(options['dmf'], dict):
+ _dmfs = [options['dmf']]
+ else:
+ _dmfs = options['dmf']
+
+ for index, _dmf in enumerate(_dmfs):
+ try:
+ self.dmf_config[index].update(_dmf)
+ except IndexError:
+ pass
diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py
index 4b987fafe..9c64fecde 100644
--- a/yardstick/network_services/utils.py
+++ b/yardstick/network_services/utils.py
@@ -36,6 +36,9 @@ OPTS = [
cfg.StrOpt('trex_client_lib',
default=os.path.join(NSB_ROOT, 'trex_client/stl'),
help='trex python library path.'),
+ cfg.StrOpt('jre_path_i386',
+ default='',
+ help='path to installation of 32-bit Java 1.7+.'),
]
CONF.register_opts(OPTS, group="nsb")
diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py
new file mode 100644
index 000000000..66d16d07f
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from yardstick.network_services.vnf_generic.vnf import base
+
+LOG = logging.getLogger(__name__)
+
+
+class EPCVnf(base.GenericVNF):
+
+ def __init__(self, name, vnfd, task_id):
+ super(EPCVnf, self).__init__(name, vnfd, task_id)
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ """Prepare VNF for operation and start the VNF process/VM
+
+ :param scenario_cfg: Scenario config
+ :param context_cfg: Context config
+ """
+ pass
+
+ def wait_for_instantiate(self):
+ """Wait for VNF to start"""
+ pass
+
+ def terminate(self):
+ """Kill all VNF processes"""
+ pass
+
+ def scale(self, flavor=""):
+ pass
+
+ def collect_kpi(self):
+ pass
+
+ def start_collect(self):
+ pass
+
+ def stop_collect(self):
+ pass
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
new file mode 100644
index 000000000..a146b72ca
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
@@ -0,0 +1,1203 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import logging
+import requests
+import six
+import time
+
+from yardstick.common import exceptions
+from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import utils as net_serv_utils
+from yardstick.network_services.vnf_generic.vnf import sample_vnf
+
+try:
+ from lsapi import LsApi
+except ImportError:
+ LsApi = common_utils.ErrorClass
+
+LOG = logging.getLogger(__name__)
+
+
+class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
+ APP_NAME = 'LandslideTG'
+
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = LandslideResourceHelper
+ super(LandslideTrafficGen, self).__init__(name, vnfd, task_id,
+ setup_env_helper_type,
+ resource_helper_type)
+
+ self.bin_path = net_serv_utils.get_nsb_option('bin_path')
+ self.name = name
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.session_profile = None
+
+ def listen_traffic(self, traffic_profile):
+ pass
+
+ def terminate(self):
+ self.resource_helper.disconnect()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
+ self.resource_helper.connect()
+
+ # Create test servers
+ test_servers = [x['test_server'] for x in self.vnfd_helper['config']]
+ self.resource_helper.create_test_servers(test_servers)
+
+ # Create SUTs
+ [self.resource_helper.create_suts(x['suts']) for x in
+ self.vnfd_helper['config']]
+
+ # Fill in test session based on session profile and test case options
+ self._load_session_profile()
+
+ def run_traffic(self, traffic_profile):
+ self.resource_helper.abort_running_tests()
+ # Update DMF profile with related test case options
+ traffic_profile.update_dmf(self.scenario_helper.all_options)
+ # Create DMF in test user library
+ self.resource_helper.create_dmf(traffic_profile.dmf_config)
+ # Create/update test session in test user library
+ self.resource_helper.create_test_session(self.session_profile)
+ # Start test session
+ self.resource_helper.create_running_tests(self.session_profile['name'])
+
+ def collect_kpi(self):
+ return self.resource_helper.collect_kpi()
+
+ def wait_for_instantiate(self):
+ pass
+
+ @staticmethod
+ def _update_session_suts(suts, testcase):
+ """ Create SUT entry. Update related EPC block in session profile. """
+ for sut in suts:
+ # Update session profile EPC element with SUT info from pod file
+ tc_role = testcase['parameters'].get(sut['role'])
+ if tc_role:
+ _param = {}
+ if tc_role['class'] == 'Sut':
+ _param['name'] = sut['name']
+ elif tc_role['class'] == 'TestNode':
+ _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'}
+ if x in sut and sut[x]})
+ testcase['parameters'][sut['role']].update(_param)
+ else:
+ LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
+ return testcase
+
+ def _update_session_test_servers(self, test_server, _tsgroup_index):
+ """ Update tsId, reservations, pre-resolved ARP in session profile """
+ # Update test server name
+ test_groups = self.session_profile['tsGroups']
+ test_groups[_tsgroup_index]['tsId'] = test_server['name']
+
+ # Update preResolvedArpAddress
+ arp_key = 'preResolvedArpAddress'
+ _preresolved_arp = test_server.get(arp_key) # list of dicts
+ if _preresolved_arp:
+ test_groups[_tsgroup_index][arp_key] = _preresolved_arp
+
+ # Update reservations
+ if 'phySubnets' in test_server:
+ reservation = {'tsId': test_server['name'],
+ 'tsIndex': _tsgroup_index,
+ 'tsName': test_server['name'],
+ 'phySubnets': test_server['phySubnets']}
+ if 'reservations' in self.session_profile:
+ self.session_profile['reservations'].append(reservation)
+ else:
+ self.session_profile['reservePorts'] = 'true'
+ self.session_profile['reservations'] = [reservation]
+
+ @staticmethod
+ def _update_session_tc_params(tc_options, testcase):
+ for _param_key in tc_options:
+ if _param_key == 'AssociatedPhys':
+ testcase[_param_key] = tc_options[_param_key]
+ continue
+ testcase['parameters'][_param_key] = tc_options[_param_key]
+ return testcase
+
+ def _load_session_profile(self):
+
+ with common_utils.open_relative_file(
+ self.scenario_helper.scenario_cfg['session_profile'],
+ self.scenario_helper.task_path) as stream:
+ self.session_profile = yaml_loader.yaml_load(stream)
+
+ # Raise exception if number of entries differs in following files,
+ _config_files = ['pod file', 'session_profile file', 'test_case file']
+ # Count testcases number in all tsGroups of session profile
+ session_tests_num = [xx for x in self.session_profile['tsGroups']
+ for xx in x['testCases']]
+ # Create a set containing number of list elements in each structure
+ _config_files_blocks_num = [
+ len(x) for x in
+ (self.vnfd_helper['config'], # test_servers and suts info
+ session_tests_num,
+ self.scenario_helper.all_options['test_cases'])] # test case file
+
+ if len(set(_config_files_blocks_num)) != 1:
+ raise RuntimeError('Unequal number of elements. {}'.format(
+ dict(six.moves.zip_longest(_config_files,
+ _config_files_blocks_num))))
+
+ ts_names = set()
+ _tsgroup_idx = -1
+ _testcase_idx = 0
+
+ # Iterate over data structures to overwrite session profile defaults
+ # _config: single list element holding test servers and SUTs info
+ # _tc_options: single test case parameters
+ for _config, tc_options in zip(
+ self.vnfd_helper['config'], # test servers and SUTS
+ self.scenario_helper.all_options['test_cases']): # testcase
+
+ _ts_config = _config['test_server']
+
+ # Calculate test group/test case indexes based on test server name
+ if _ts_config['name'] in ts_names:
+ _testcase_idx += 1
+ else:
+ _tsgroup_idx += 1
+ _testcase_idx = 0
+
+ _testcase = \
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx]
+
+ if _testcase['type'] != _ts_config['role']:
+ raise RuntimeError(
+ 'Test type mismatch in TC#{} of test server {}'.format(
+ _testcase_idx, _ts_config['name']))
+
+ # Fill session profile with test servers parameters
+ if _ts_config['name'] not in ts_names:
+ self._update_session_test_servers(_ts_config, _tsgroup_idx)
+ ts_names.add(_ts_config['name'])
+
+ # Fill session profile with suts parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_suts(_config['suts'], _testcase))
+
+ # Update test case parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_tc_params(tc_options, _testcase))
+
+
+class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
+ """Landslide TG helper class"""
+
+ REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409}
+ REST_API_CODES = {'NOT MODIFIED': 500810}
+
+ def __init__(self, setup_helper):
+ super(LandslideResourceHelper, self).__init__(setup_helper)
+ self._result = {}
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ # TAS Manager config initialization
+ self._url = None
+ self._user_id = None
+ self.session = None
+ self.license_data = {}
+
+ # TCL session initialization
+ self._tcl = LandslideTclClient(LsTclHandler(), self)
+
+ self.session = requests.Session()
+ self.running_tests_uri = 'runningTests'
+ self.test_session_uri = 'testSessions'
+ self.test_serv_uri = 'testServers'
+ self.suts_uri = 'suts'
+ self.users_uri = 'users'
+ self.user_lib_uri = None
+ self.run_id = None
+
+ def abort_running_tests(self, timeout=60, delay=5):
+ """ Abort running test sessions, if any """
+ _start_time = time.time()
+ while time.time() < _start_time + timeout:
+ run_tests_states = {x['id']: x['testStateOrStep']
+ for x in self.get_running_tests()}
+ if not set(run_tests_states.values()).difference(
+ {'COMPLETE', 'COMPLETE_ERROR'}):
+ break
+ else:
+ [self.stop_running_tests(running_test_id=_id, force=True)
+ for _id, _state in run_tests_states.items()
+ if 'COMPLETE' not in _state]
+ time.sleep(delay)
+ else:
+ raise RuntimeError(
+ 'Some test runs not stopped during {} seconds'.format(timeout))
+
+ def _build_url(self, resource, action=None):
+ """ Build URL string
+
+ :param resource: REST API resource name
+ :type resource: str
+ :param action: actions name and value
+ :type action: dict('name': <str>, 'value': <str>)
+ :returns str: REST API resource name with optional action info
+ """
+ # Action is optional and accepted only in presence of resource param
+ if action and not resource:
+ raise ValueError("Resource name not provided")
+ # Concatenate actions
+ _action = ''.join(['?{}={}'.format(k, v) for k, v in
+ action.items()]) if action else ''
+
+ return ''.join([self._url, resource, _action])
+
+ def get_response_params(self, method, resource, params=None):
+ """ Retrieve params from JSON response of specific resource URL
+
+ :param method: one of supported REST API methods
+ :type method: str
+ :param resource: URI, requested resource name
+ :type resource: str
+ :param params: attributes to be found in JSON response
+ :type params: list(str)
+ """
+ _res = []
+ params = params if params else []
+ response = self.exec_rest_request(method, resource)
+ # Get substring between last slash sign and question mark (if any)
+ url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0]
+ _response_json = response.json()
+ # Expect dict(), if URL last part and top dict key don't match
+ # Else, if they match, expect list()
+ k, v = list(_response_json.items())[0]
+ if k != url_last_part:
+ v = [v] # v: list(dict(str: str))
+ # Extract params, or whole list of dicts (without top level key)
+ for x in v:
+ _res.append({param: x[param] for param in params} if params else x)
+ return _res
+
+ def _create_user(self, auth, level=1):
+ """ Create new user
+
+ :param auth: data to create user account on REST server
+ :type auth: dict
+ :param level: Landslide user permissions level
+ :type level: int
+ :returns int: user id
+ """
+ # Set expiration date in two years since account creation date
+ _exp_date = time.strftime(
+ '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2))
+ _username = auth['user']
+ _fields = {"contactInformation": "", "expiresOn": _exp_date,
+ "fullName": "Test User",
+ "isActive": "true", "level": level,
+ "password": auth['password'],
+ "username": _username}
+ _response = self.exec_rest_request('post', self.users_uri,
+ json_data=_fields, raise_exc=False)
+ _resp_json = _response.json()
+ if _response.status_code == self.REST_STATUS_CODES['CREATED']:
+ # New user created
+ _id = _resp_json['id']
+ LOG.info("New user created: username='%s', id='%s'", _username,
+ _id)
+ elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']:
+ # User already exists
+ LOG.info("Account '%s' already exists.", _username)
+ # Get user id
+ _id = self._modify_user(_username, {"isActive": "true"})['id']
+ else:
+ raise exceptions.RestApiError(
+ 'Error during new user "{}" creation'.format(_username))
+ return _id
+
+ def _modify_user(self, username, fields):
+ """ Modify information about existing user
+
+ :param username: user name of account to be modified
+ :type username: str
+ :param fields: data to modify user account on REST server
+ :type fields: dict
+ :returns dict: user info
+ """
+ _response = self.exec_rest_request('post', self.users_uri,
+ action={'username': username},
+ json_data=fields, raise_exc=False)
+ if _response.status_code == self.REST_STATUS_CODES['OK']:
+ _response = _response.json()
+ else:
+ raise exceptions.RestApiError(
+ 'Error during user "{}" data update: {}'.format(
+ username,
+ _response.status_code))
+ LOG.info("User account '%s' modified: '%s'", username, _response)
+ return _response
+
+ def _delete_user(self, username):
+ """ Delete user account
+
+ :param username: username field
+ :type username: str
+ :returns bool: True if succeeded
+ """
+ self.exec_rest_request('delete', self.users_uri,
+ action={'username': username})
+
+ def _get_users(self, username=None):
+ """ Get user records from REST server
+
+ :param username: username field
+ :type username: None|str
+ :returns list(dict): empty list, or user record, or list of all users
+ """
+ _response = self.get_response_params('get', self.users_uri)
+ _res = [u for u in _response if
+ u['username'] == username] if username else _response
+ return _res
+
+ def exec_rest_request(self, method, resource, action=None, json_data=None,
+ logs=True, raise_exc=True):
+ """ Execute REST API request, return response object
+
+ :param method: one of supported requests ('post', 'get', 'delete')
+ :type method: str
+ :param resource: URL of resource
+ :type resource: str
+ :param action: data used to provide URI located after question mark
+ :type action: dict
+ :param json_data: mandatory only for 'post' method
+ :type json_data: dict
+ :param logs: debug logs display flag
+ :type raise_exc: bool
+ :param raise_exc: if True, raise exception on REST API call error
+ :returns requests.Response(): REST API call response object
+ """
+ json_data = json_data if json_data else {}
+ action = action if action else {}
+ _method = method.upper()
+ method = method.lower()
+ if method not in ('post', 'get', 'delete'):
+ raise ValueError("Method '{}' not supported".format(_method))
+
+ if method == 'post' and not action:
+ if not (json_data and isinstance(json_data, collections.Mapping)):
+ raise ValueError(
+ 'JSON data missing in {} request'.format(_method))
+
+ r = getattr(self.session, method)(self._build_url(resource, action),
+ json=json_data)
+ if raise_exc and not r.ok:
+ msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format(
+ method, self._build_url(resource, action), r.reason)
+ raise exceptions.RestApiError(msg)
+
+ if logs:
+ LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method,
+ r.request.url)
+ LOG.debug("Response: %s", r.json())
+ return r
+
+ def connect(self):
+ """Connect to RESTful server using test user account"""
+ tas_info = self.vnfd_helper['mgmt-interface']
+ # Supported REST Server ports: HTTP - 8080, HTTPS - 8181
+ _port = '8080' if tas_info['proto'] == 'http' else '8181'
+ tas_info.update({'port': _port})
+ self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info)
+ self.session.headers.update({'Accept': 'application/json',
+ 'Content-type': 'application/json'})
+ # Login with super user to create test user
+ self.session.auth = (
+ tas_info['super-user'], tas_info['super-user-password'])
+ LOG.info("Connect using superuser: server='%s'", self._url)
+ auth = {x: tas_info[x] for x in ('user', 'password')}
+ self._user_id = self._create_user(auth)
+ # Login with test user
+ self.session.auth = auth['user'], auth['password']
+ # Test user validity
+ self.exec_rest_request('get', '')
+
+ self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri)
+ LOG.info("Login with test user: server='%s'", self._url)
+ # Read existing license
+ self.license_data['lic_id'] = tas_info['license']
+
+ # Tcl client init
+ self._tcl.connect(tas_info['ip'], *self.session.auth)
+
+ return self.session
+
+ def disconnect(self):
+ self.session = None
+ self._tcl.disconnect()
+
+ def terminate(self):
+ self._terminated.value = 1
+
+ def create_dmf(self, dmf):
+ if isinstance(dmf, list):
+ for _dmf in dmf:
+ self._tcl.create_dmf(_dmf)
+ else:
+ self._tcl.create_dmf(dmf)
+
+ def delete_dmf(self, dmf):
+ if isinstance(dmf, list):
+ for _dmf in dmf:
+ self._tcl.delete_dmf(_dmf)
+ else:
+ self._tcl.delete_dmf(dmf)
+
+ def create_suts(self, suts):
+ # Keep only supported keys in suts object
+ for _sut in suts:
+ sut_entry = {k: v for k, v in _sut.items()
+ if k not in {'phy', 'nextHop', 'role'}}
+ _response = self.exec_rest_request(
+ 'post', self.suts_uri, json_data=sut_entry,
+ logs=False, raise_exc=False)
+ if _response.status_code != self.REST_STATUS_CODES['CREATED']:
+ LOG.info(_response.reason) # Failed to create
+ _name = sut_entry.pop('name')
+ # Modify existing SUT
+ self.configure_sut(sut_name=_name, json_data=sut_entry)
+ else:
+ LOG.info("SUT created: %s", sut_entry)
+
+ def get_suts(self, suts_id=None):
+ if suts_id:
+ _suts = self.exec_rest_request(
+ 'get', '{}/{}'.format(self.suts_uri, suts_id)).json()
+ else:
+ _suts = self.get_response_params('get', self.suts_uri)
+
+ return _suts
+
+ def configure_sut(self, sut_name, json_data):
+ """ Modify information of specific SUTs
+
+ :param sut_name: name of existing SUT
+ :type sut_name: str
+ :param json_data: SUT settings
+ :type json_data: dict()
+ """
+ LOG.info("Modifying SUT information...")
+ _response = self.exec_rest_request('post',
+ self.suts_uri,
+ action={'name': sut_name},
+ json_data=json_data,
+ raise_exc=False)
+ if _response.status_code not in {self.REST_STATUS_CODES[x] for x in
+ {'OK', 'NO CHANGE'}}:
+ raise exceptions.RestApiError(_response.reason)
+
+ LOG.info("Modified SUT: %s", sut_name)
+
+ def delete_suts(self, suts_ids=None):
+ if not suts_ids:
+ _curr_suts = self.get_response_params('get', self.suts_uri)
+ suts_ids = [x['id'] for x in _curr_suts]
+ LOG.info("Deleting SUTs with following IDs: %s", suts_ids)
+ for _id in suts_ids:
+ self.exec_rest_request('delete',
+ '{}/{}'.format(self.suts_uri, _id))
+ LOG.info("\tDone for SUT id: %s", _id)
+
+ def _check_test_servers_state(self, test_servers_ids=None, delay=10,
+ timeout=300):
+ LOG.info("Waiting for related test servers state change to READY...")
+ # Wait on state change
+ _start_time = time.time()
+ while time.time() - _start_time < timeout:
+ ts_ids_not_ready = {x['id'] for x in
+ self.get_test_servers(test_servers_ids)
+ if x['state'] != 'READY'}
+ if ts_ids_not_ready == set():
+ break
+ time.sleep(delay)
+ else:
+ raise RuntimeError(
+ 'Test servers not in READY state after {} seconds.'.format(
+ timeout))
+
+ def create_test_servers(self, test_servers):
+ """ Create test servers
+
+ :param test_servers: input data for test servers creation
+ mandatory fields: managementIp
+ optional fields: name
+ :type test_servers: list(dict)
+ """
+ _ts_ids = []
+ for _ts in test_servers:
+ _msg = 'Created test server "%(name)s"'
+ _ts_ids.append(self._tcl.create_test_server(_ts))
+ if _ts.get('thread_model'):
+ _msg += ' in mode: "%(thread_model)s"'
+ LOG.info(_msg, _ts)
+
+ self._check_test_servers_state(_ts_ids)
+
+ def get_test_servers(self, test_server_ids=None):
+ if not test_server_ids: # Get all test servers info
+ _test_servers = self.exec_rest_request(
+ 'get', self.test_serv_uri).json()[self.test_serv_uri]
+ LOG.info("Current test servers configuration: %s", _test_servers)
+ return _test_servers
+
+ _test_servers = []
+ for _id in test_server_ids:
+ _test_servers.append(self.exec_rest_request(
+ 'get', '{}/{}'.format(self.test_serv_uri, _id)).json())
+ LOG.info("Current test servers configuration: %s", _test_servers)
+ return _test_servers
+
+ def configure_test_servers(self, action, json_data=None,
+ test_server_ids=None):
+ if not test_server_ids:
+ test_server_ids = [x['id'] for x in self.get_test_servers()]
+ elif isinstance(test_server_ids, int):
+ test_server_ids = [test_server_ids]
+ for _id in test_server_ids:
+ self.exec_rest_request('post',
+ '{}/{}'.format(self.test_serv_uri, _id),
+ action=action, json_data=json_data)
+ LOG.info("Test server (id: %s) configuration done: %s", _id,
+ action)
+ return test_server_ids
+
+ def delete_test_servers(self, test_servers_ids=None):
+ # Delete test servers
+ for _ts in self.get_test_servers(test_servers_ids):
+ self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri,
+ _ts['id']))
+ LOG.info("Deleted test server: %s", _ts['name'])
+
+ def create_test_session(self, test_session):
+ # Use tcl client to create session
+ test_session['library'] = self._user_id
+ LOG.debug("Creating session='%s'", test_session['name'])
+ self._tcl.create_test_session(test_session)
+
+ def get_test_session(self, test_session_name=None):
+ if test_session_name:
+ uri = 'libraries/{}/{}/{}'.format(self._user_id,
+ self.test_session_uri,
+ test_session_name)
+ else:
+ uri = self.user_lib_uri.format(self._user_id)
+ _test_sessions = self.exec_rest_request('get', uri).json()
+ return _test_sessions
+
+ def configure_test_session(self, template_name, test_session):
+ # Override specified test session parameters
+ LOG.info('Update test session parameters: %s', test_session['name'])
+ test_session.update({'library': self._user_id})
+ return self.exec_rest_request(
+ method='post',
+ action={'action': 'overrideAndSaveAs'},
+ json_data=test_session,
+ resource='{}/{}'.format(self.user_lib_uri.format(self._user_id),
+ template_name))
+
+ def delete_test_session(self, test_session):
+ return self.exec_rest_request('delete', '{}/{}'.format(
+ self.user_lib_uri.format(self._user_id), test_session))
+
+ def create_running_tests(self, test_session_name):
+ r = self.exec_rest_request('post',
+ self.running_tests_uri,
+ json_data={'library': self._user_id,
+ 'name': test_session_name})
+ if r.status_code != self.REST_STATUS_CODES['CREATED']:
+ raise exceptions.RestApiError('Failed to start test session.')
+ self.run_id = r.json()['id']
+
+ def get_running_tests(self, running_test_id=None):
+ """Get JSON structure of specified running test entity
+
+ :param running_test_id: ID of created running test entity
+ :type running_test_id: int
+ :returns list: running tests entity
+ """
+ if not running_test_id:
+ running_test_id = ''
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ _res = self.exec_rest_request('get', _res_name, logs=False).json()
+ # If no run_id specified, skip top level key in response dict.
+ # Else return JSON as list
+ return _res.get('runningTests', [_res])
+
+ def delete_running_tests(self, running_test_id=None):
+ if not running_test_id:
+ running_test_id = ''
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ self.get_response_params('delete', _res_name)
+ LOG.info("Deleted running test with id: %s", running_test_id)
+
+ def _running_tests_action(self, running_test_id, action, json_data=None):
+ if not json_data:
+ json_data = {}
+ # Supported actions:
+ # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc'
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ self.exec_rest_request('post', _res_name, {'action': action},
+ json_data)
+ LOG.debug("Executed action: '%s' on running test id: %s", action,
+ running_test_id)
+
+ def stop_running_tests(self, running_test_id, json_data=None, force=False):
+ _action = 'abort' if force else 'stop'
+ self._running_tests_action(running_test_id, _action,
+ json_data=json_data)
+ LOG.info('Performed action: "%s" to test run with id: %s', _action,
+ running_test_id)
+
+ def check_running_test_state(self, run_id):
+ r = self.exec_rest_request('get',
+ '{}/{}'.format(self.running_tests_uri,
+ run_id))
+ return r.json().get("testStateOrStep")
+
+ def get_running_tests_results(self, run_id):
+ _res = self.exec_rest_request(
+ 'get',
+ '{}/{}/{}'.format(self.running_tests_uri,
+ run_id,
+ 'measurements')).json()
+ return _res
+
+ def _write_results(self, results):
+ # Avoid None value at test session start
+ _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0
+
+ _res_tabs = results.get('tabs')
+ # Avoid parsing 'tab' dict key initially (missing or empty)
+ if not _res_tabs:
+ return
+
+ # Flatten nested dict holding Landslide KPIs of current test run
+ flat_kpis_dict = {}
+ for _tab, _kpis in six.iteritems(_res_tabs):
+ for _kpi, _value in six.iteritems(_kpis):
+ # Combine table name and KPI name using delimiter "::"
+ _key = '::'.join([_tab, _kpi])
+ try:
+ # Cast value from str to float
+ # Remove comma and/or measure units, e.g. "us"
+ flat_kpis_dict[_key] = float(
+ _value.split(' ')[0].replace(',', ''))
+ except ValueError: # E.g. if KPI represents datetime
+ pass
+ LOG.info("Polling test results of test run id: %s. Elapsed time: %s "
+ "seconds", self.run_id, _elapsed_time)
+ return flat_kpis_dict
+
+ def collect_kpi(self):
+ if 'COMPLETE' in self.check_running_test_state(self.run_id):
+ self._result.update({'done': True})
+ return self._result
+ _res = self.get_running_tests_results(self.run_id)
+ _kpis = self._write_results(_res)
+ if _kpis:
+ _kpis.update({'run_id': int(self.run_id)})
+ _kpis.update({'iteration': _res['iteration']})
+ self._result.update(_kpis)
+ return self._result
+
+
+class LandslideTclClient(object):
+ """Landslide TG TCL client class"""
+
+ DEFAULT_TEST_NODE = {
+ 'ethStatsEnabled': True,
+ 'forcedEthInterface': '',
+ 'innerVlanId': 0,
+ 'ip': '',
+ 'mac': '',
+ 'mtu': 1500,
+ 'nextHop': '',
+ 'numLinksOrNodes': 1,
+ 'numVlan': 1,
+ 'phy': '',
+ 'uniqueVlanAddr': False,
+ 'vlanDynamic': 0,
+ 'vlanId': 0,
+ 'vlanUserPriority': 0,
+ 'vlanTagType': 0
+ }
+
+ TEST_NODE_CMD = \
+ 'ls::create -TestNode-{} -under $p_ -Type "eth"' \
+ ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \
+ ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \
+ ' -ForcedEthInterface "{forcedEthInterface}"' \
+ ' -EthStatsEnabled {ethStatsEnabled}' \
+ ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \
+ ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \
+ ';'
+
+ def __init__(self, tcl_handler, ts_context):
+ self.tcl_server_ip = None
+ self._user = None
+ self._library_id = None
+ self._basic_library_id = None
+ self._tcl = tcl_handler
+ self._ts_context = ts_context
+ self.ts_ids = set()
+
+ # Test types names expected in session profile, test case and pod files
+ self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node",
+ "PCRF_Node"}
+
+ self._class_param_config_handler = {
+ "Array": self._configure_array_param,
+ "TestNode": self._configure_test_node_param,
+ "Sut": self._configure_sut_param,
+ "Dmf": self._configure_dmf_param
+ }
+
+ def connect(self, tcl_server_ip, username, password):
+ """ Connect to TCL server with username and password
+
+ :param tcl_server_ip: TCL server IP address
+ :type tcl_server_ip: str
+ :param username: existing username on TCL server
+ :type username: str
+ :param password: password related to username on TCL server
+ :type password: str
+ """
+ LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username)
+ res = self._tcl.execute(
+ "ls::login {} {} {}".format(tcl_server_ip, username, password))
+ if 'java0x' not in res: # handle assignment reflects login success
+ raise exceptions.LandslideTclException(
+ "connect: login failed ='{}'.".format(res))
+ self._library_id = self._tcl.execute(
+ "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
+ username))
+ self._basic_library_id = self._get_library_id('Basic')
+ self.tcl_server_ip = tcl_server_ip
+ self._user = username
+ LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user,
+ self._library_id,
+ self._basic_library_id)
+
+ def disconnect(self):
+ """ Disconnect from TCL server. Drop TCL connection configuration """
+ LOG.info("disconnect: server='%s' user='%s'",
+ self.tcl_server_ip, self._user)
+ self._tcl.execute("ls::logout")
+ self.tcl_server_ip = None
+ self._user = None
+ self._library_id = None
+ self._basic_library_id = None
+
+ def _add_test_server(self, name, ip):
+ try:
+ # Check if test server exists with name equal to _ts_name
+ ts_id = int(self.resolve_test_server_name(name))
+ except ValueError:
+ # Such test server does not exist. Attempt to create it
+ ts_id = self._tcl.execute(
+ 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip))
+ try:
+ int(ts_id)
+ except ValueError:
+ # Failed to create test server, e.g. limit reached
+ raise RuntimeError(
+ 'Failed to create test server: "{}". {}'.format(name,
+ ts_id))
+ return ts_id
+
+ def _update_license(self, name):
+ """ Setup/update test server license
+
+ :param name: test server name
+ :type name: str
+ """
+ # Retrieve current TsInfo configuration, result stored in handle "ts"
+ self._tcl.execute(
+ 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name))
+
+ # Set license ID, if it differs from current one, update test server
+ _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense')
+ if _curr_lic_id != self._ts_context.license_data['lic_id']:
+ self._tcl.execute('ls::config $ts -RequestedLicense {}'.format(
+ self._ts_context.license_data['lic_id']))
+ self._tcl.execute('ls::perform ModifyTs $ts')
+
+ def _set_thread_model(self, name, thread_model):
+ # Retrieve test server configuration, store it in handle "tsc"
+ _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][
+ 'cfguser_password']
+ self._tcl.execute(
+ 'set tsc [ls::perform RetrieveTsConfiguration '
+ '-name "{}" {}]'.format(name, _cfguser_password))
+ # Configure ThreadModel, if it differs from current one
+ thread_model_map = {'Legacy': 'V0',
+ 'Max': 'V1',
+ 'Fireball': 'V1_FB3'}
+ _model = thread_model_map[thread_model]
+ _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel')
+ if _curr_model != _model:
+ self._tcl.execute(
+ 'ls::config $tsc -ThreadModel "{}"'.format(_model))
+ self._tcl.execute(
+ 'ls::perform ApplyTsConfiguration $tsc {}'.format(
+ _cfguser_password))
+
+ def create_test_server(self, test_server):
+ _ts_thread_model = test_server.get('thread_model')
+ _ts_name = test_server['name']
+
+ ts_id = self._add_test_server(_ts_name, test_server['ip'])
+
+ self._update_license(_ts_name)
+
+ # Skip below code modifying thread_model if it is not defined
+ if _ts_thread_model:
+ self._set_thread_model(_ts_name, _ts_thread_model)
+
+ return ts_id
+
+ def create_test_session(self, test_session):
+ """ Create, configure and save Landslide test session object.
+
+ :param test_session: Landslide TestSession object
+ :type test_session: dict
+ """
+ LOG.info("create_test_session: name='%s'", test_session['name'])
+ self._tcl.execute('set test_ [ls::create TestSession]')
+ self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format(
+ self._library_id, test_session['name']))
+ self._tcl.execute('ls::config $test_ -Description "{}"'.format(
+ test_session['description']))
+ if 'keywords' in test_session:
+ self._tcl.execute('ls::config $test_ -Keywords "{}"'.format(
+ test_session['keywords']))
+ if 'duration' in test_session:
+ self._tcl.execute('ls::config $test_ -Duration "{}"'.format(
+ test_session['duration']))
+ if 'iterations' in test_session:
+ self._tcl.execute('ls::config $test_ -Iterations "{}"'.format(
+ test_session['iterations']))
+ if 'reservePorts' in test_session:
+ if test_session['reservePorts'] == 'true':
+ self._tcl.execute('ls::config $test_ -Reserve Ports')
+
+ if 'reservations' in test_session:
+ for _reservation in test_session['reservations']:
+ self._configure_reservation(_reservation)
+
+ if 'reportOptions' in test_session:
+ self._configure_report_options(test_session['reportOptions'])
+
+ for _index, _group in enumerate(test_session['tsGroups']):
+ self._configure_ts_group(_group, _index)
+
+ self._save_test_session()
+
+ def create_dmf(self, dmf):
+ """ Create, configure and save Landslide Data Message Flow object.
+
+ :param dmf: Landslide Data Message Flow object
+ :type: dmf: dict
+ """
+ self._tcl.execute('set dmf_ [ls::create Dmf]')
+ _lib_id = self._get_library_id(dmf['dmf']['library'])
+ self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format(
+ _lib_id,
+ dmf['dmf']['name']))
+ for _param_key in dmf:
+ if _param_key == 'dmf':
+ continue
+ _param_value = dmf[_param_key]
+ if isinstance(_param_value, dict):
+ # Configure complex parameter
+ _tcl_cmd = 'ls::config $dmf_'
+ for _sub_param_key in _param_value:
+ _sub_param_value = _param_value[_sub_param_key]
+ if isinstance(_sub_param_value, str):
+ _tcl_cmd += ' -{} "{}"'.format(_sub_param_key,
+ _sub_param_value)
+ else:
+ _tcl_cmd += ' -{} {}'.format(_sub_param_key,
+ _sub_param_value)
+
+ self._tcl.execute(_tcl_cmd)
+ else:
+ # Configure simple parameter
+ if isinstance(_param_value, str):
+ self._tcl.execute(
+ 'ls::config $dmf_ -{} "{}"'.format(_param_key,
+ _param_value))
+ else:
+ self._tcl.execute(
+ 'ls::config $dmf_ -{} {}'.format(_param_key,
+ _param_value))
+ self._save_dmf()
+
+ def configure_dmf(self, dmf):
+ # Use create to reconfigure and overwrite existing dmf
+ self.create_dmf(dmf)
+
+ def delete_dmf(self, dmf):
+ raise NotImplementedError
+
+ def _save_dmf(self):
+ # Call 'Validate' to set default values for missing parameters
+ res = self._tcl.execute('ls::perform Validate -Dmf $dmf_')
+ if res == 'Invalid':
+ res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings')
+ LOG.error("_save_dmf: %s", res)
+ raise exceptions.LandslideTclException("_save_dmf: {}".format(res))
+ else:
+ res = self._tcl.execute('ls::save $dmf_ -overwrite')
+ LOG.debug("_save_dmf: result (%s)", res)
+
+ def _configure_report_options(self, options):
+ for _option_key in options:
+ _option_value = options[_option_key]
+ if _option_key == 'format':
+ _format = 0
+ if _option_value == 'CSV':
+ _format = 1
+ self._tcl.execute(
+ 'ls::config $test_.ReportOptions -Format {} '
+ '-Ts -3 -Tc -3'.format(_format))
+ else:
+ self._tcl.execute(
+ 'ls::config $test_.ReportOptions -{} {}'.format(
+ _option_key,
+ _option_value))
+
+ def _configure_ts_group(self, ts_group, ts_group_index):
+ try:
+ _ts_id = int(self.resolve_test_server_name(ts_group['tsId']))
+ except ValueError:
+ raise RuntimeError('Test server name "{}" does not exist.'.format(
+ ts_group['tsId']))
+ if _ts_id not in self.ts_ids:
+ self._tcl.execute(
+ 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format(
+ _ts_id))
+ self.ts_ids.add(_ts_id)
+ for _case in ts_group.get('testCases', []):
+ self._configure_tc_type(_case, ts_group_index)
+
+ self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress'))
+
+ def _configure_tc_type(self, tc, ts_group_index):
+ if tc['type'] not in self._tc_types:
+ raise RuntimeError('Test type {} not supported.'.format(
+ tc['type']))
+ tc['type'] = tc['type'].replace('_', ' ')
+ res = self._tcl.execute(
+ 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format(
+ self._basic_library_id, tc['type']))
+ if 'Invalid' in res:
+ raise RuntimeError('Test type {} not found in "Basic" '
+ 'library.'.format(tc['type']))
+ self._tcl.execute(
+ 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format(
+ ts_group_index))
+ self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format(
+ self._basic_library_id, tc['name']))
+ self._tcl.execute(
+ 'ls::config $tc_ -Description "{}"'.format(tc['type']))
+ self._tcl.execute(
+ 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type']))
+ if 'linked' in tc:
+ self._tcl.execute(
+ 'ls::config $tc_ -Linked {}'.format(tc['linked']))
+ if 'AssociatedPhys' in tc:
+ self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format(
+ tc['AssociatedPhys']))
+ if 'parameters' in tc:
+ self._configure_parameters(tc['parameters'])
+
+ def _configure_parameters(self, params):
+ self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]')
+ for _param_key in sorted(params):
+ _param_value = params[_param_key]
+ if isinstance(_param_value, dict):
+ # Configure complex parameter
+ if _param_value['class'] in self._class_param_config_handler:
+ self._class_param_config_handler[_param_value['class']](
+ _param_key,
+ _param_value)
+ else:
+ # Configure simple parameter
+ self._tcl.execute(
+ 'ls::create {} -under $p_ -Value "{}"'.format(
+ _param_key,
+ _param_value))
+
+ def _configure_array_param(self, name, params):
+ self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name))
+ for param in params['array']:
+ self._tcl.execute(
+ 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name,
+ param))
+
+ def _configure_test_node_param(self, name, params):
+ _params = self.DEFAULT_TEST_NODE
+ _params.update(params)
+
+ # TCL command expects lower case 'true' or 'false'
+ _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower()
+ _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower()
+
+ cmd = self.TEST_NODE_CMD.format(name, **_params)
+ self._tcl.execute(cmd)
+
+ def _configure_sut_param(self, name, params):
+ self._tcl.execute(
+ 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name,
+ params['name']))
+
+ def _configure_dmf_param(self, name, params):
+ self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name))
+
+ for _flow_index, _flow in enumerate(params['mainflows']):
+ _lib_id = self._get_library_id(_flow['library'])
+ self._tcl.execute(
+ 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format(
+ _lib_id,
+ _flow['name']))
+
+ if not params.get('instanceGroups'):
+ return
+
+ _instance_group = params['instanceGroups'][_flow_index]
+
+ # Traffic Mixer parameters handling
+ for _key in ['mixType', 'rate']:
+ if _key in _instance_group:
+ self._tcl.execute(
+ 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format(
+ _flow_index, _key, _instance_group[_key]))
+
+ # Assignments parameters handling
+ for _row_id, _row in enumerate(_instance_group.get('rows', [])):
+ self._tcl.execute(
+ 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} '
+ '-OverridePort {} -ClientPort {} -Context {} -Role {} '
+ '-PreferredTransport {} -RatingGroup {} '
+ '-ServiceID {}'.format(
+ _flow_index, _row_id, _row['node'],
+ _row['overridePort'], _row['clientPort'],
+ _row['context'], _row['role'], _row['transport'],
+ _row['ratingGroup'], _row['serviceId']))
+
+ def _configure_reservation(self, reservation):
+ _ts_id = self.resolve_test_server_name(reservation['tsId'])
+ self._tcl.execute(
+ 'set reservation_ [ls::create Reservation -under $test_]')
+ self._tcl.execute(
+ 'ls::config $reservation_ -TsIndex {} -TsId {} '
+ '-TsName "{}"'.format(reservation['tsIndex'],
+ _ts_id,
+ reservation['tsName']))
+ for _subnet in reservation['phySubnets']:
+ self._tcl.execute(
+ 'set physubnet_ [ls::create PhySubnet -under $reservation_]')
+ self._tcl.execute(
+ 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" '
+ '-NumIps {}'.format(_subnet['name'], _subnet['base'],
+ _subnet['mask'], _subnet['numIps']))
+
+ def _configure_preresolved_arp(self, pre_resolved_arp):
+ if not pre_resolved_arp: # Pre-resolved ARP configuration not found
+ return
+ for _entry in pre_resolved_arp:
+ # TsGroup handle name should correspond in _configure_ts_group()
+ self._tcl.execute(
+ 'ls::create PreResolvedArpAddress -under $tss_ '
+ '-StartingAddress "{StartingAddress}" '
+ '-NumNodes {NumNodes}'.format(**_entry))
+
+ def delete_test_session(self, test_session):
+ raise NotImplementedError
+
+ def _save_test_session(self):
+ # Call 'Validate' to set default values for missing parameters
+ res = self._tcl.execute('ls::perform Validate -TestSession $test_')
+ if res == 'Invalid':
+ res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings')
+ raise exceptions.LandslideTclException(
+ "Test session validation failed. Server response: {}".format(
+ res))
+ else:
+ self._tcl.execute('ls::save $test_ -overwrite')
+ LOG.debug("Test session saved successfully.")
+
+ def _get_library_id(self, library):
+ _library_id = self._tcl.execute(
+ "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format(
+ library))
+ try:
+ int(_library_id)
+ return _library_id
+ except ValueError:
+ pass
+
+ _library_id = self._tcl.execute(
+ "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
+ library))
+ try:
+ int(_library_id)
+ except ValueError:
+ LOG.error("_get_library_id: library='%s' not found.", library)
+ raise exceptions.LandslideTclException(
+ "_get_library_id: library='{}' not found.".format(
+ library))
+
+ return _library_id
+
+ def resolve_test_server_name(self, ts_name):
+ return self._tcl.execute("ls::query TsId {}".format(ts_name))
+
+
+class LsTclHandler(object):
+ """Landslide TCL Handler class"""
+
+ LS_OK = "ls_ok"
+ JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386')
+
+ def __init__(self):
+ self.tcl_cmds = {}
+ self._ls = LsApi(jre_path=self.JRE_PATH)
+ self._ls.tcl(
+ "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format(
+ self.LS_OK))
+
+ def execute(self, command):
+ res = self._ls.tcl(command)
+ self.tcl_cmds[command] = res
+ return res
diff --git a/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py b/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py
new file mode 100644
index 000000000..afd550029
--- /dev/null
+++ b/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import unittest
+
+from yardstick.network_services.traffic_profile import landslide_profile
+
+TP_CONFIG = {
+ 'schema': "nsb:traffic_profile:0.1",
+ 'name': 'LandslideProfile',
+ 'description': 'Spirent Landslide traffic profile (Data Message Flow)',
+ 'traffic_profile': {
+ 'traffic_type': 'LandslideProfile'
+ },
+ 'dmf_config': {
+ 'dmf': {
+ 'library': 'test',
+ 'name': 'Fireball UDP',
+ 'description': "Basic data flow using UDP/IP (Fireball DMF)",
+ 'keywords': 'UDP ',
+ 'dataProtocol': 'fb_udp',
+ 'burstCount': 1,
+ 'clientPort': {
+ 'clientPort': 2002,
+ 'isClientPortRange': 'false'
+ },
+ 'serverPort': 2003,
+ 'connection': {
+ 'initiatingSide': 'Client',
+ 'disconnectSide': 'Client',
+ 'underlyingProtocol': 'none',
+ 'persistentConnection': 'false'
+ },
+ 'protocolId': 0,
+ 'persistentConnection': 'false',
+ 'transactionRate': 8.0,
+ 'transactions': {
+ 'totalTransactions': 0,
+ 'retries': 0,
+ 'dataResponseTime': 60000,
+ 'packetSize': 64
+ },
+ 'segment': {
+ 'segmentSize': 64000,
+ 'maxSegmentSize': 0
+ },
+ 'size': {
+ 'sizeDistribution': 'Fixed',
+ 'sizeDeviation': 10
+ },
+ 'interval': {
+ 'intervalDistribution': 'Fixed',
+ 'intervalDeviation': 10
+ },
+ 'ipHeader': {
+ 'typeOfService': 0,
+ 'timeToLive': 64
+ },
+ 'tcpConnection': {
+ 'force3Way': 'false',
+ 'fixedRetryTime': 0,
+ 'maxPacketsToForceAck': 0
+ },
+ 'tcp': {
+ 'windowSize': 32768,
+ 'windowScaling': -1,
+ 'disableFinAckWait': 'false'
+ },
+ 'disconnectType': 'FIN',
+ 'slowStart': 'false',
+ 'connectOnly': 'false',
+ 'vtag': {
+ 'VTagMask': '0x0',
+ 'VTagValue': '0x0'
+ },
+ 'sctpPayloadProtocolId': 0,
+ 'billingIncludeSyn': 'true',
+ 'billingIncludeSubflow': 'true',
+ 'billingRecordPerTransaction': 'false',
+ 'tcpPush': 'false',
+ 'hostDataExpansionRatio': 1
+ }
+ }
+}
+DMF_OPTIONS = {
+ 'dmf': {
+ 'transactionRate': 5,
+ 'packetSize': 512,
+ 'burstCount': 1
+ }
+}
+
+
+class TestLandslideProfile(unittest.TestCase):
+
+ def test___init__(self):
+ ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG)
+ self.assertListEqual([TP_CONFIG["dmf_config"]],
+ ls_traffic_profile.dmf_config)
+
+ def test___init__config_not_a_dict(self):
+ _tp_config = copy.deepcopy(TP_CONFIG)
+ _tp_config['dmf_config'] = [_tp_config['dmf_config']]
+ ls_traffic_profile = landslide_profile.LandslideProfile(_tp_config)
+ self.assertListEqual(_tp_config['dmf_config'],
+ ls_traffic_profile.dmf_config)
+
+ def test_execute(self):
+ ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG)
+ self.assertIsNone(ls_traffic_profile.execute(None))
+
+ def test_update_dmf_options_dict(self):
+ ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG)
+ ls_traffic_profile.update_dmf(DMF_OPTIONS)
+ self.assertDictContainsSubset(DMF_OPTIONS['dmf'],
+ ls_traffic_profile.dmf_config[0])
+
+ def test_update_dmf_options_list(self):
+ ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG)
+ _dmf_options = copy.deepcopy(DMF_OPTIONS)
+ _dmf_options['dmf'] = [_dmf_options['dmf']]
+ ls_traffic_profile.update_dmf(_dmf_options)
+ self.assertTrue(all([x in ls_traffic_profile.dmf_config[0]
+ for x in DMF_OPTIONS['dmf']]))
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py
new file mode 100644
index 000000000..6d14ddb54
--- /dev/null
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import unittest
+import uuid
+
+from yardstick.network_services.vnf_generic.vnf import epc_vnf
+
+NAME = 'vnf__0'
+
+VNFD = {
+ 'vnfd:vnfd-catalog': {
+ 'vnfd': [{
+ 'id': 'EPCVnf', # NSB python class mapping
+ 'name': 'EPCVnf',
+ 'short-name': 'EPCVnf',
+ 'description': 'EPCVnf',
+ 'mgmt-interface': {
+ 'vdu-id': 'vepcvnf-baremetal',
+ 'user': 'user', # Value filled by vnfdgen
+ 'password': 'password', # Value filled by vnfdgen
+ 'ip': 'ip' # Value filled by vnfdgen
+ },
+ 'vdu': [{
+ 'id': 'vepcvnf-baremetal',
+ 'name': 'vepc-vnf-baremetal',
+ 'description': 'vEPCVnf workload',
+ 'external-interface': []}],
+ 'benchmark': {
+ 'kpi': []}}]}}
+
+
+class TestEPCVnf(unittest.TestCase):
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+ self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ self.epc_vnf = epc_vnf.EPCVnf(NAME, self.vnfd, self._id)
+
+ def test___init__(self, *args):
+ _epc_vnf = epc_vnf.EPCVnf(NAME, self.vnfd, self._id)
+ for x in {'user', 'password', 'ip'}:
+ self.assertEqual(self.vnfd['mgmt-interface'][x],
+ _epc_vnf.vnfd_helper.mgmt_interface[x])
+ self.assertEqual(NAME, _epc_vnf.name)
+ self.assertEqual([], _epc_vnf.kpi)
+ self.assertEqual({}, _epc_vnf.config)
+ self.assertFalse(_epc_vnf.runs_traffic)
+
+ def test___init__missing_ip(self, *args):
+ _vnfd = copy.deepcopy(self.vnfd)
+ _vnfd['mgmt-interface'].pop('ip')
+ _epc_vnf = epc_vnf.EPCVnf(NAME, _vnfd, self._id)
+ for x in {'user', 'password'}:
+ self.assertEqual(_vnfd['mgmt-interface'][x],
+ _epc_vnf.vnfd_helper.mgmt_interface[x])
+ self.assertNotIn('ip', _epc_vnf.vnfd_helper.mgmt_interface)
+ self.assertEqual(NAME, _epc_vnf.name)
+ self.assertEqual([], _epc_vnf.kpi)
+ self.assertEqual({}, _epc_vnf.config)
+ self.assertFalse(_epc_vnf.runs_traffic)
+
+ def test_instantiate(self):
+ self.assertIsNone(self.epc_vnf.instantiate({}, {}))
+
+ def test_wait_for_instantiate(self):
+ self.assertIsNone(self.epc_vnf.wait_for_instantiate())
+
+ def test_terminate(self):
+ self.assertIsNone(self.epc_vnf.terminate())
+
+ def test_scale(self):
+ self.assertIsNone(self.epc_vnf.scale())
+
+ def test_collect_kpi(self):
+ self.assertIsNone(self.epc_vnf.collect_kpi())
+
+ def test_start_collect(self):
+ self.assertIsNone(self.epc_vnf.start_collect())
+
+ def test_stop_collect(self):
+ self.assertIsNone(self.epc_vnf.stop_collect())
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
new file mode 100644
index 000000000..53439972a
--- /dev/null
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
@@ -0,0 +1,1911 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import mock
+import requests
+import time
+import unittest
+import uuid
+
+from yardstick.benchmark.contexts import base as ctx_base
+from yardstick.common import exceptions
+from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import utils as net_serv_utils
+from yardstick.network_services.traffic_profile import landslide_profile
+from yardstick.network_services.vnf_generic.vnf import sample_vnf
+from yardstick.network_services.vnf_generic.vnf import tg_landslide
+
+
+NAME = "tg__0"
+
+EXAMPLE_URL = 'http://example.com/'
+TCL_SUCCESS_RESPONSE = 'ls_ok'
+
+TEST_SERVERS = [
+ {'ip': '192.168.122.101',
+ 'phySubnets': [
+ {'mask': '/24',
+ 'base': '10.42.32.100',
+ 'numIps': 20,
+ 'name': 'eth1'}
+ ],
+ 'role': 'SGW_Node',
+ 'name': 'TestServer_1'},
+ {'ip': '192.168.122.102',
+ 'phySubnets': [
+ {'mask': '/24',
+ 'base': '10.42.32.1',
+ 'numIps': 100,
+ 'name': 'eth1'
+ },
+ {'mask': '/24',
+ 'base': '10.42.33.1',
+ 'numIps': 100,
+ 'name': 'eth2'}
+ ],
+ 'preResolvedArpAddress': [
+ {'NumNodes': 1,
+ 'StartingAddress': '10.42.33.5'}
+ ],
+ 'role': 'SGW_Nodal',
+ 'name': 'TestServer_2',
+ 'thread_model': 'Fireball'
+ }
+]
+
+TS1_SUTS = [
+ {'name': 'SGW - C TestNode',
+ 'role': 'SgwControlAddr',
+ 'managementIp': '12.0.1.1',
+ 'ip': '10.42.32.100',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ },
+ {'name': 'SGW - U TestNode',
+ 'role': 'SgwUserAddr',
+ 'managementIp': '12.0.1.2',
+ 'ip': '10.42.32.101',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ }
+]
+
+TS2_SUTS = [
+ {'name': 'eNodeB TestNode',
+ 'role': 'EnbUserAddr',
+ 'managementIp': '12.0.2.1',
+ 'ip': '10.42.32.2',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ },
+ {'name': 'MME TestNode',
+ 'role': 'MmeControlAddr',
+ 'managementIp': '12.0.3.1',
+ 'ip': '10.42.32.1',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ },
+ {'name': 'NetHost TestNode',
+ 'role': 'NetworkHostAddrLocal',
+ 'managementIp': '12.0.4.1',
+ 'ip': '10.42.33.1',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ },
+ {'name': 'PGW TestNode',
+ 'role': 'PgwV4Sut',
+ 'managementIp': '12.0.5.1',
+ 'ip': '10.42.32.105',
+ 'phy': 'eth5',
+ 'nextHop': '10.42.32.5'
+ },
+ {'name': 'SGW - C SUT',
+ 'role': 'SgwSut',
+ 'managementIp': '12.0.6.1',
+ 'ip': '10.42.32.100'
+ },
+ {'name': 'SGW - U SUT',
+ 'role': 'SgwUserSut',
+ 'managementIp': '12.0.6.2',
+ 'ip': '10.42.32.101'}
+]
+
+VNFD = {
+ 'vnfd:vnfd-catalog': {
+ 'vnfd': [{
+ 'short-name': 'landslide',
+ 'vdu': [{
+ 'description': 'AB client interface details',
+ 'name': 'abclient-baremetal',
+ 'id': 'abclient-baremetal',
+ 'external-interface': []}],
+ 'description': 'Spirent Landslide traffic generator',
+ 'config': [{'test_server': TEST_SERVERS[0], 'suts': TS1_SUTS},
+ {'test_server': TEST_SERVERS[1], 'suts': TS2_SUTS}],
+ 'mgmt-interface': {
+ 'vdu-id': 'landslide-tas',
+ 'user': 'user',
+ 'password': 'user',
+ 'super-user': 'super-user',
+ 'super-user-password': 'super-user-password',
+ 'cfguser_password': 'cfguser_password',
+ 'license': 48,
+ 'proto': 'http',
+ 'ip': '1.1.1.1'},
+ 'benchmark': {
+ 'kpi': [
+ 'tx_throughput_mbps',
+ 'rx_throughput_mbps',
+ 'in_packets',
+ 'out_packets',
+ 'activation_rate_sessps',
+ 'deactivation_rate_sessps']},
+ 'id': 'LandslideTrafficGen',
+ 'name': 'LandslideTrafficGen'}]}}
+
+TAS_INFO = VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface']
+
+DMF_CFG = {
+ "dmf": {
+ "library": "test",
+ "name": "Basic UDP"
+ },
+ "clientPort": {
+ "clientPort": 2002,
+ "isClientPortRange": "false"
+ },
+ "dataProtocol": "udp",
+ "serverPort": 2003
+}
+
+RESERVATIONS = [
+ {'tsName': TEST_SERVERS[0]['name'],
+ 'phySubnets': TEST_SERVERS[0]['phySubnets'],
+ 'tsId': TEST_SERVERS[0]['name'],
+ 'tsIndex': 0},
+ {'tsName': TEST_SERVERS[1]['name'],
+ 'phySubnets': TEST_SERVERS[1]['phySubnets'],
+ 'tsId': TEST_SERVERS[1]['name'],
+ 'tsIndex': 1}]
+
+SESSION_PROFILE = {
+ 'keywords': '',
+ 'duration': 60,
+ 'iterations': 1,
+ 'description': 'UE default bearer creation test case',
+ 'name': 'default_bearer_capacity',
+ 'reportOptions': {'format': 'CSV'},
+ 'reservePorts': 'false',
+ 'tsGroups': [
+ {
+ 'testCases': [{
+ 'type': 'SGW_Node',
+ 'name': '',
+ 'linked': "false",
+ 'AssociatedPhys': '',
+ 'parameters': {
+ 'SgiPtpTunnelEn': 'false',
+ 'Gtp2Imsi': '505024101215074',
+ 'Sessions': '100000',
+ 'S5Protocol': 'GTPv2',
+ 'TrafficMtu': '1500',
+ 'Gtp2Version': '13.6.0',
+ 'BearerV4AddrPool': '1.0.0.1',
+ 'Gtp2Imei': '50502410121507',
+ 'PgwNodeEn': 'true',
+ 'DedicatedsPerDefaultBearer': '0',
+ 'DefaultBearers': '1',
+ 'SgwUserAddr': {
+ 'numLinksOrNodes': 1,
+ 'phy': 'eth1',
+ 'forcedEthInterface': '',
+ 'ip': 'SGW_USER_IP',
+ 'class': 'TestNode',
+ 'ethStatsEnabled': "false",
+ 'mtu': 1500
+ },
+ 'SgwControlAddr': {
+ 'numLinksOrNodes': 1,
+ 'phy': 'eth1',
+ 'forcedEthInterface': '',
+ 'ip': 'SGW_CONTROL_IP',
+ 'class': 'TestNode',
+ 'ethStatsEnabled': "false",
+ 'mtu': 1500,
+ 'nextHop': 'SGW_CONTROL_NEXT_HOP'
+ },
+ 'BearerAddrPool': '2001::1',
+ 'TestType': 'SGW-NODE'
+ }
+ }],
+ 'tsId': TEST_SERVERS[0]['name']},
+ {
+ 'testCases': [{
+ 'type': 'SGW_Nodal',
+ 'name': '',
+ 'parameters': {
+ 'DataTraffic': 'Continuous',
+ 'TrafficStartType': 'When All Sessions Established',
+ 'NetworkHost': 'Local',
+ 'Gtp2Imsi': '505024101215074',
+ 'Dmf': {
+ 'mainflows': [
+ {
+ 'name': 'Basic UDP',
+ 'library': 'test'
+ }
+ ],
+ 'class': 'Dmf',
+ 'instanceGroups': [
+ {
+ 'startPaused': "false",
+ 'rate': 0,
+ 'mainflowIdx': 0,
+ 'mixType': ''
+ }
+ ]
+ },
+ 'S5Protocol': 'GTPv2',
+ 'DataUserCfgFileEn': 'false',
+ 'PgwUserSutEn': 'false',
+ 'MmeControlAddr': {
+ 'numLinksOrNodes': 1,
+ 'phy': 'eth1',
+ 'forcedEthInterface': '',
+ 'ip': 'MME_CONTROL_IP',
+ 'class': 'TestNode',
+ 'ethStatsEnabled': "false",
+ 'mtu': 1500
+ },
+ 'SgwUserSut': {
+ 'class': 'Sut',
+ 'name': 'SGW_USER_NAME'
+ },
+ 'TestActivity': 'Capacity Test',
+ 'NetworkHostAddrLocal': {
+ 'numLinksOrNodes': 1,
+ 'phy': 'eth2',
+ 'forcedEthInterface': '',
+ 'ip': 'NET_HOST_IP',
+ 'class': 'TestNode',
+ 'ethStatsEnabled': "false",
+ 'mtu': 1500
+ },
+ 'DedicatedsPerDefaultBearer': '0',
+ 'DisconnectRate': '1000.0',
+ 'Sessions': '100000',
+ 'SgwSut': {
+ 'class': 'Sut',
+ 'name': 'SGW_CONTROL_NAME'
+ },
+ 'TrafficMtu': '1500',
+ 'Gtp2Version': '13.6.0',
+ 'Gtp2Imei': '50502410121507',
+ 'PgwNodeEn': 'false',
+ 'StartRate': '1000.0',
+ 'PgwV4Sut': {
+ 'class': 'Sut',
+ 'name': 'PGW_SUT_NAME'
+ },
+ 'DefaultBearers': '1',
+ 'EnbUserAddr': {
+ 'numLinksOrNodes': 1,
+ 'phy': 'eth1',
+ 'forcedEthInterface': '',
+ 'ip': 'ENB_USER_IP',
+ 'class': 'TestNode',
+ 'ethStatsEnabled': "false",
+ 'mtu': 1500
+ },
+ 'TestType': 'SGW-NODAL'
+ }
+ }],
+ 'tsId': TEST_SERVERS[1]['name']
+ }
+ ]
+}
+
+
+class TestLandslideTrafficGen(unittest.TestCase):
+ SCENARIO_CFG = {
+ 'session_profile': '/traffic_profiles/landslide/'
+ 'landslide_session_default_bearer.yaml',
+ 'task_path': '',
+ 'runner': {
+ 'type': 'Iteration',
+ 'iterations': 1
+ },
+ 'nodes': {
+ 'tg__0': 'tg__0.traffic_gen',
+ 'vnf__0': 'vnf__0.vnf_epc'
+ },
+ 'topology': 'landslide_tg_topology.yaml',
+ 'type': 'NSPerf',
+ 'traffic_profile': '../../traffic_profiles/landslide/'
+ 'landslide_dmf_udp.yaml',
+ 'options': {
+ 'test_cases': [
+ {
+ 'BearerAddrPool': '2002::2',
+ 'type': 'SGW_Node',
+ 'BearerV4AddrPool': '2.0.0.2',
+ 'Sessions': '90000'
+ },
+ {
+ 'StartRate': '900.0',
+ 'type': 'SGW_Nodal',
+ 'DisconnectRate': '900.0',
+ 'Sessions': '90000'
+ }
+ ],
+ 'dmf':
+ {
+ 'transactionRate': 1000,
+ 'packetSize': 512
+ }
+ }
+ }
+
+ CONTEXT_CFG = {
+ 'contexts': [
+ {
+ 'type': 'Node',
+ 'name': 'traffic_gen',
+ 'file': '/etc/yardstick/nodes/pod_landslide.yaml'
+ },
+ {
+ 'type': 'Node',
+ 'name': 'vnf_epc',
+ 'file': '/etc/yardstick/nodes/pod_vepc_sut.yaml'
+ }
+ ]
+ }
+
+ TRAFFIC_PROFILE = {
+ "schema": "nsb:traffic_profile:0.1",
+ "name": "LandslideProfile",
+ "description": "Spirent Landslide traffic profile",
+ "traffic_profile": {
+ "traffic_type": "LandslideProfile"
+ },
+ "dmf_config": {
+ "dmf": {
+ "library": "test",
+ "name": "Basic UDP"
+ },
+ "description": "Basic data flow using UDP/IP",
+ "keywords": "UDP",
+ "dataProtocol": "udp"
+ }
+ }
+
+ SUCCESS_CREATED_CODE = 201
+ SUCCESS_OK_CODE = 200
+ SUCCESS_RECORD_ID = 5
+ TEST_USER_ID = 11
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+
+ self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi')
+ self.mock_lsapi.start()
+
+ self.mock_ssh_helper = mock.patch.object(sample_vnf, 'VnfSshHelper')
+ self.mock_ssh_helper.start()
+ self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ self.ls_tg = tg_landslide.LandslideTrafficGen(
+ NAME, self.vnfd, self._id)
+ self.session_profile = copy.deepcopy(SESSION_PROFILE)
+ self.ls_tg.session_profile = self.session_profile
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self.mock_lsapi.stop()
+ self.mock_ssh_helper.stop()
+
+ @mock.patch.object(net_serv_utils, 'get_nsb_option')
+ def test___init__(self, mock_get_nsb_option, *args):
+ _path_to_nsb = 'path/to/nsb'
+ mock_get_nsb_option.return_value = _path_to_nsb
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, self.vnfd, self._id)
+ self.assertIsInstance(ls_tg.resource_helper,
+ tg_landslide.LandslideResourceHelper)
+ mock_get_nsb_option.assert_called_once_with('bin_path')
+ self.assertEqual(_path_to_nsb, ls_tg.bin_path)
+ self.assertEqual(NAME, ls_tg.name)
+ self.assertTrue(ls_tg.runs_traffic)
+ self.assertFalse(ls_tg.traffic_finished)
+ self.assertIsNone(ls_tg.session_profile)
+
+ def test_listen_traffic(self):
+ _traffic_profile = {}
+ self.assertIsNone(self.ls_tg.listen_traffic(_traffic_profile))
+
+ def test_terminate(self, *args):
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ self.assertIsNone(self.ls_tg.terminate())
+ self.ls_tg.resource_helper._tcl.disconnect.assert_called_once()
+
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server',
+ return_value='fake_context')
+ def test_instantiate(self, *args):
+ self.ls_tg._tg_process = mock.Mock()
+ self.ls_tg._tg_process.start = mock.Mock()
+ self.ls_tg.resource_helper.connect = mock.Mock()
+ self.ls_tg.resource_helper.create_test_servers = mock.Mock()
+ self.ls_tg.resource_helper.create_suts = mock.Mock()
+ self.ls_tg._load_session_profile = mock.Mock()
+ self.assertIsNone(self.ls_tg.instantiate(self.SCENARIO_CFG,
+ self.CONTEXT_CFG))
+ self.ls_tg.resource_helper.connect.assert_called_once()
+ self.ls_tg.resource_helper.create_test_servers.assert_called_once()
+ _suts_blocks_num = len([item['suts'] for item in self.vnfd['config']])
+ self.assertEqual(_suts_blocks_num,
+ self.ls_tg.resource_helper.create_suts.call_count)
+ self.ls_tg._load_session_profile.assert_called_once()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_run_traffic(self, mock_get_tests, *args):
+ self.ls_tg.resource_helper._url = EXAMPLE_URL
+ self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_traffic_profile = mock.Mock(
+ spec=landslide_profile.LandslideProfile)
+ mock_traffic_profile.dmf_config = {'keywords': 'UDP',
+ 'dataProtocol': 'udp'}
+ mock_traffic_profile.params = self.TRAFFIC_PROFILE
+ self.ls_tg.resource_helper._user_id = self.TEST_USER_ID
+ mock_get_tests.return_value = [{'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': 'COMPLETE'}]
+ mock_post = mock.Mock()
+ mock_post.status_code = self.SUCCESS_CREATED_CODE
+ mock_post.json.return_value = {'id': self.SUCCESS_RECORD_ID}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value = mock_post
+ self.ls_tg.resource_helper.session = mock_session
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ _tcl = self.ls_tg.resource_helper._tcl
+ self.assertIsNone(self.ls_tg.run_traffic(mock_traffic_profile))
+ self.assertEqual(self.SUCCESS_RECORD_ID,
+ self.ls_tg.resource_helper.run_id)
+ mock_traffic_profile.update_dmf.assert_called_with(
+ self.ls_tg.scenario_helper.all_options)
+ _tcl.create_dmf.assert_called_with(mock_traffic_profile.dmf_config)
+ _tcl.create_test_session.assert_called_with(self.session_profile)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'check_running_test_state')
+ def test_collect_kpi(self, mock_check_running_test_state, *args):
+ self.ls_tg.resource_helper.run_id = self.SUCCESS_RECORD_ID
+ mock_check_running_test_state.return_value = 'COMPLETE'
+ self.assertEqual({'done': True}, self.ls_tg.collect_kpi())
+ mock_check_running_test_state.assert_called_once()
+
+ def test_wait_for_instantiate(self):
+ self.assertIsNone(self.ls_tg.wait_for_instantiate())
+ self.ls_tg.wait_for_instantiate()
+
+ def test__update_session_suts_no_tc_role(self, *args):
+ _suts = [{'role': 'epc_role'}]
+ _testcase = {'parameters': {'diff_epc_role': {'class': 'Sut'}}}
+ res = self.ls_tg._update_session_suts(_suts, _testcase)
+ self.assertEqual(_testcase, res)
+
+ def test__update_session_suts(self, *args):
+
+ def get_testnode_param(role, key, session_prof):
+ """ Get value by key from the deep nested dict to avoid calls like:
+ e.g. session_prof['tsGroups'][0]['testCases'][1]['parameters'][key]
+ """
+ for group in session_prof['tsGroups']:
+ for tc in group['testCases']:
+ tc_params = tc['parameters']
+ if tc_params.get(role):
+ return tc_params[role][key]
+
+ def get_sut_param(role, key, suts):
+ """ Search list of dicts for one with specific role.
+ Return the value of related dict by key. Expect key presence.
+ """
+ for sut in suts:
+ if sut.get('role') == role:
+ return sut[key]
+
+ # TestNode to verify
+ testnode_role = 'SgwControlAddr'
+ # SUT to verify
+ sut_role = 'SgwUserSut'
+
+ config_suts = [config['suts'] for config in self.vnfd['config']]
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ for suts, tc in zip(config_suts, session_tcs):
+ self.assertEqual(tc, self.ls_tg._update_session_suts(suts, tc))
+
+ # Verify TestNode class objects keys were updated
+ for _key in {'ip', 'phy', 'nextHop'}:
+ self.assertEqual(
+ get_testnode_param(testnode_role, _key, self.ls_tg.session_profile),
+ get_sut_param(testnode_role, _key, TS1_SUTS))
+ # Verify Sut class objects name was updated
+ self.assertEqual(
+ get_testnode_param(sut_role, 'name', self.ls_tg.session_profile),
+ get_sut_param(sut_role, 'name', TS2_SUTS))
+
+ def test__update_session_test_servers(self, *args):
+ for ts_index, ts in enumerate(TEST_SERVERS):
+ self.assertIsNone(
+ self.ls_tg._update_session_test_servers(ts, ts_index))
+ # Verify preResolvedArpAddress key was added
+ self.assertTrue(any(
+ _item.get('preResolvedArpAddress')
+ for _item in self.ls_tg.session_profile['tsGroups']))
+ # Verify reservations key was added to session profile
+ self.assertEqual(RESERVATIONS,
+ self.ls_tg.session_profile.get('reservations'))
+ self.assertEqual('true',
+ self.ls_tg.session_profile.get('reservePorts'))
+
+ def test__update_session_tc_params_assoc_phys(self):
+ _tc_options = {'AssociatedPhys': 'eth1'}
+ _testcase = {}
+ _testcase_orig = copy.deepcopy(_testcase)
+ res = self.ls_tg._update_session_tc_params(_tc_options, _testcase)
+ self.assertNotEqual(_testcase_orig, res)
+ self.assertEqual(_tc_options, _testcase)
+
+ def test__update_session_tc_params(self, *args):
+
+ def get_session_tc_param_value(param, tc_type, session_prof):
+ """ Get param value from the deep nested dict to avoid calls like:
+ session_prof['tsGroups'][0]['testCases'][0]['parameters'][key]
+ """
+ for test_group in session_prof['tsGroups']:
+ session_tc = test_group['testCases'][0]
+ if session_tc['type'] == tc_type:
+ return session_tc['parameters'].get(param)
+
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ scenario_tcs = [_tc for _tc in
+ self.SCENARIO_CFG['options']['test_cases']]
+ for tc_options, tc in zip(scenario_tcs, session_tcs):
+ self.assertEqual(
+ tc,
+ self.ls_tg._update_session_tc_params(tc_options, tc))
+
+ # Verify that each test case parameter was updated
+ # Params been compared are deeply nested. Using loops to ease access.
+ for _tc in self.SCENARIO_CFG['options']['test_cases']:
+ for _key, _val in _tc.items():
+ if _key != 'type':
+ self.assertEqual(
+ _val,
+ get_session_tc_param_value(_key, _tc.get('type'),
+ self.ls_tg.session_profile))
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_test_servers')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_suts')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_tc_params')
+ def test__load_session_profile(self, mock_upd_ses_tc_params,
+ mock_upd_ses_suts, mock_upd_ses_ts,
+ mock_yaml_load, *args):
+ self.ls_tg.scenario_helper.scenario_cfg = \
+ copy.deepcopy(self.SCENARIO_CFG)
+ mock_yaml_load.return_value = copy.deepcopy(SESSION_PROFILE)
+ self.assertIsNone(self.ls_tg._load_session_profile())
+ self.assertIsNotNone(self.ls_tg.session_profile)
+ # Number of blocks in configuration files
+ # Number of test servers, suts and tc params blocks should be equal
+ _config_files_blocks_num = len([item['test_server']
+ for item in self.vnfd['config']])
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_ts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_suts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_tc_params.call_count)
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_unequal_num_of_cfg_blocks(
+ self, mock_yaml_load, *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ ls_traffic_gen = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_traffic_gen.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = copy.deepcopy(SESSION_PROFILE)
+ # Delete test_servers item from pod file to make it not valid
+ ls_traffic_gen.vnfd_helper['config'].pop()
+ with self.assertRaises(RuntimeError):
+ ls_traffic_gen._load_session_profile()
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_test_type_mismatch(self, mock_yaml_load,
+ *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ # Swap test servers data in pod file
+ vnfd['config'] = list(reversed(vnfd['config']))
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ with self.assertRaises(RuntimeError):
+ ls_tg._load_session_profile()
+
+
+class TestLandslideResourceHelper(unittest.TestCase):
+
+ PROTO_PORT = 8080
+ EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':',
+ str(PROTO_PORT), '/api/'])
+ SUCCESS_CREATED_CODE = 201
+ SUCCESS_OK_CODE = 200
+ INVALID_REST_CODE = '400'
+ NOT_MODIFIED_CODE = 500810
+ ERROR_CODE = 500800
+ SUCCESS_RECORD_ID = 11
+ EXPIRE_DATE = '2020/01/01 12:00 FLE Standard Time'
+ TEST_USER = 'test'
+ TEST_TERMINATED = 1
+ AUTH_DATA = {'user': TAS_INFO['user'], 'password': TAS_INFO['password']}
+ TEST_SESSION_NAME = 'default_bearer_capacity'
+
+ USERS_DATA = {
+ "users": [{
+ "url": ''.join([EXAMPLE_URL, 'users/', str(SUCCESS_RECORD_ID)]),
+ "id": SUCCESS_RECORD_ID,
+ "level": 1,
+ "username": TEST_USER
+ }]
+ }
+
+ CREATE_USER_DATA = {'username': TAS_INFO['user'],
+ 'expiresOn': EXPIRE_DATE,
+ 'level': 1,
+ 'contactInformation': '',
+ 'fullName': 'Test User',
+ 'password': TAS_INFO['password'],
+ 'isActive': 'true'}
+
+ SUTS_DATA = {
+ "suts": [
+ {
+ "url": ''.join([EXAMPLE_URL, 'suts/', str(SUCCESS_RECORD_ID)]),
+ "id": SUCCESS_RECORD_ID,
+ "name": "10.41.32.1"
+ }]}
+
+ TEST_SERVERS_DATA = {
+ "testServers": [
+ {
+ "url": ''.join([EXAMPLE_URL, "testServers/1"]),
+ "id": 1,
+ "name": TEST_SERVERS[0]['name'],
+ "state": "READY",
+ "version": "16.4.0.10"
+ },
+ {
+ "url": ''.join([EXAMPLE_URL, "testServers/2"]),
+ "id": 2,
+ "name": TEST_SERVERS[1]['name'],
+ "state": "READY",
+ "version": "16.4.0.10"
+ }
+
+ ]
+ }
+
+ RUN_ID = 3
+
+ RUNNING_TESTS_DATA = {
+ "runningTests": [{
+ "url": ''.join([EXAMPLE_URL, "runningTests/{}".format(RUN_ID)]),
+ "measurementsUrl": ''.join(
+ [EXAMPLE_URL,
+ "runningTests/{}/measurements".format(RUN_ID)]),
+ "criteriaUrl": ''.join(
+ [EXAMPLE_URL,
+ "runningTests/{}/criteria".format(RUN_ID)]),
+ "noteToUser": "",
+ "id": RUN_ID,
+ "library": SUCCESS_RECORD_ID,
+ "name": "default_bearer_capacity",
+ "user": TEST_USER,
+ "criteriaStatus": "NA",
+ "testStateOrStep": "COMPLETE"
+ }]}
+
+ TEST_RESULTS_DATA = {
+ "interval": 0,
+ "elapsedTime": 138,
+ "actualTime": 1521548057296,
+ "iteration": 1,
+ "tabs": {
+ "Test Summary": {
+ "Start Time": "Tue Mar 20 07:11:55 CDT 2018",
+ "Actual Dedicated Bearer Session Connects": "100",
+ "Actual Dedicated Bearer Session Disconnects": "100",
+ "Actual Disconnect Rate(Sessions / Second)(P - I)": "164.804",
+ "Average Session Disconnect Time(P - I)": "5.024 s",
+ "Total Data Sent + Received Packets / Sec(P - I)": "1,452.294"
+ }}}
+
+ def setUp(self):
+ self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi')
+ self.mock_lsapi.start()
+
+ mock_env_helper = mock.Mock()
+ self.res_helper = tg_landslide.LandslideResourceHelper(mock_env_helper)
+ self.res_helper._url = EXAMPLE_URL
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self.mock_lsapi.stop()
+ self.res_helper._url = None
+
+ def test___init__(self, *args):
+ self.assertIsInstance(self.res_helper,
+ tg_landslide.LandslideResourceHelper)
+ self.assertEqual({}, self.res_helper._result)
+ self.assertIsNone(self.res_helper.run_id)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'stop_running_tests')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_abort_running_tests_no_running_tests(self, mock_get_tests,
+ mock_stop_tests, *args):
+ tests_data = [{'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': 'COMPLETE'}]
+ mock_get_tests.return_value = tests_data
+ self.assertIsNone(self.res_helper.abort_running_tests())
+ mock_stop_tests.assert_not_called()
+
+ @mock.patch.object(time, 'sleep')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'stop_running_tests')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_abort_running_tests(self, mock_get_tests, mock_stop_tests, *args):
+ test_states_seq = iter(['RUNNING', 'COMPLETE'])
+
+ def configure_mock(*args):
+ return [{'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': next(test_states_seq)}]
+
+ mock_get_tests.side_effect = configure_mock
+ self.assertIsNone(self.res_helper.abort_running_tests())
+ mock_stop_tests.assert_called_once_with(
+ running_test_id=self.SUCCESS_RECORD_ID,
+ force=True)
+ self.assertEqual(2, mock_get_tests.call_count)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'stop_running_tests')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_abort_running_tests_error(self, mock_get_tests, mock_stop_tests,
+ *args):
+ tests_data = {'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': 'RUNNING'}
+ mock_get_tests.return_value = [tests_data]
+ with self.assertRaises(RuntimeError):
+ self.res_helper.abort_running_tests(timeout=1, delay=1)
+ mock_stop_tests.assert_called_with(
+ running_test_id=self.SUCCESS_RECORD_ID,
+ force=True)
+
+ def test__build_url(self, *args):
+ resource = 'users'
+ action = {'action': 'userCreate'}
+ expected_url = ''.join([EXAMPLE_URL, 'users?action=userCreate'])
+ self.assertEqual(expected_url,
+ self.res_helper._build_url(resource, action))
+
+ def test__build_url_error(self, *args):
+ resource = ''
+ action = {'action': 'userCreate'}
+
+ with self.assertRaises(ValueError):
+ self.res_helper._build_url(resource, action)
+
+ def test_get_response_params(self, *args):
+ method = 'get'
+ resource = 'users'
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.USERS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ resp = self.res_helper.get_response_params(method, resource)
+ self.assertTrue(resp)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper, '_get_users')
+ @mock.patch.object(time, 'time')
+ def test__create_user(self, mock_time, mock_get_users, *args):
+ mock_time.strftime.return_value = self.EXPIRE_DATE
+ post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE,
+ 'json.return_value': {'id': self.SUCCESS_RECORD_ID}}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.assertEqual(self.SUCCESS_RECORD_ID,
+ self.res_helper._create_user(self.AUTH_DATA))
+ mock_get_users.assert_not_called()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper, '_modify_user')
+ @mock.patch.object(time, 'time')
+ def test__create_user_username_exists(self, mock_time, mock_modify_user,
+ *args):
+ mock_time.strftime.return_value = self.EXPIRE_DATE
+ mock_modify_user.return_value = {'id': self.SUCCESS_RECORD_ID,
+ 'result': 'No changes requested'}
+ post_resp_data = {
+ 'status_code': self.ERROR_CODE,
+ 'json.return_value': {'id': self.SUCCESS_OK_CODE,
+ 'apiCode': self.NOT_MODIFIED_CODE}}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper._create_user(self.AUTH_DATA)
+ mock_modify_user.assert_called_once_with(TAS_INFO['user'],
+ {'isActive': 'true'})
+ self.assertEqual(self.SUCCESS_RECORD_ID, res)
+
+ @mock.patch.object(time, 'time')
+ def test__create_user_error(self, mock_time, *args):
+ mock_time.strftime.return_value = self.EXPIRE_DATE
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': {'apiCode': self.ERROR_CODE}}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ with self.assertRaises(exceptions.RestApiError):
+ self.res_helper._create_user(self.AUTH_DATA)
+
+ def test__modify_user(self, *args):
+ post_data = {'username': 'test_user'}
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': {'id': self.SUCCESS_RECORD_ID}}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper._modify_user(username=self.TEST_USER,
+ fields=post_data)
+ self.assertEqual(self.SUCCESS_RECORD_ID, res['id'])
+
+ def test__modify_user_rest_resp_fail(self, *args):
+ post_data = {'non-existing-key': ''}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.ok = False
+ self.res_helper.session = mock_session
+ self.assertRaises(exceptions.RestApiError,
+ self.res_helper._modify_user,
+ username=self.TEST_USER, fields=post_data)
+ mock_session.post.assert_called_once()
+
+ def test__delete_user(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper._delete_user(
+ username=self.TEST_USER))
+
+ def test__get_users(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.USERS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ self.assertEqual(self.USERS_DATA['users'],
+ self.res_helper._get_users())
+
+ def test_exec_rest_request(self, *args):
+ resource = 'testServers'
+ action = {'action': 'modify'}
+ expected_url = ''.join([EXAMPLE_URL, 'testServers?action=modify'])
+ post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE,
+ 'json.return_value': {'id': self.SUCCESS_RECORD_ID}}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.res_helper.exec_rest_request('post', resource, action)
+ self.res_helper.session.post.assert_called_once_with(expected_url,
+ json={})
+
+ def test_exec_rest_request_unsupported_method_error(self, *args):
+ resource = 'testServers'
+ action = {'action': 'modify'}
+ with self.assertRaises(ValueError):
+ self.res_helper.exec_rest_request('patch', resource, action)
+
+ def test_exec_rest_request_missed_action_arg(self, *args):
+ resource = 'testServers'
+ with self.assertRaises(ValueError):
+ self.res_helper.exec_rest_request('post', resource)
+
+ def test_exec_rest_request_raise_exc(self):
+ resource = 'users'
+ action = {'action': 'modify'}
+ post_resp_data = {'status_code': self.ERROR_CODE,
+ 'json.return_value': {
+ 'status_code': self.ERROR_CODE}}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.assertRaises(exceptions.RestApiError,
+ self.res_helper.exec_rest_request,
+ 'post', resource, action, raise_exc=True)
+
+ @mock.patch.object(time, 'time')
+ def test_connect(self, mock_time, *args):
+ vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ mock_time.strftime.return_value = self.EXPIRE_DATE
+ self.res_helper.vnfd_helper = vnfd
+
+ self.res_helper._tcl = mock.Mock()
+ post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE,
+ 'json.return_value': {'id': self.SUCCESS_RECORD_ID}}
+ mock_session = mock.Mock(spec=requests.Session, headers={})
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsInstance(self.res_helper.connect(), requests.Session)
+ self.res_helper._tcl.connect.assert_called_once_with(
+ TAS_INFO['ip'],
+ TAS_INFO['user'],
+ TAS_INFO['password'])
+
+ def test_disconnect(self, *args):
+ self.res_helper._tcl = mock.Mock()
+ self.assertIsNone(self.res_helper.disconnect())
+ self.assertIsNone(self.res_helper.session)
+ self.res_helper._tcl.disconnect.assert_called_once()
+
+ def test_terminate(self, *args):
+ self.assertIsNone(self.res_helper.terminate())
+ self.assertEqual(self.TEST_TERMINATED,
+ self.res_helper._terminated.value)
+
+ def test_create_dmf(self, *args):
+ self.res_helper._tcl = mock.Mock()
+ self.assertIsNone(self.res_helper.create_dmf(DMF_CFG))
+ self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG)
+
+ def test_create_dmf_as_list(self, *args):
+ self.res_helper._tcl = mock.Mock()
+ self.assertIsNone(self.res_helper.create_dmf([DMF_CFG]))
+ self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG)
+
+ def test_delete_dmf(self, *args):
+ self.res_helper._tcl = mock.Mock()
+ self.assertIsNone(self.res_helper.delete_dmf(DMF_CFG))
+ self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG)
+
+ def test_delete_dmf_as_list(self, *args):
+ self.res_helper._tcl = mock.Mock()
+ self.assertIsNone(self.res_helper.delete_dmf([DMF_CFG]))
+ self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut')
+ def test_create_suts(self, mock_configure_sut, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.create_suts(TS1_SUTS))
+ mock_configure_sut.assert_not_called()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut')
+ def test_create_suts_sut_exists(self, mock_configure_sut, *args):
+ sut_name = 'test_sut'
+ suts = [
+ {'name': sut_name,
+ 'role': 'SgwControlAddr',
+ 'managementIp': '12.0.1.1',
+ 'ip': '10.42.32.100'
+ }
+ ]
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.NOT_MODIFIED_CODE}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.create_suts(suts))
+ mock_configure_sut.assert_called_once_with(
+ sut_name=sut_name,
+ json_data={k: v for k, v in suts[0].items()
+ if k not in {'phy', 'nextHop', 'role', 'name'}})
+
+ def test_get_suts(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.SUTS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsInstance(self.res_helper.get_suts(), list)
+
+ def test_get_suts_single_id(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.SUTS_DATA['suts'][0]}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsInstance(self.res_helper.get_suts(suts_id=2), dict)
+
+ def test_configure_sut(self, *args):
+ post_data = {'managementIp': '2.2.2.2'}
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': {'id': self.SUCCESS_RECORD_ID}}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.configure_sut('test_name',
+ post_data))
+ mock_session.post.assert_called_once()
+
+ def test_configure_sut_error(self, *args):
+ post_data = {'managementIp': '2.2.2.2'}
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.NOT_MODIFIED_CODE}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ with self.assertRaises(exceptions.RestApiError):
+ self.res_helper.configure_sut('test_name', post_data)
+
+ def test_delete_suts(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.SUTS_DATA}
+ delete_resp_data = {'status_code': self.SUCCESS_OK_CODE}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ mock_session.delete.return_value.configure_mock(**delete_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.delete_suts())
+ mock_session.delete.assert_called_once()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_test_servers')
+ def test__check_test_servers_state(self, mock_get_test_servers, *args):
+ mock_get_test_servers.return_value = \
+ self.TEST_SERVERS_DATA['testServers']
+ self.res_helper._check_test_servers_state()
+ mock_get_test_servers.assert_called_once()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_test_servers')
+ def test__check_test_servers_state_server_not_ready(
+ self, mock_get_test_servers, *args):
+ test_servers_not_ready = [
+ {
+ "url": ''.join([EXAMPLE_URL, "testServers/1"]),
+ "id": 1,
+ "name": "TestServer_1",
+ "state": "NOT_READY",
+ "version": "16.4.0.10"
+ }
+ ]
+
+ mock_get_test_servers.return_value = test_servers_not_ready
+ with self.assertRaises(RuntimeError):
+ self.res_helper._check_test_servers_state(timeout=1, delay=0)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ '_check_test_servers_state')
+ def test_create_test_servers(self, mock_check_ts_state, *args):
+ test_servers_ids = [
+ ts['id'] for ts in self.TEST_SERVERS_DATA['testServers']]
+
+ self.res_helper.license_data['lic_id'] = TAS_INFO['license']
+ self.res_helper._tcl.create_test_server = mock.Mock()
+ self.res_helper._tcl.create_test_server.side_effect = test_servers_ids
+ self.assertIsNone(self.res_helper.create_test_servers(TEST_SERVERS))
+ mock_check_ts_state.assert_called_once_with(test_servers_ids)
+
+ @mock.patch.object(tg_landslide.LandslideTclClient,
+ 'resolve_test_server_name')
+ @mock.patch.object(tg_landslide.LsTclHandler, 'execute')
+ def test_create_test_servers_error(self, mock_execute,
+ mock_resolve_ts_name, *args):
+ self.res_helper.license_data['lic_id'] = TAS_INFO['license']
+ # Return message for case test server wasn't created
+ mock_execute.return_value = 'TS not found'
+ # Return message for case test server name wasn't resolved
+ mock_resolve_ts_name.return_value = 'TS not found'
+ with self.assertRaises(RuntimeError):
+ self.res_helper.create_test_servers(TEST_SERVERS)
+
+ def test_get_test_servers(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.TEST_SERVERS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.get_test_servers()
+ self.assertEqual(self.TEST_SERVERS_DATA['testServers'], res)
+
+ def test_get_test_servers_by_id(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+
+ _ts = self.TEST_SERVERS_DATA['testServers'][0]
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': _ts}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.get_test_servers(test_server_ids=[_ts['id']])
+ self.assertEqual([_ts], res)
+
+ def test_configure_test_servers(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.TEST_SERVERS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.configure_test_servers(
+ action={'action': 'recycle'})
+ self.assertEqual(
+ [x['id'] for x in self.TEST_SERVERS_DATA['testServers']],
+ res)
+ self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
+ mock_session.post.call_count)
+
+ def test_delete_test_servers(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.TEST_SERVERS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.delete_test_servers())
+ self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
+ mock_session.delete.call_count)
+
+ def test_create_test_session_res_helper(self, *args):
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ self.res_helper._tcl = mock.Mock()
+ test_session = {'name': 'test'}
+ self.assertIsNone(self.res_helper.create_test_session(test_session))
+ self.res_helper._tcl.create_test_session.assert_called_once_with(
+ {'name': 'test', 'library': self.SUCCESS_RECORD_ID})
+
+ @mock.patch.object(tg_landslide.LandslideTclClient,
+ 'resolve_test_server_name',
+ return_value='Not Found')
+ def test_create_test_session_ts_name_not_found(self, *args):
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ test_session = {
+ 'duration': 60,
+ 'description': 'UE default bearer creation test case',
+ 'name': 'default_bearer_capacity',
+ 'tsGroups': [{'testCases': [{'type': 'SGW_Node',
+ 'name': ''}],
+ 'tsId': 'TestServer_3'}]
+ }
+ with self.assertRaises(RuntimeError):
+ self.res_helper.create_test_session(test_session)
+
+ def test_get_test_session(self, *args):
+ test_session = {"name": self.TEST_SESSION_NAME}
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': test_session}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.get_test_session(self.TEST_SESSION_NAME)
+ self.assertEqual(test_session, res)
+
+ def test_configure_test_session(self, *args):
+ test_session = {'name': self.TEST_SESSION_NAME}
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format(
+ self.res_helper.test_session_uri)
+ mock_session = mock.Mock(spec=requests.Session)
+ self.res_helper.session = mock_session
+ res = self.res_helper.configure_test_session(self.TEST_SESSION_NAME,
+ test_session)
+ self.assertIsNotNone(res)
+ mock_session.post.assert_called_once()
+
+ def test_delete_test_session(self, *args):
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format(
+ self.res_helper.test_session_uri)
+ mock_session = mock.Mock(spec=requests.Session)
+ self.res_helper.session = mock_session
+ res = self.res_helper.delete_test_session(self.TEST_SESSION_NAME)
+ self.assertIsNotNone(res)
+ mock_session.delete.assert_called_once()
+
+ def test_create_running_tests(self, *args):
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ test_session = {'id': self.SUCCESS_RECORD_ID}
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE,
+ 'json.return_value': test_session}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ self.res_helper.create_running_tests(self.TEST_SESSION_NAME)
+ self.assertEqual(self.SUCCESS_RECORD_ID, self.res_helper.run_id)
+
+ def test_create_running_tests_error(self, *args):
+ self.res_helper._user_id = self.SUCCESS_RECORD_ID
+ mock_session = mock.Mock(spec=requests.Session)
+ post_resp_data = {'status_code': self.NOT_MODIFIED_CODE}
+ mock_session.post.return_value.configure_mock(**post_resp_data)
+ self.res_helper.session = mock_session
+ with self.assertRaises(exceptions.RestApiError):
+ self.res_helper.create_running_tests(self.TEST_SESSION_NAME)
+
+ def test_get_running_tests(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.RUNNING_TESTS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.get_running_tests()
+ self.assertEqual(self.RUNNING_TESTS_DATA['runningTests'], res)
+
+ def test_delete_running_tests(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ delete_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.RUNNING_TESTS_DATA}
+ mock_session.delete.return_value.configure_mock(**delete_resp_data)
+ self.res_helper.session = mock_session
+ self.assertIsNone(self.res_helper.delete_running_tests())
+
+ def test__running_tests_action(self, *args):
+ action = 'abort'
+ mock_session = mock.Mock(spec=requests.Session)
+ self.res_helper.session = mock_session
+ res = self.res_helper._running_tests_action(self.SUCCESS_RECORD_ID,
+ action)
+ self.assertIsNone(res)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ '_running_tests_action')
+ def test_stop_running_tests(self, mock_tests_action, *args):
+ res = self.res_helper.stop_running_tests(self.SUCCESS_RECORD_ID)
+ self.assertIsNone(res)
+ mock_tests_action.assert_called_once()
+
+ def test_check_running_test_state(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {
+ 'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.RUNNING_TESTS_DATA["runningTests"][0]}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.check_running_test_state(self.SUCCESS_RECORD_ID)
+ self.assertEqual(
+ self.RUNNING_TESTS_DATA["runningTests"][0]['testStateOrStep'],
+ res)
+
+ def test_get_running_tests_results(self, *args):
+ mock_session = mock.Mock(spec=requests.Session)
+ get_resp_data = {'status_code': self.SUCCESS_OK_CODE,
+ 'json.return_value': self.TEST_RESULTS_DATA}
+ mock_session.get.return_value.configure_mock(**get_resp_data)
+ self.res_helper.session = mock_session
+ res = self.res_helper.get_running_tests_results(
+ self.SUCCESS_RECORD_ID)
+ self.assertEqual(self.TEST_RESULTS_DATA, res)
+
+ def test__write_results(self, *args):
+ res = self.res_helper._write_results(self.TEST_RESULTS_DATA)
+ exp_res = {
+ "Test Summary::Actual Dedicated Bearer Session Connects": 100.0,
+ "Test Summary::Actual Dedicated Bearer Session Disconnects": 100.0,
+ "Test Summary::Actual Disconnect Rate(Sessions / Second)(P - I)": 164.804,
+ "Test Summary::Average Session Disconnect Time(P - I)": 5.024,
+ "Test Summary::Total Data Sent + Received Packets / Sec(P - I)": 1452.294
+ }
+ self.assertEqual(exp_res, res)
+
+ def test__write_results_no_tabs(self, *args):
+ _res_data = copy.deepcopy(self.TEST_RESULTS_DATA)
+ del _res_data['tabs']
+ # Return None if tabs not found in test results dict
+ self.assertIsNone(self.res_helper._write_results(_res_data))
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'check_running_test_state')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests_results')
+ def test_collect_kpi_test_running(self, mock_tests_results,
+ mock_tests_state, *args):
+ self.res_helper.run_id = self.SUCCESS_RECORD_ID
+ mock_tests_state.return_value = 'RUNNING'
+ mock_tests_results.return_value = self.TEST_RESULTS_DATA
+ res = self.res_helper.collect_kpi()
+ self.assertNotIn('done', res)
+ mock_tests_state.assert_called_once_with(self.res_helper.run_id)
+ mock_tests_results.assert_called_once_with(self.res_helper.run_id)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'check_running_test_state')
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests_results')
+ def test_collect_kpi_test_completed(self, mock_tests_results,
+ mock_tests_state, *args):
+ self.res_helper.run_id = self.SUCCESS_RECORD_ID
+ mock_tests_state.return_value = 'COMPLETE'
+ res = self.res_helper.collect_kpi()
+ self.assertIsNotNone(res)
+ mock_tests_state.assert_called_once_with(self.res_helper.run_id)
+ mock_tests_results.assert_not_called()
+ self.assertDictContainsSubset({'done': True}, res)
+
+
+class TestLandslideTclClient(unittest.TestCase):
+ def setUp(self):
+ self.mock_tcl_handler = mock.Mock(spec=tg_landslide.LsTclHandler)
+ self.ls_res_helper = mock.Mock(
+ spec=tg_landslide.LandslideResourceHelper)
+ self.ls_tcl_client = tg_landslide.LandslideTclClient(
+ self.mock_tcl_handler,
+ self.ls_res_helper)
+
+ def test___init__(self, *args):
+ self.ls_tcl_client = tg_landslide.LandslideTclClient(
+ self.mock_tcl_handler,
+ self.ls_res_helper)
+ self.assertIsNone(self.ls_tcl_client.tcl_server_ip)
+ self.assertIsNone(self.ls_tcl_client._user)
+ self.assertIsNone(self.ls_tcl_client._library_id)
+ self.assertIsNone(self.ls_tcl_client._basic_library_id)
+ self.assertEqual(set(), self.ls_tcl_client.ts_ids)
+ self.assertIsInstance(self.ls_tcl_client._tc_types, set)
+ self.assertIsNotNone(self.ls_tcl_client._tc_types)
+
+ def test_connect_login_success(self, *args):
+ lib_id = '123'
+ exec_responses = ['java0x2', lib_id, lib_id]
+ auth = ('user', 'password')
+ self.mock_tcl_handler.execute.side_effect = exec_responses
+ self.ls_tcl_client.connect(TAS_INFO['ip'], *auth)
+ self.assertEqual(lib_id, self.ls_tcl_client._library_id)
+ self.assertEqual(lib_id, self.ls_tcl_client._basic_library_id)
+ self.assertEqual(TAS_INFO['ip'], self.ls_tcl_client.tcl_server_ip)
+ self.assertEqual(auth[0], self.ls_tcl_client._user)
+ self.assertEqual(len(exec_responses),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call("ls::login 1.1.1.1 user password"),
+ mock.call("ls::get [ls::query LibraryInfo -userLibraryName user] -Id"),
+ ])
+
+ def test_connect_login_failed(self, *args):
+ exec_responses = ['Login failed']
+ auth = ('user', 'password')
+ self.mock_tcl_handler.execute.side_effect = exec_responses
+ self.assertRaises(exceptions.LandslideTclException,
+ self.ls_tcl_client.connect,
+ TAS_INFO['ip'],
+ *auth)
+ self.assertIsNone(self.ls_tcl_client._library_id)
+ self.assertIsNone(self.ls_tcl_client._basic_library_id)
+ self.assertIsNone(self.ls_tcl_client.tcl_server_ip)
+ self.assertIsNone(self.ls_tcl_client._user)
+ self.assertEqual(len(exec_responses),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_called_with(
+ "ls::login 1.1.1.1 user password")
+
+ def test_disconnect(self, *args):
+ self.ls_tcl_client.disconnect()
+ self.mock_tcl_handler.execute.assert_called_once_with("ls::logout")
+ self.assertIsNone(self.ls_tcl_client.tcl_server_ip)
+ self.assertIsNone(self.ls_tcl_client._user)
+ self.assertIsNone(self.ls_tcl_client._library_id)
+ self.assertIsNone(self.ls_tcl_client._basic_library_id)
+
+ def test_create_test_server(self, *args):
+ return_value = '2'
+ self.ls_tcl_client._ts_context.vnfd_helper = \
+ VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ self.ls_tcl_client._ts_context.license_data = {'lic_id': return_value}
+ self.mock_tcl_handler.execute.return_value = return_value
+ self.ls_tcl_client._set_thread_model = mock.Mock()
+ res = self.ls_tcl_client.create_test_server(TEST_SERVERS[1])
+ self.assertEqual(3, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::query TsId TestServer_2'),
+ mock.call('set ts [ls::retrieve TsInfo -Name "TestServer_2"]'),
+ mock.call('ls::get $ts -RequestedLicense'),
+ ])
+ self.ls_tcl_client._set_thread_model.assert_called_once_with(
+ TEST_SERVERS[1]['name'],
+ TEST_SERVERS[1]['thread_model'])
+ self.assertEqual(int(return_value), res)
+
+ def test_create_test_server_fail_limit_reach(self, *args):
+ self.mock_tcl_handler.execute.side_effect = ['TS not found',
+ 'Add failed']
+ self.assertRaises(RuntimeError,
+ self.ls_tcl_client.create_test_server,
+ TEST_SERVERS[0])
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::query TsId TestServer_1'),
+ mock.call('ls::perform AddTs -Name "TestServer_1" '
+ '-Ip "192.168.122.101"'),
+ ])
+
+ def test__add_test_server(self):
+ ts_id = '2'
+ self.mock_tcl_handler.execute.side_effect = ['TS not found', ts_id]
+ self.assertEqual(ts_id,
+ self.ls_tcl_client._add_test_server('name', 'ip'))
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::query TsId name'),
+ mock.call('ls::perform AddTs -Name "name" -Ip "ip"'),
+ ])
+
+ def test__add_test_server_failed(self):
+ self.mock_tcl_handler.execute.side_effect = ['TS not found',
+ 'Add failed']
+ self.assertRaises(RuntimeError, self.ls_tcl_client._add_test_server,
+ 'name', 'ip')
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::query TsId name'),
+ mock.call('ls::perform AddTs -Name "name" -Ip "ip"'),
+ ])
+
+ def test__update_license(self):
+ curr_lic_id = '111'
+ new_lic_id = '222'
+ exec_resp = ['java0x4',
+ curr_lic_id,
+ TCL_SUCCESS_RESPONSE,
+ TCL_SUCCESS_RESPONSE]
+ self.ls_tcl_client._ts_context.license_data = {'lic_id': new_lic_id}
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.ls_tcl_client._update_license('name')
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set ts [ls::retrieve TsInfo -Name "name"]'),
+ mock.call('ls::get $ts -RequestedLicense'),
+ mock.call('ls::config $ts -RequestedLicense 222'),
+ mock.call('ls::perform ModifyTs $ts'),
+ ])
+
+ def test__update_license_same_as_current(self):
+ curr_lic_id = '111'
+ new_lic_id = '111'
+ exec_resp = ['java0x4', curr_lic_id]
+ self.ls_tcl_client._ts_context.license_data = {'lic_id': new_lic_id}
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.ls_tcl_client._update_license('name')
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set ts [ls::retrieve TsInfo -Name "name"]'),
+ mock.call('ls::get $ts -RequestedLicense'),
+ ])
+
+ def test__set_thread_model_update_needed(self):
+ self.ls_tcl_client._ts_context.vnfd_helper = {
+ 'mgmt-interface': {
+ 'cfguser_password': 'cfguser_password'
+ }
+ }
+ exec_resp = ['java0x4', 'V0', '', '']
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.ls_tcl_client._set_thread_model('name', 'Fireball')
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set tsc [ls::perform RetrieveTsConfiguration '
+ '-name "name" cfguser_password]'),
+ mock.call('ls::get $tsc -ThreadModel'),
+ mock.call('ls::config $tsc -ThreadModel "V1_FB3"'),
+ mock.call('ls::perform ApplyTsConfiguration $tsc cfguser_password'),
+ ])
+
+ def test__set_thread_model_no_update_needed(self):
+ self.ls_tcl_client._ts_context.vnfd_helper = {
+ 'mgmt-interface': {
+ 'cfguser_password': 'cfguser_password'
+ }
+ }
+ exec_resp = ['java0x4', 'V0']
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.ls_tcl_client._set_thread_model('name', 'Legacy')
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set tsc [ls::perform RetrieveTsConfiguration '
+ '-name "name" cfguser_password]'),
+ mock.call('ls::get $tsc -ThreadModel'),
+ ])
+
+ @mock.patch.object(tg_landslide.LandslideTclClient,
+ 'resolve_test_server_name', side_effect=['4', '2'])
+ def test_create_test_session(self, *args):
+ _session_profile = copy.deepcopy(SESSION_PROFILE)
+ _session_profile['reservations'] = RESERVATIONS
+ self.ls_tcl_client._save_test_session = mock.Mock()
+ self.ls_tcl_client._configure_ts_group = mock.Mock()
+ self.ls_tcl_client._library_id = 42
+ self.ls_tcl_client.create_test_session(_session_profile)
+ self.assertEqual(17, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set test_ [ls::create TestSession]'),
+ mock.call('ls::config $test_ -Library 42 '
+ '-Name "default_bearer_capacity"'),
+ mock.call('ls::config $test_ -Description ' \
+ '"UE default bearer creation test case"'),
+ mock.call('ls::config $test_ -Keywords ""'),
+ mock.call('ls::config $test_ -Duration "60"'),
+ mock.call('ls::config $test_ -Iterations "1"'),
+ # _configure_reservation
+ mock.call('set reservation_ [ls::create Reservation -under $test_]'),
+ mock.call('ls::config $reservation_ -TsIndex 0 '
+ '-TsId 4 -TsName "TestServer_1"'),
+ mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'),
+ mock.call('ls::config $physubnet_ -Name "eth1" -Base "10.42.32.100" '
+ '-Mask "/24" -NumIps 20'),
+ # _configure_reservation
+ mock.call('set reservation_ [ls::create Reservation -under $test_]'),
+ mock.call('ls::config $reservation_ -TsIndex 1 '
+ '-TsId 2 -TsName "TestServer_2"'),
+ mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'),
+ mock.call('ls::config $physubnet_ -Name "eth1" -Base "10.42.32.1" '
+ '-Mask "/24" -NumIps 100'),
+ mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'),
+ mock.call('ls::config $physubnet_ -Name "eth2" -Base "10.42.33.1" '
+ '-Mask "/24" -NumIps 100'),
+ # _configure_report_options
+ mock.call('ls::config $test_.ReportOptions -Format 1 -Ts -3 -Tc -3'),
+ ])
+
+ def test_create_dmf(self):
+ self.mock_tcl_handler.execute.return_value = '2'
+ self.ls_tcl_client._save_dmf = mock.Mock()
+ self.ls_tcl_client.create_dmf(copy.deepcopy(DMF_CFG))
+ self.assertEqual(6, self.mock_tcl_handler.execute.call_count)
+ # This is needed because the dictionary is unordered and the arguments
+ # can come in either order
+ call1 = mock.call(
+ 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"')
+ call2 = mock.call(
+ 'ls::config $dmf_ -isClientPortRange "false" -clientPort 2002')
+ self.assertTrue(
+ call1 in self.mock_tcl_handler.execute.mock_calls or
+ call2 in self.mock_tcl_handler.execute.mock_calls)
+
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set dmf_ [ls::create Dmf]'),
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -systemLibraryName test] -Id'),
+ mock.call('ls::config $dmf_ -Library 2 -Name "Basic UDP"'),
+ mock.call('ls::config $dmf_ -dataProtocol "udp"'),
+ # mock.call(
+ # 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"'),
+ mock.call('ls::config $dmf_ -serverPort 2003'),
+ ], any_order=True)
+
+ def test_configure_dmf(self):
+ self.mock_tcl_handler.execute.return_value = '2'
+ self.ls_tcl_client._save_dmf = mock.Mock()
+ self.ls_tcl_client.configure_dmf(DMF_CFG)
+ self.assertEqual(6, self.mock_tcl_handler.execute.call_count)
+ # This is need because the dictionary is unordered and the arguments
+ # can come in either order
+ call1 = mock.call(
+ 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"')
+ call2 = mock.call(
+ 'ls::config $dmf_ -isClientPortRange "false" -clientPort 2002')
+ self.assertTrue(
+ call1 in self.mock_tcl_handler.execute.mock_calls or
+ call2 in self.mock_tcl_handler.execute.mock_calls)
+
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set dmf_ [ls::create Dmf]'),
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -systemLibraryName test] -Id'),
+ mock.call('ls::config $dmf_ -Library 2 -Name "Basic UDP"'),
+ mock.call('ls::config $dmf_ -dataProtocol "udp"'),
+ # mock.call(
+ # 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"'),
+ mock.call('ls::config $dmf_ -serverPort 2003'),
+ ], any_order=True)
+
+ def test_delete_dmf(self):
+ self.assertRaises(NotImplementedError,
+ self.ls_tcl_client.delete_dmf,
+ DMF_CFG)
+
+ def test__save_dmf_valid(self):
+ exec_resp = [TCL_SUCCESS_RESPONSE, TCL_SUCCESS_RESPONSE]
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.ls_tcl_client._save_dmf()
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::perform Validate -Dmf $dmf_'),
+ mock.call('ls::save $dmf_ -overwrite'),
+ ])
+
+ def test__save_dmf_invalid(self):
+ exec_resp = ['Invalid', 'List of errors and warnings']
+ self.mock_tcl_handler.execute.side_effect = exec_resp
+ self.assertRaises(exceptions.LandslideTclException,
+ self.ls_tcl_client._save_dmf)
+ self.assertEqual(len(exec_resp),
+ self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::perform Validate -Dmf $dmf_'),
+ mock.call('ls::get $dmf_ -ErrorsAndWarnings'),
+ ])
+
+ def test__configure_report_options(self):
+ _options = {'format': 'CSV', 'PerInterval': 'false'}
+ self.ls_tcl_client._configure_report_options(_options)
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::config $test_.ReportOptions -Format 1 -Ts -3 -Tc -3'),
+ mock.call('ls::config $test_.ReportOptions -PerInterval false'),
+ ],
+ any_order=True)
+
+ def test___configure_ts_group(self, *args):
+ _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0])
+ self.ls_tcl_client._configure_tc_type = mock.Mock()
+ self.ls_tcl_client._configure_preresolved_arp = mock.Mock()
+ self.ls_tcl_client.resolve_test_server_name = mock.Mock(
+ return_value='2')
+ self.ls_tcl_client._configure_ts_group(_ts_group, 0)
+ self.mock_tcl_handler.execute.assert_called_once_with(
+ 'set tss_ [ls::create TsGroup -under $test_ -tsId 2 ]')
+
+ def test___configure_ts_group_resolve_ts_fail(self, *args):
+ _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0])
+ self.ls_tcl_client._configure_tc_type = mock.Mock()
+ self.ls_tcl_client._configure_preresolved_arp = mock.Mock()
+ self.ls_tcl_client.resolve_test_server_name = mock.Mock(
+ return_value='TS Not Found')
+ self.assertRaises(RuntimeError, self.ls_tcl_client._configure_ts_group,
+ _ts_group, 0)
+ self.mock_tcl_handler.execute.assert_not_called()
+
+ def test__configure_tc_type(self):
+ _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0])
+ self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE
+ self.ls_tcl_client._configure_parameters = mock.Mock()
+ self.ls_tcl_client._configure_tc_type(_tc, 0)
+ self.assertEqual(7, self.mock_tcl_handler.execute.call_count)
+
+ def test__configure_tc_type_optional_param_omitted(self):
+ _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0])
+ del _tc['linked']
+ self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE
+ self.ls_tcl_client._configure_parameters = mock.Mock()
+ self.ls_tcl_client._configure_tc_type(_tc, 0)
+ self.assertEqual(6, self.mock_tcl_handler.execute.call_count)
+
+ def test__configure_tc_type_wrong_type(self):
+ _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0])
+ _tc['type'] = 'not_supported'
+ self.ls_tcl_client._configure_parameters = mock.Mock()
+ self.assertRaises(RuntimeError,
+ self.ls_tcl_client._configure_tc_type,
+ _tc, 0)
+ self.mock_tcl_handler.assert_not_called()
+
+ def test__configure_tc_type_not_found_basic_lib(self):
+ _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0])
+ self.ls_tcl_client._configure_parameters = mock.Mock()
+ self.mock_tcl_handler.execute.return_value = 'Invalid'
+ self.assertRaises(RuntimeError,
+ self.ls_tcl_client._configure_tc_type,
+ _tc, 0)
+
+ def test__configure_parameters(self):
+ _params = copy.deepcopy(
+ SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters'])
+ self.ls_tcl_client._configure_parameters(_params)
+ self.assertEqual(16, self.mock_tcl_handler.execute.call_count)
+
+ def test__configure_array_param(self):
+ _array = {"class": "Array",
+ "array": ["0"]}
+ self.ls_tcl_client._configure_array_param('name', _array)
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::create -Array-name -under $p_ ;'),
+ mock.call('ls::create ArrayItem -under $p_.name -Value "0"'),
+ ])
+
+ def test__configure_test_node_param(self):
+ _params = copy.deepcopy(
+ SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters'])
+ self.ls_tcl_client._configure_test_node_param('SgwUserAddr',
+ _params['SgwUserAddr'])
+ cmd = ('ls::create -TestNode-SgwUserAddr -under $p_ -Type "eth" '
+ '-Phy "eth1" -Ip "SGW_USER_IP" -NumLinksOrNodes 1 '
+ '-NextHop "SGW_CONTROL_NEXT_HOP" -Mac "" -MTU 1500 '
+ '-ForcedEthInterface "" -EthStatsEnabled false -VlanId 0 '
+ '-VlanUserPriority 0 -NumVlan 1 -UniqueVlanAddr false;')
+ self.mock_tcl_handler.execute.assert_called_once_with(cmd)
+
+ def test__configure_sut_param(self):
+ _params = {'name': 'name'}
+ self.ls_tcl_client._configure_sut_param('name', _params)
+ self.mock_tcl_handler.execute.assert_called_once_with(
+ 'ls::create -Sut-name -under $p_ -Name "name";')
+
+ def test__configure_dmf_param(self):
+ _params = {"mainflows": [{"library": '111',
+ "name": "Basic UDP"}],
+ "instanceGroups": [{
+ "mainflowIdx": 0,
+ "mixType": "",
+ "rate": 0.0,
+ "rows": [{
+ "clientPort": 0,
+ "context": 0,
+ "node": 0,
+ "overridePort": "false",
+ "ratingGroup": 0,
+ "role": 0,
+ "serviceId": 0,
+ "transport": "Any"}]
+ }]}
+ self.ls_tcl_client._get_library_id = mock.Mock(return_value='111')
+ res = self.ls_tcl_client._configure_dmf_param('name', _params)
+ self.assertEqual(5, self.mock_tcl_handler.execute.call_count)
+ self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::create -Dmf-name -under $p_ ;'),
+ mock.call('ls::perform AddDmfMainflow $p_.Dmf 111 "Basic UDP"'),
+ mock.call('ls::config $p_.Dmf.InstanceGroup(0) -mixType '),
+ mock.call('ls::config $p_.Dmf.InstanceGroup(0) -rate 0.0'),
+ mock.call('ls::config $p_.Dmf.InstanceGroup(0).Row(0) -Node 0 '
+ '-OverridePort false -ClientPort 0 -Context 0 -Role 0 '
+ '-PreferredTransport Any -RatingGroup 0 '
+ '-ServiceID 0'),
+ ])
+
+ def test__configure_dmf_param_no_instance_groups(self):
+ _params = {"mainflows": [{"library": '111',
+ "name": "Basic UDP"}]}
+ self.ls_tcl_client._get_library_id = mock.Mock(return_value='111')
+ res = self.ls_tcl_client._configure_dmf_param('name', _params)
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::create -Dmf-name -under $p_ ;'),
+ mock.call('ls::perform AddDmfMainflow $p_.Dmf 111 "Basic UDP"'),
+ ])
+
+ def test__configure_reservation(self):
+ _reservation = copy.deepcopy(RESERVATIONS[0])
+ self.ls_tcl_client.resolve_test_server_name = mock.Mock(
+ return_value='4')
+ res = self.ls_tcl_client._configure_reservation(_reservation)
+ self.assertIsNone(res)
+ self.assertEqual(4, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('set reservation_ [ls::create Reservation -under $test_]'),
+ mock.call('ls::config $reservation_ -TsIndex 0 -TsId 4 ' + \
+ '-TsName "TestServer_1"'),
+ mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'),
+ mock.call('ls::config $physubnet_ -Name "eth1" ' + \
+ '-Base "10.42.32.100" -Mask "/24" -NumIps 20'),
+ ])
+
+ def test__configure_preresolved_arp(self):
+ _arp = [{'StartingAddress': '10.81.1.10',
+ 'NumNodes': 1}]
+ res = self.ls_tcl_client._configure_preresolved_arp(_arp)
+ self.mock_tcl_handler.execute.assert_called_once()
+ self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_called_once_with(
+ 'ls::create PreResolvedArpAddress -under $tss_ ' + \
+ '-StartingAddress "10.81.1.10" -NumNodes 1')
+
+ def test__configure_preresolved_arp_none(self):
+ res = self.ls_tcl_client._configure_preresolved_arp(None)
+ self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_not_called()
+
+ def test_delete_test_session(self):
+ self.assertRaises(NotImplementedError,
+ self.ls_tcl_client.delete_test_session, {})
+
+ def test__save_test_session(self):
+ self.mock_tcl_handler.execute.side_effect = [TCL_SUCCESS_RESPONSE,
+ TCL_SUCCESS_RESPONSE]
+ res = self.ls_tcl_client._save_test_session()
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::perform Validate -TestSession $test_'),
+ mock.call('ls::save $test_ -overwrite'),
+ ])
+
+ def test__save_test_session_invalid(self):
+ self.mock_tcl_handler.execute.side_effect = ['Invalid', 'Errors']
+ self.assertRaises(exceptions.LandslideTclException,
+ self.ls_tcl_client._save_test_session)
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call('ls::perform Validate -TestSession $test_'),
+ mock.call('ls::get $test_ -ErrorsAndWarnings'),
+ ])
+
+ def test__get_library_id_system_lib(self):
+ self.mock_tcl_handler.execute.return_value = '111'
+ res = self.ls_tcl_client._get_library_id('name')
+ self.mock_tcl_handler.execute.assert_called_once()
+ self.assertEqual('111', res)
+ self.mock_tcl_handler.execute.assert_called_with(
+ 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id')
+
+ def test__get_library_id_user_lib(self):
+ self.mock_tcl_handler.execute.side_effect = ['Not found', '222']
+ res = self.ls_tcl_client._get_library_id('name')
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.assertEqual('222', res)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id'),
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -userLibraryName name] -Id'),
+ ])
+
+ def test__get_library_id_exception(self):
+ self.mock_tcl_handler.execute.side_effect = ['Not found', 'Not found']
+ self.assertRaises(exceptions.LandslideTclException,
+ self.ls_tcl_client._get_library_id,
+ 'name')
+ self.assertEqual(2, self.mock_tcl_handler.execute.call_count)
+ self.mock_tcl_handler.execute.assert_has_calls([
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id'),
+ mock.call(
+ 'ls::get [ls::query LibraryInfo -userLibraryName name] -Id'),
+ ])
+
+
+class TestLsTclHandler(unittest.TestCase):
+
+ def setUp(self):
+ self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi')
+ self.mock_lsapi.start()
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self.mock_lsapi.stop()
+
+ def test___init__(self, *args):
+ self.ls_tcl_handler = tg_landslide.LsTclHandler()
+ self.assertEqual({}, self.ls_tcl_handler.tcl_cmds)
+ self.ls_tcl_handler._ls.tcl.assert_called_once()
+
+ def test_execute(self, *args):
+ self.ls_tcl_handler = tg_landslide.LsTclHandler()
+ self.ls_tcl_handler.execute('command')
+ self.assertIn('command', self.ls_tcl_handler.tcl_cmds)