diff options
Diffstat (limited to 'yardstick')
11 files changed, 3576 insertions, 0 deletions
diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py index 10c1f3f27..539e0fec3 100644 --- a/yardstick/common/exceptions.py +++ b/yardstick/common/exceptions.py @@ -420,3 +420,15 @@ class InvalidMacAddress(YardstickException): class ValueCheckError(YardstickException): message = 'Constraint "%(value1)s %(operator)s %(value2)s" does not hold' + + +class RestApiError(RuntimeError): + def __init__(self, message): + self._message = message + super(RestApiError, self).__init__(message) + + +class LandslideTclException(RuntimeError): + def __init__(self, message): + self._message = message + super(LandslideTclException, self).__init__(message) diff --git a/yardstick/network_services/traffic_profile/__init__.py b/yardstick/network_services/traffic_profile/__init__.py index a1b26a24d..91d8a665f 100644 --- a/yardstick/network_services/traffic_profile/__init__.py +++ b/yardstick/network_services/traffic_profile/__init__.py @@ -28,6 +28,7 @@ def register_modules(): 'yardstick.network_services.traffic_profile.prox_ramp', 'yardstick.network_services.traffic_profile.rfc2544', 'yardstick.network_services.traffic_profile.pktgen', + 'yardstick.network_services.traffic_profile.landslide_profile', ] for module in modules: diff --git a/yardstick/network_services/traffic_profile/landslide_profile.py b/yardstick/network_services/traffic_profile/landslide_profile.py new file mode 100644 index 000000000..f79226fb4 --- /dev/null +++ b/yardstick/network_services/traffic_profile/landslide_profile.py @@ -0,0 +1,47 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" Spirent Landslide traffic profile definitions """ + +from yardstick.network_services.traffic_profile import base + + +class LandslideProfile(base.TrafficProfile): + """ + This traffic profile handles attributes of Landslide data stream + """ + + def __init__(self, tp_config): + super(LandslideProfile, self).__init__(tp_config) + + # for backward compatibility support dict and list of dicts + if isinstance(tp_config["dmf_config"], dict): + self.dmf_config = [tp_config["dmf_config"]] + else: + self.dmf_config = tp_config["dmf_config"] + + def execute(self, traffic_generator): + pass + + def update_dmf(self, options): + if 'dmf' in options: + if isinstance(options['dmf'], dict): + _dmfs = [options['dmf']] + else: + _dmfs = options['dmf'] + + for index, _dmf in enumerate(_dmfs): + try: + self.dmf_config[index].update(_dmf) + except IndexError: + pass diff --git a/yardstick/network_services/utils.py b/yardstick/network_services/utils.py index 4b987fafe..9c64fecde 100644 --- a/yardstick/network_services/utils.py +++ b/yardstick/network_services/utils.py @@ -36,6 +36,9 @@ OPTS = [ cfg.StrOpt('trex_client_lib', default=os.path.join(NSB_ROOT, 'trex_client/stl'), help='trex python library path.'), + cfg.StrOpt('jre_path_i386', + default='', + help='path to installation of 32-bit Java 1.7+.'), ] CONF.register_opts(OPTS, group="nsb") diff --git a/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py new file mode 100644 index 000000000..115fddcf0 --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/agnostic_vnf.py @@ -0,0 +1,46 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class AgnosticVnf(base.GenericVNF): + """ AgnosticVnf implementation. """ + def __init__(self, name, vnfd, task_id): + super(AgnosticVnf, self).__init__(name, vnfd, task_id) + + def instantiate(self, scenario_cfg, context_cfg): + pass + + def wait_for_instantiate(self): + pass + + def terminate(self): + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/epc_vnf.py b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py new file mode 100644 index 000000000..66d16d07f --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/epc_vnf.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from yardstick.network_services.vnf_generic.vnf import base + +LOG = logging.getLogger(__name__) + + +class EPCVnf(base.GenericVNF): + + def __init__(self, name, vnfd, task_id): + super(EPCVnf, self).__init__(name, vnfd, task_id) + + def instantiate(self, scenario_cfg, context_cfg): + """Prepare VNF for operation and start the VNF process/VM + + :param scenario_cfg: Scenario config + :param context_cfg: Context config + """ + pass + + def wait_for_instantiate(self): + """Wait for VNF to start""" + pass + + def terminate(self): + """Kill all VNF processes""" + pass + + def scale(self, flavor=""): + pass + + def collect_kpi(self): + pass + + def start_collect(self): + pass + + def stop_collect(self): + pass diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py new file mode 100644 index 000000000..a146b72ca --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -0,0 +1,1203 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import requests +import six +import time + +from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +try: + from lsapi import LsApi +except ImportError: + LsApi = common_utils.ErrorClass + +LOG = logging.getLogger(__name__) + + +class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): + APP_NAME = 'LandslideTG' + + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = LandslideResourceHelper + super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + setup_env_helper_type, + resource_helper_type) + + self.bin_path = net_serv_utils.get_nsb_option('bin_path') + self.name = name + self.runs_traffic = True + self.traffic_finished = False + self.session_profile = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + self.resource_helper.disconnect() + + def instantiate(self, scenario_cfg, context_cfg): + super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg) + self.resource_helper.connect() + + # Create test servers + test_servers = [x['test_server'] for x in self.vnfd_helper['config']] + self.resource_helper.create_test_servers(test_servers) + + # Create SUTs + [self.resource_helper.create_suts(x['suts']) for x in + self.vnfd_helper['config']] + + # Fill in test session based on session profile and test case options + self._load_session_profile() + + def run_traffic(self, traffic_profile): + self.resource_helper.abort_running_tests() + # Update DMF profile with related test case options + traffic_profile.update_dmf(self.scenario_helper.all_options) + # Create DMF in test user library + self.resource_helper.create_dmf(traffic_profile.dmf_config) + # Create/update test session in test user library + self.resource_helper.create_test_session(self.session_profile) + # Start test session + self.resource_helper.create_running_tests(self.session_profile['name']) + + def collect_kpi(self): + return self.resource_helper.collect_kpi() + + def wait_for_instantiate(self): + pass + + @staticmethod + def _update_session_suts(suts, testcase): + """ Create SUT entry. Update related EPC block in session profile. """ + for sut in suts: + # Update session profile EPC element with SUT info from pod file + tc_role = testcase['parameters'].get(sut['role']) + if tc_role: + _param = {} + if tc_role['class'] == 'Sut': + _param['name'] = sut['name'] + elif tc_role['class'] == 'TestNode': + _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'} + if x in sut and sut[x]}) + testcase['parameters'][sut['role']].update(_param) + else: + LOG.info('Unexpected SUT role in pod file: "%s".', sut['role']) + return testcase + + def _update_session_test_servers(self, test_server, _tsgroup_index): + """ Update tsId, reservations, pre-resolved ARP in session profile """ + # Update test server name + test_groups = self.session_profile['tsGroups'] + test_groups[_tsgroup_index]['tsId'] = test_server['name'] + + # Update preResolvedArpAddress + arp_key = 'preResolvedArpAddress' + _preresolved_arp = test_server.get(arp_key) # list of dicts + if _preresolved_arp: + test_groups[_tsgroup_index][arp_key] = _preresolved_arp + + # Update reservations + if 'phySubnets' in test_server: + reservation = {'tsId': test_server['name'], + 'tsIndex': _tsgroup_index, + 'tsName': test_server['name'], + 'phySubnets': test_server['phySubnets']} + if 'reservations' in self.session_profile: + self.session_profile['reservations'].append(reservation) + else: + self.session_profile['reservePorts'] = 'true' + self.session_profile['reservations'] = [reservation] + + @staticmethod + def _update_session_tc_params(tc_options, testcase): + for _param_key in tc_options: + if _param_key == 'AssociatedPhys': + testcase[_param_key] = tc_options[_param_key] + continue + testcase['parameters'][_param_key] = tc_options[_param_key] + return testcase + + def _load_session_profile(self): + + with common_utils.open_relative_file( + self.scenario_helper.scenario_cfg['session_profile'], + self.scenario_helper.task_path) as stream: + self.session_profile = yaml_loader.yaml_load(stream) + + # Raise exception if number of entries differs in following files, + _config_files = ['pod file', 'session_profile file', 'test_case file'] + # Count testcases number in all tsGroups of session profile + session_tests_num = [xx for x in self.session_profile['tsGroups'] + for xx in x['testCases']] + # Create a set containing number of list elements in each structure + _config_files_blocks_num = [ + len(x) for x in + (self.vnfd_helper['config'], # test_servers and suts info + session_tests_num, + self.scenario_helper.all_options['test_cases'])] # test case file + + if len(set(_config_files_blocks_num)) != 1: + raise RuntimeError('Unequal number of elements. {}'.format( + dict(six.moves.zip_longest(_config_files, + _config_files_blocks_num)))) + + ts_names = set() + _tsgroup_idx = -1 + _testcase_idx = 0 + + # Iterate over data structures to overwrite session profile defaults + # _config: single list element holding test servers and SUTs info + # _tc_options: single test case parameters + for _config, tc_options in zip( + self.vnfd_helper['config'], # test servers and SUTS + self.scenario_helper.all_options['test_cases']): # testcase + + _ts_config = _config['test_server'] + + # Calculate test group/test case indexes based on test server name + if _ts_config['name'] in ts_names: + _testcase_idx += 1 + else: + _tsgroup_idx += 1 + _testcase_idx = 0 + + _testcase = \ + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx] + + if _testcase['type'] != _ts_config['role']: + raise RuntimeError( + 'Test type mismatch in TC#{} of test server {}'.format( + _testcase_idx, _ts_config['name'])) + + # Fill session profile with test servers parameters + if _ts_config['name'] not in ts_names: + self._update_session_test_servers(_ts_config, _tsgroup_idx) + ts_names.add(_ts_config['name']) + + # Fill session profile with suts parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_suts(_config['suts'], _testcase)) + + # Update test case parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_tc_params(tc_options, _testcase)) + + +class LandslideResourceHelper(sample_vnf.ClientResourceHelper): + """Landslide TG helper class""" + + REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409} + REST_API_CODES = {'NOT MODIFIED': 500810} + + def __init__(self, setup_helper): + super(LandslideResourceHelper, self).__init__(setup_helper) + self._result = {} + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + # TAS Manager config initialization + self._url = None + self._user_id = None + self.session = None + self.license_data = {} + + # TCL session initialization + self._tcl = LandslideTclClient(LsTclHandler(), self) + + self.session = requests.Session() + self.running_tests_uri = 'runningTests' + self.test_session_uri = 'testSessions' + self.test_serv_uri = 'testServers' + self.suts_uri = 'suts' + self.users_uri = 'users' + self.user_lib_uri = None + self.run_id = None + + def abort_running_tests(self, timeout=60, delay=5): + """ Abort running test sessions, if any """ + _start_time = time.time() + while time.time() < _start_time + timeout: + run_tests_states = {x['id']: x['testStateOrStep'] + for x in self.get_running_tests()} + if not set(run_tests_states.values()).difference( + {'COMPLETE', 'COMPLETE_ERROR'}): + break + else: + [self.stop_running_tests(running_test_id=_id, force=True) + for _id, _state in run_tests_states.items() + if 'COMPLETE' not in _state] + time.sleep(delay) + else: + raise RuntimeError( + 'Some test runs not stopped during {} seconds'.format(timeout)) + + def _build_url(self, resource, action=None): + """ Build URL string + + :param resource: REST API resource name + :type resource: str + :param action: actions name and value + :type action: dict('name': <str>, 'value': <str>) + :returns str: REST API resource name with optional action info + """ + # Action is optional and accepted only in presence of resource param + if action and not resource: + raise ValueError("Resource name not provided") + # Concatenate actions + _action = ''.join(['?{}={}'.format(k, v) for k, v in + action.items()]) if action else '' + + return ''.join([self._url, resource, _action]) + + def get_response_params(self, method, resource, params=None): + """ Retrieve params from JSON response of specific resource URL + + :param method: one of supported REST API methods + :type method: str + :param resource: URI, requested resource name + :type resource: str + :param params: attributes to be found in JSON response + :type params: list(str) + """ + _res = [] + params = params if params else [] + response = self.exec_rest_request(method, resource) + # Get substring between last slash sign and question mark (if any) + url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0] + _response_json = response.json() + # Expect dict(), if URL last part and top dict key don't match + # Else, if they match, expect list() + k, v = list(_response_json.items())[0] + if k != url_last_part: + v = [v] # v: list(dict(str: str)) + # Extract params, or whole list of dicts (without top level key) + for x in v: + _res.append({param: x[param] for param in params} if params else x) + return _res + + def _create_user(self, auth, level=1): + """ Create new user + + :param auth: data to create user account on REST server + :type auth: dict + :param level: Landslide user permissions level + :type level: int + :returns int: user id + """ + # Set expiration date in two years since account creation date + _exp_date = time.strftime( + '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2)) + _username = auth['user'] + _fields = {"contactInformation": "", "expiresOn": _exp_date, + "fullName": "Test User", + "isActive": "true", "level": level, + "password": auth['password'], + "username": _username} + _response = self.exec_rest_request('post', self.users_uri, + json_data=_fields, raise_exc=False) + _resp_json = _response.json() + if _response.status_code == self.REST_STATUS_CODES['CREATED']: + # New user created + _id = _resp_json['id'] + LOG.info("New user created: username='%s', id='%s'", _username, + _id) + elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']: + # User already exists + LOG.info("Account '%s' already exists.", _username) + # Get user id + _id = self._modify_user(_username, {"isActive": "true"})['id'] + else: + raise exceptions.RestApiError( + 'Error during new user "{}" creation'.format(_username)) + return _id + + def _modify_user(self, username, fields): + """ Modify information about existing user + + :param username: user name of account to be modified + :type username: str + :param fields: data to modify user account on REST server + :type fields: dict + :returns dict: user info + """ + _response = self.exec_rest_request('post', self.users_uri, + action={'username': username}, + json_data=fields, raise_exc=False) + if _response.status_code == self.REST_STATUS_CODES['OK']: + _response = _response.json() + else: + raise exceptions.RestApiError( + 'Error during user "{}" data update: {}'.format( + username, + _response.status_code)) + LOG.info("User account '%s' modified: '%s'", username, _response) + return _response + + def _delete_user(self, username): + """ Delete user account + + :param username: username field + :type username: str + :returns bool: True if succeeded + """ + self.exec_rest_request('delete', self.users_uri, + action={'username': username}) + + def _get_users(self, username=None): + """ Get user records from REST server + + :param username: username field + :type username: None|str + :returns list(dict): empty list, or user record, or list of all users + """ + _response = self.get_response_params('get', self.users_uri) + _res = [u for u in _response if + u['username'] == username] if username else _response + return _res + + def exec_rest_request(self, method, resource, action=None, json_data=None, + logs=True, raise_exc=True): + """ Execute REST API request, return response object + + :param method: one of supported requests ('post', 'get', 'delete') + :type method: str + :param resource: URL of resource + :type resource: str + :param action: data used to provide URI located after question mark + :type action: dict + :param json_data: mandatory only for 'post' method + :type json_data: dict + :param logs: debug logs display flag + :type raise_exc: bool + :param raise_exc: if True, raise exception on REST API call error + :returns requests.Response(): REST API call response object + """ + json_data = json_data if json_data else {} + action = action if action else {} + _method = method.upper() + method = method.lower() + if method not in ('post', 'get', 'delete'): + raise ValueError("Method '{}' not supported".format(_method)) + + if method == 'post' and not action: + if not (json_data and isinstance(json_data, collections.Mapping)): + raise ValueError( + 'JSON data missing in {} request'.format(_method)) + + r = getattr(self.session, method)(self._build_url(resource, action), + json=json_data) + if raise_exc and not r.ok: + msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format( + method, self._build_url(resource, action), r.reason) + raise exceptions.RestApiError(msg) + + if logs: + LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method, + r.request.url) + LOG.debug("Response: %s", r.json()) + return r + + def connect(self): + """Connect to RESTful server using test user account""" + tas_info = self.vnfd_helper['mgmt-interface'] + # Supported REST Server ports: HTTP - 8080, HTTPS - 8181 + _port = '8080' if tas_info['proto'] == 'http' else '8181' + tas_info.update({'port': _port}) + self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info) + self.session.headers.update({'Accept': 'application/json', + 'Content-type': 'application/json'}) + # Login with super user to create test user + self.session.auth = ( + tas_info['super-user'], tas_info['super-user-password']) + LOG.info("Connect using superuser: server='%s'", self._url) + auth = {x: tas_info[x] for x in ('user', 'password')} + self._user_id = self._create_user(auth) + # Login with test user + self.session.auth = auth['user'], auth['password'] + # Test user validity + self.exec_rest_request('get', '') + + self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri) + LOG.info("Login with test user: server='%s'", self._url) + # Read existing license + self.license_data['lic_id'] = tas_info['license'] + + # Tcl client init + self._tcl.connect(tas_info['ip'], *self.session.auth) + + return self.session + + def disconnect(self): + self.session = None + self._tcl.disconnect() + + def terminate(self): + self._terminated.value = 1 + + def create_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.create_dmf(_dmf) + else: + self._tcl.create_dmf(dmf) + + def delete_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.delete_dmf(_dmf) + else: + self._tcl.delete_dmf(dmf) + + def create_suts(self, suts): + # Keep only supported keys in suts object + for _sut in suts: + sut_entry = {k: v for k, v in _sut.items() + if k not in {'phy', 'nextHop', 'role'}} + _response = self.exec_rest_request( + 'post', self.suts_uri, json_data=sut_entry, + logs=False, raise_exc=False) + if _response.status_code != self.REST_STATUS_CODES['CREATED']: + LOG.info(_response.reason) # Failed to create + _name = sut_entry.pop('name') + # Modify existing SUT + self.configure_sut(sut_name=_name, json_data=sut_entry) + else: + LOG.info("SUT created: %s", sut_entry) + + def get_suts(self, suts_id=None): + if suts_id: + _suts = self.exec_rest_request( + 'get', '{}/{}'.format(self.suts_uri, suts_id)).json() + else: + _suts = self.get_response_params('get', self.suts_uri) + + return _suts + + def configure_sut(self, sut_name, json_data): + """ Modify information of specific SUTs + + :param sut_name: name of existing SUT + :type sut_name: str + :param json_data: SUT settings + :type json_data: dict() + """ + LOG.info("Modifying SUT information...") + _response = self.exec_rest_request('post', + self.suts_uri, + action={'name': sut_name}, + json_data=json_data, + raise_exc=False) + if _response.status_code not in {self.REST_STATUS_CODES[x] for x in + {'OK', 'NO CHANGE'}}: + raise exceptions.RestApiError(_response.reason) + + LOG.info("Modified SUT: %s", sut_name) + + def delete_suts(self, suts_ids=None): + if not suts_ids: + _curr_suts = self.get_response_params('get', self.suts_uri) + suts_ids = [x['id'] for x in _curr_suts] + LOG.info("Deleting SUTs with following IDs: %s", suts_ids) + for _id in suts_ids: + self.exec_rest_request('delete', + '{}/{}'.format(self.suts_uri, _id)) + LOG.info("\tDone for SUT id: %s", _id) + + def _check_test_servers_state(self, test_servers_ids=None, delay=10, + timeout=300): + LOG.info("Waiting for related test servers state change to READY...") + # Wait on state change + _start_time = time.time() + while time.time() - _start_time < timeout: + ts_ids_not_ready = {x['id'] for x in + self.get_test_servers(test_servers_ids) + if x['state'] != 'READY'} + if ts_ids_not_ready == set(): + break + time.sleep(delay) + else: + raise RuntimeError( + 'Test servers not in READY state after {} seconds.'.format( + timeout)) + + def create_test_servers(self, test_servers): + """ Create test servers + + :param test_servers: input data for test servers creation + mandatory fields: managementIp + optional fields: name + :type test_servers: list(dict) + """ + _ts_ids = [] + for _ts in test_servers: + _msg = 'Created test server "%(name)s"' + _ts_ids.append(self._tcl.create_test_server(_ts)) + if _ts.get('thread_model'): + _msg += ' in mode: "%(thread_model)s"' + LOG.info(_msg, _ts) + + self._check_test_servers_state(_ts_ids) + + def get_test_servers(self, test_server_ids=None): + if not test_server_ids: # Get all test servers info + _test_servers = self.exec_rest_request( + 'get', self.test_serv_uri).json()[self.test_serv_uri] + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + _test_servers = [] + for _id in test_server_ids: + _test_servers.append(self.exec_rest_request( + 'get', '{}/{}'.format(self.test_serv_uri, _id)).json()) + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + def configure_test_servers(self, action, json_data=None, + test_server_ids=None): + if not test_server_ids: + test_server_ids = [x['id'] for x in self.get_test_servers()] + elif isinstance(test_server_ids, int): + test_server_ids = [test_server_ids] + for _id in test_server_ids: + self.exec_rest_request('post', + '{}/{}'.format(self.test_serv_uri, _id), + action=action, json_data=json_data) + LOG.info("Test server (id: %s) configuration done: %s", _id, + action) + return test_server_ids + + def delete_test_servers(self, test_servers_ids=None): + # Delete test servers + for _ts in self.get_test_servers(test_servers_ids): + self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri, + _ts['id'])) + LOG.info("Deleted test server: %s", _ts['name']) + + def create_test_session(self, test_session): + # Use tcl client to create session + test_session['library'] = self._user_id + LOG.debug("Creating session='%s'", test_session['name']) + self._tcl.create_test_session(test_session) + + def get_test_session(self, test_session_name=None): + if test_session_name: + uri = 'libraries/{}/{}/{}'.format(self._user_id, + self.test_session_uri, + test_session_name) + else: + uri = self.user_lib_uri.format(self._user_id) + _test_sessions = self.exec_rest_request('get', uri).json() + return _test_sessions + + def configure_test_session(self, template_name, test_session): + # Override specified test session parameters + LOG.info('Update test session parameters: %s', test_session['name']) + test_session.update({'library': self._user_id}) + return self.exec_rest_request( + method='post', + action={'action': 'overrideAndSaveAs'}, + json_data=test_session, + resource='{}/{}'.format(self.user_lib_uri.format(self._user_id), + template_name)) + + def delete_test_session(self, test_session): + return self.exec_rest_request('delete', '{}/{}'.format( + self.user_lib_uri.format(self._user_id), test_session)) + + def create_running_tests(self, test_session_name): + r = self.exec_rest_request('post', + self.running_tests_uri, + json_data={'library': self._user_id, + 'name': test_session_name}) + if r.status_code != self.REST_STATUS_CODES['CREATED']: + raise exceptions.RestApiError('Failed to start test session.') + self.run_id = r.json()['id'] + + def get_running_tests(self, running_test_id=None): + """Get JSON structure of specified running test entity + + :param running_test_id: ID of created running test entity + :type running_test_id: int + :returns list: running tests entity + """ + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + _res = self.exec_rest_request('get', _res_name, logs=False).json() + # If no run_id specified, skip top level key in response dict. + # Else return JSON as list + return _res.get('runningTests', [_res]) + + def delete_running_tests(self, running_test_id=None): + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.get_response_params('delete', _res_name) + LOG.info("Deleted running test with id: %s", running_test_id) + + def _running_tests_action(self, running_test_id, action, json_data=None): + if not json_data: + json_data = {} + # Supported actions: + # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.exec_rest_request('post', _res_name, {'action': action}, + json_data) + LOG.debug("Executed action: '%s' on running test id: %s", action, + running_test_id) + + def stop_running_tests(self, running_test_id, json_data=None, force=False): + _action = 'abort' if force else 'stop' + self._running_tests_action(running_test_id, _action, + json_data=json_data) + LOG.info('Performed action: "%s" to test run with id: %s', _action, + running_test_id) + + def check_running_test_state(self, run_id): + r = self.exec_rest_request('get', + '{}/{}'.format(self.running_tests_uri, + run_id)) + return r.json().get("testStateOrStep") + + def get_running_tests_results(self, run_id): + _res = self.exec_rest_request( + 'get', + '{}/{}/{}'.format(self.running_tests_uri, + run_id, + 'measurements')).json() + return _res + + def _write_results(self, results): + # Avoid None value at test session start + _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0 + + _res_tabs = results.get('tabs') + # Avoid parsing 'tab' dict key initially (missing or empty) + if not _res_tabs: + return + + # Flatten nested dict holding Landslide KPIs of current test run + flat_kpis_dict = {} + for _tab, _kpis in six.iteritems(_res_tabs): + for _kpi, _value in six.iteritems(_kpis): + # Combine table name and KPI name using delimiter "::" + _key = '::'.join([_tab, _kpi]) + try: + # Cast value from str to float + # Remove comma and/or measure units, e.g. "us" + flat_kpis_dict[_key] = float( + _value.split(' ')[0].replace(',', '')) + except ValueError: # E.g. if KPI represents datetime + pass + LOG.info("Polling test results of test run id: %s. Elapsed time: %s " + "seconds", self.run_id, _elapsed_time) + return flat_kpis_dict + + def collect_kpi(self): + if 'COMPLETE' in self.check_running_test_state(self.run_id): + self._result.update({'done': True}) + return self._result + _res = self.get_running_tests_results(self.run_id) + _kpis = self._write_results(_res) + if _kpis: + _kpis.update({'run_id': int(self.run_id)}) + _kpis.update({'iteration': _res['iteration']}) + self._result.update(_kpis) + return self._result + + +class LandslideTclClient(object): + """Landslide TG TCL client class""" + + DEFAULT_TEST_NODE = { + 'ethStatsEnabled': True, + 'forcedEthInterface': '', + 'innerVlanId': 0, + 'ip': '', + 'mac': '', + 'mtu': 1500, + 'nextHop': '', + 'numLinksOrNodes': 1, + 'numVlan': 1, + 'phy': '', + 'uniqueVlanAddr': False, + 'vlanDynamic': 0, + 'vlanId': 0, + 'vlanUserPriority': 0, + 'vlanTagType': 0 + } + + TEST_NODE_CMD = \ + 'ls::create -TestNode-{} -under $p_ -Type "eth"' \ + ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \ + ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \ + ' -ForcedEthInterface "{forcedEthInterface}"' \ + ' -EthStatsEnabled {ethStatsEnabled}' \ + ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \ + ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \ + ';' + + def __init__(self, tcl_handler, ts_context): + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + self._tcl = tcl_handler + self._ts_context = ts_context + self.ts_ids = set() + + # Test types names expected in session profile, test case and pod files + self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node", + "PCRF_Node"} + + self._class_param_config_handler = { + "Array": self._configure_array_param, + "TestNode": self._configure_test_node_param, + "Sut": self._configure_sut_param, + "Dmf": self._configure_dmf_param + } + + def connect(self, tcl_server_ip, username, password): + """ Connect to TCL server with username and password + + :param tcl_server_ip: TCL server IP address + :type tcl_server_ip: str + :param username: existing username on TCL server + :type username: str + :param password: password related to username on TCL server + :type password: str + """ + LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username) + res = self._tcl.execute( + "ls::login {} {} {}".format(tcl_server_ip, username, password)) + if 'java0x' not in res: # handle assignment reflects login success + raise exceptions.LandslideTclException( + "connect: login failed ='{}'.".format(res)) + self._library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + username)) + self._basic_library_id = self._get_library_id('Basic') + self.tcl_server_ip = tcl_server_ip + self._user = username + LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user, + self._library_id, + self._basic_library_id) + + def disconnect(self): + """ Disconnect from TCL server. Drop TCL connection configuration """ + LOG.info("disconnect: server='%s' user='%s'", + self.tcl_server_ip, self._user) + self._tcl.execute("ls::logout") + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + + def _add_test_server(self, name, ip): + try: + # Check if test server exists with name equal to _ts_name + ts_id = int(self.resolve_test_server_name(name)) + except ValueError: + # Such test server does not exist. Attempt to create it + ts_id = self._tcl.execute( + 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip)) + try: + int(ts_id) + except ValueError: + # Failed to create test server, e.g. limit reached + raise RuntimeError( + 'Failed to create test server: "{}". {}'.format(name, + ts_id)) + return ts_id + + def _update_license(self, name): + """ Setup/update test server license + + :param name: test server name + :type name: str + """ + # Retrieve current TsInfo configuration, result stored in handle "ts" + self._tcl.execute( + 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name)) + + # Set license ID, if it differs from current one, update test server + _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense') + if _curr_lic_id != self._ts_context.license_data['lic_id']: + self._tcl.execute('ls::config $ts -RequestedLicense {}'.format( + self._ts_context.license_data['lic_id'])) + self._tcl.execute('ls::perform ModifyTs $ts') + + def _set_thread_model(self, name, thread_model): + # Retrieve test server configuration, store it in handle "tsc" + _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][ + 'cfguser_password'] + self._tcl.execute( + 'set tsc [ls::perform RetrieveTsConfiguration ' + '-name "{}" {}]'.format(name, _cfguser_password)) + # Configure ThreadModel, if it differs from current one + thread_model_map = {'Legacy': 'V0', + 'Max': 'V1', + 'Fireball': 'V1_FB3'} + _model = thread_model_map[thread_model] + _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel') + if _curr_model != _model: + self._tcl.execute( + 'ls::config $tsc -ThreadModel "{}"'.format(_model)) + self._tcl.execute( + 'ls::perform ApplyTsConfiguration $tsc {}'.format( + _cfguser_password)) + + def create_test_server(self, test_server): + _ts_thread_model = test_server.get('thread_model') + _ts_name = test_server['name'] + + ts_id = self._add_test_server(_ts_name, test_server['ip']) + + self._update_license(_ts_name) + + # Skip below code modifying thread_model if it is not defined + if _ts_thread_model: + self._set_thread_model(_ts_name, _ts_thread_model) + + return ts_id + + def create_test_session(self, test_session): + """ Create, configure and save Landslide test session object. + + :param test_session: Landslide TestSession object + :type test_session: dict + """ + LOG.info("create_test_session: name='%s'", test_session['name']) + self._tcl.execute('set test_ [ls::create TestSession]') + self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format( + self._library_id, test_session['name'])) + self._tcl.execute('ls::config $test_ -Description "{}"'.format( + test_session['description'])) + if 'keywords' in test_session: + self._tcl.execute('ls::config $test_ -Keywords "{}"'.format( + test_session['keywords'])) + if 'duration' in test_session: + self._tcl.execute('ls::config $test_ -Duration "{}"'.format( + test_session['duration'])) + if 'iterations' in test_session: + self._tcl.execute('ls::config $test_ -Iterations "{}"'.format( + test_session['iterations'])) + if 'reservePorts' in test_session: + if test_session['reservePorts'] == 'true': + self._tcl.execute('ls::config $test_ -Reserve Ports') + + if 'reservations' in test_session: + for _reservation in test_session['reservations']: + self._configure_reservation(_reservation) + + if 'reportOptions' in test_session: + self._configure_report_options(test_session['reportOptions']) + + for _index, _group in enumerate(test_session['tsGroups']): + self._configure_ts_group(_group, _index) + + self._save_test_session() + + def create_dmf(self, dmf): + """ Create, configure and save Landslide Data Message Flow object. + + :param dmf: Landslide Data Message Flow object + :type: dmf: dict + """ + self._tcl.execute('set dmf_ [ls::create Dmf]') + _lib_id = self._get_library_id(dmf['dmf']['library']) + self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format( + _lib_id, + dmf['dmf']['name'])) + for _param_key in dmf: + if _param_key == 'dmf': + continue + _param_value = dmf[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + _tcl_cmd = 'ls::config $dmf_' + for _sub_param_key in _param_value: + _sub_param_value = _param_value[_sub_param_key] + if isinstance(_sub_param_value, str): + _tcl_cmd += ' -{} "{}"'.format(_sub_param_key, + _sub_param_value) + else: + _tcl_cmd += ' -{} {}'.format(_sub_param_key, + _sub_param_value) + + self._tcl.execute(_tcl_cmd) + else: + # Configure simple parameter + if isinstance(_param_value, str): + self._tcl.execute( + 'ls::config $dmf_ -{} "{}"'.format(_param_key, + _param_value)) + else: + self._tcl.execute( + 'ls::config $dmf_ -{} {}'.format(_param_key, + _param_value)) + self._save_dmf() + + def configure_dmf(self, dmf): + # Use create to reconfigure and overwrite existing dmf + self.create_dmf(dmf) + + def delete_dmf(self, dmf): + raise NotImplementedError + + def _save_dmf(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -Dmf $dmf_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings') + LOG.error("_save_dmf: %s", res) + raise exceptions.LandslideTclException("_save_dmf: {}".format(res)) + else: + res = self._tcl.execute('ls::save $dmf_ -overwrite') + LOG.debug("_save_dmf: result (%s)", res) + + def _configure_report_options(self, options): + for _option_key in options: + _option_value = options[_option_key] + if _option_key == 'format': + _format = 0 + if _option_value == 'CSV': + _format = 1 + self._tcl.execute( + 'ls::config $test_.ReportOptions -Format {} ' + '-Ts -3 -Tc -3'.format(_format)) + else: + self._tcl.execute( + 'ls::config $test_.ReportOptions -{} {}'.format( + _option_key, + _option_value)) + + def _configure_ts_group(self, ts_group, ts_group_index): + try: + _ts_id = int(self.resolve_test_server_name(ts_group['tsId'])) + except ValueError: + raise RuntimeError('Test server name "{}" does not exist.'.format( + ts_group['tsId'])) + if _ts_id not in self.ts_ids: + self._tcl.execute( + 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format( + _ts_id)) + self.ts_ids.add(_ts_id) + for _case in ts_group.get('testCases', []): + self._configure_tc_type(_case, ts_group_index) + + self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress')) + + def _configure_tc_type(self, tc, ts_group_index): + if tc['type'] not in self._tc_types: + raise RuntimeError('Test type {} not supported.'.format( + tc['type'])) + tc['type'] = tc['type'].replace('_', ' ') + res = self._tcl.execute( + 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format( + self._basic_library_id, tc['type'])) + if 'Invalid' in res: + raise RuntimeError('Test type {} not found in "Basic" ' + 'library.'.format(tc['type'])) + self._tcl.execute( + 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format( + ts_group_index)) + self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format( + self._basic_library_id, tc['name'])) + self._tcl.execute( + 'ls::config $tc_ -Description "{}"'.format(tc['type'])) + self._tcl.execute( + 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type'])) + if 'linked' in tc: + self._tcl.execute( + 'ls::config $tc_ -Linked {}'.format(tc['linked'])) + if 'AssociatedPhys' in tc: + self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format( + tc['AssociatedPhys'])) + if 'parameters' in tc: + self._configure_parameters(tc['parameters']) + + def _configure_parameters(self, params): + self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]') + for _param_key in sorted(params): + _param_value = params[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + if _param_value['class'] in self._class_param_config_handler: + self._class_param_config_handler[_param_value['class']]( + _param_key, + _param_value) + else: + # Configure simple parameter + self._tcl.execute( + 'ls::create {} -under $p_ -Value "{}"'.format( + _param_key, + _param_value)) + + def _configure_array_param(self, name, params): + self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name)) + for param in params['array']: + self._tcl.execute( + 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name, + param)) + + def _configure_test_node_param(self, name, params): + _params = self.DEFAULT_TEST_NODE + _params.update(params) + + # TCL command expects lower case 'true' or 'false' + _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower() + _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower() + + cmd = self.TEST_NODE_CMD.format(name, **_params) + self._tcl.execute(cmd) + + def _configure_sut_param(self, name, params): + self._tcl.execute( + 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name, + params['name'])) + + def _configure_dmf_param(self, name, params): + self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name)) + + for _flow_index, _flow in enumerate(params['mainflows']): + _lib_id = self._get_library_id(_flow['library']) + self._tcl.execute( + 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format( + _lib_id, + _flow['name'])) + + if not params.get('instanceGroups'): + return + + _instance_group = params['instanceGroups'][_flow_index] + + # Traffic Mixer parameters handling + for _key in ['mixType', 'rate']: + if _key in _instance_group: + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format( + _flow_index, _key, _instance_group[_key])) + + # Assignments parameters handling + for _row_id, _row in enumerate(_instance_group.get('rows', [])): + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} ' + '-OverridePort {} -ClientPort {} -Context {} -Role {} ' + '-PreferredTransport {} -RatingGroup {} ' + '-ServiceID {}'.format( + _flow_index, _row_id, _row['node'], + _row['overridePort'], _row['clientPort'], + _row['context'], _row['role'], _row['transport'], + _row['ratingGroup'], _row['serviceId'])) + + def _configure_reservation(self, reservation): + _ts_id = self.resolve_test_server_name(reservation['tsId']) + self._tcl.execute( + 'set reservation_ [ls::create Reservation -under $test_]') + self._tcl.execute( + 'ls::config $reservation_ -TsIndex {} -TsId {} ' + '-TsName "{}"'.format(reservation['tsIndex'], + _ts_id, + reservation['tsName'])) + for _subnet in reservation['phySubnets']: + self._tcl.execute( + 'set physubnet_ [ls::create PhySubnet -under $reservation_]') + self._tcl.execute( + 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" ' + '-NumIps {}'.format(_subnet['name'], _subnet['base'], + _subnet['mask'], _subnet['numIps'])) + + def _configure_preresolved_arp(self, pre_resolved_arp): + if not pre_resolved_arp: # Pre-resolved ARP configuration not found + return + for _entry in pre_resolved_arp: + # TsGroup handle name should correspond in _configure_ts_group() + self._tcl.execute( + 'ls::create PreResolvedArpAddress -under $tss_ ' + '-StartingAddress "{StartingAddress}" ' + '-NumNodes {NumNodes}'.format(**_entry)) + + def delete_test_session(self, test_session): + raise NotImplementedError + + def _save_test_session(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -TestSession $test_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings') + raise exceptions.LandslideTclException( + "Test session validation failed. Server response: {}".format( + res)) + else: + self._tcl.execute('ls::save $test_ -overwrite') + LOG.debug("Test session saved successfully.") + + def _get_library_id(self, library): + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + return _library_id + except ValueError: + pass + + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + except ValueError: + LOG.error("_get_library_id: library='%s' not found.", library) + raise exceptions.LandslideTclException( + "_get_library_id: library='{}' not found.".format( + library)) + + return _library_id + + def resolve_test_server_name(self, ts_name): + return self._tcl.execute("ls::query TsId {}".format(ts_name)) + + +class LsTclHandler(object): + """Landslide TCL Handler class""" + + LS_OK = "ls_ok" + JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386') + + def __init__(self): + self.tcl_cmds = {} + self._ls = LsApi(jre_path=self.JRE_PATH) + self._ls.tcl( + "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format( + self.LS_OK)) + + def execute(self, command): + res = self._ls.tcl(command) + self.tcl_cmds[command] = res + return res diff --git a/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py b/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py new file mode 100644 index 000000000..afd550029 --- /dev/null +++ b/yardstick/tests/unit/network_services/traffic_profile/test_landslide_profile.py @@ -0,0 +1,136 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import unittest + +from yardstick.network_services.traffic_profile import landslide_profile + +TP_CONFIG = { + 'schema': "nsb:traffic_profile:0.1", + 'name': 'LandslideProfile', + 'description': 'Spirent Landslide traffic profile (Data Message Flow)', + 'traffic_profile': { + 'traffic_type': 'LandslideProfile' + }, + 'dmf_config': { + 'dmf': { + 'library': 'test', + 'name': 'Fireball UDP', + 'description': "Basic data flow using UDP/IP (Fireball DMF)", + 'keywords': 'UDP ', + 'dataProtocol': 'fb_udp', + 'burstCount': 1, + 'clientPort': { + 'clientPort': 2002, + 'isClientPortRange': 'false' + }, + 'serverPort': 2003, + 'connection': { + 'initiatingSide': 'Client', + 'disconnectSide': 'Client', + 'underlyingProtocol': 'none', + 'persistentConnection': 'false' + }, + 'protocolId': 0, + 'persistentConnection': 'false', + 'transactionRate': 8.0, + 'transactions': { + 'totalTransactions': 0, + 'retries': 0, + 'dataResponseTime': 60000, + 'packetSize': 64 + }, + 'segment': { + 'segmentSize': 64000, + 'maxSegmentSize': 0 + }, + 'size': { + 'sizeDistribution': 'Fixed', + 'sizeDeviation': 10 + }, + 'interval': { + 'intervalDistribution': 'Fixed', + 'intervalDeviation': 10 + }, + 'ipHeader': { + 'typeOfService': 0, + 'timeToLive': 64 + }, + 'tcpConnection': { + 'force3Way': 'false', + 'fixedRetryTime': 0, + 'maxPacketsToForceAck': 0 + }, + 'tcp': { + 'windowSize': 32768, + 'windowScaling': -1, + 'disableFinAckWait': 'false' + }, + 'disconnectType': 'FIN', + 'slowStart': 'false', + 'connectOnly': 'false', + 'vtag': { + 'VTagMask': '0x0', + 'VTagValue': '0x0' + }, + 'sctpPayloadProtocolId': 0, + 'billingIncludeSyn': 'true', + 'billingIncludeSubflow': 'true', + 'billingRecordPerTransaction': 'false', + 'tcpPush': 'false', + 'hostDataExpansionRatio': 1 + } + } +} +DMF_OPTIONS = { + 'dmf': { + 'transactionRate': 5, + 'packetSize': 512, + 'burstCount': 1 + } +} + + +class TestLandslideProfile(unittest.TestCase): + + def test___init__(self): + ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG) + self.assertListEqual([TP_CONFIG["dmf_config"]], + ls_traffic_profile.dmf_config) + + def test___init__config_not_a_dict(self): + _tp_config = copy.deepcopy(TP_CONFIG) + _tp_config['dmf_config'] = [_tp_config['dmf_config']] + ls_traffic_profile = landslide_profile.LandslideProfile(_tp_config) + self.assertListEqual(_tp_config['dmf_config'], + ls_traffic_profile.dmf_config) + + def test_execute(self): + ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG) + self.assertIsNone(ls_traffic_profile.execute(None)) + + def test_update_dmf_options_dict(self): + ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG) + ls_traffic_profile.update_dmf(DMF_OPTIONS) + self.assertDictContainsSubset(DMF_OPTIONS['dmf'], + ls_traffic_profile.dmf_config[0]) + + def test_update_dmf_options_list(self): + ls_traffic_profile = landslide_profile.LandslideProfile(TP_CONFIG) + _dmf_options = copy.deepcopy(DMF_OPTIONS) + _dmf_options['dmf'] = [_dmf_options['dmf']] + ls_traffic_profile.update_dmf(_dmf_options) + self.assertTrue(all([x in ls_traffic_profile.dmf_config[0] + for x in DMF_OPTIONS['dmf']])) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_agnostic_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_agnostic_vnf.py new file mode 100644 index 000000000..3374cbe76 --- /dev/null +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_agnostic_vnf.py @@ -0,0 +1,70 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import uuid + +from yardstick.network_services.vnf_generic.vnf import agnostic_vnf + +NAME = 'vnf__0' + +VNFD = { + 'vnfd:vnfd-catalog': { + 'vnfd': [{ + 'id': 'AgnosticVnf', # NSB python class mapping + 'name': 'AgnosticVnf', + 'short-name': 'AgnosticVnf', + 'description': 'AgnosticVnf', + 'mgmt-interface': { + 'vdu-id': 'vepcvnf-baremetal', + 'user': 'user', + 'password': 'password', + 'ip': 'ip' + }, + 'vdu': [{ + 'id': 'vepcvnf-baremetal', + 'name': 'vepc-vnf-baremetal', + 'description': 'vAgnosticVnf workload', + 'external-interface': []}], + 'benchmark': { + 'kpi': []}}]}} + + +class TestAgnosticVnf(unittest.TestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + self.agnostic_vnf = agnostic_vnf.AgnosticVnf(NAME, self.vnfd, self._id) + + def test_instantiate(self): + self.assertIsNone(self.agnostic_vnf.instantiate({}, {})) + + def test_wait_for_instantiate(self): + self.assertIsNone(self.agnostic_vnf.wait_for_instantiate()) + + def test_terminate(self): + self.assertIsNone(self.agnostic_vnf.terminate()) + + def test_scale(self): + self.assertIsNone(self.agnostic_vnf.scale()) + + def test_collect_kpi(self): + self.assertIsNone(self.agnostic_vnf.collect_kpi()) + + def test_start_collect(self): + self.assertIsNone(self.agnostic_vnf.start_collect()) + + def test_stop_collect(self): + self.assertIsNone(self.agnostic_vnf.stop_collect()) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py new file mode 100644 index 000000000..6d14ddb54 --- /dev/null +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_epc_vnf.py @@ -0,0 +1,94 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import unittest +import uuid + +from yardstick.network_services.vnf_generic.vnf import epc_vnf + +NAME = 'vnf__0' + +VNFD = { + 'vnfd:vnfd-catalog': { + 'vnfd': [{ + 'id': 'EPCVnf', # NSB python class mapping + 'name': 'EPCVnf', + 'short-name': 'EPCVnf', + 'description': 'EPCVnf', + 'mgmt-interface': { + 'vdu-id': 'vepcvnf-baremetal', + 'user': 'user', # Value filled by vnfdgen + 'password': 'password', # Value filled by vnfdgen + 'ip': 'ip' # Value filled by vnfdgen + }, + 'vdu': [{ + 'id': 'vepcvnf-baremetal', + 'name': 'vepc-vnf-baremetal', + 'description': 'vEPCVnf workload', + 'external-interface': []}], + 'benchmark': { + 'kpi': []}}]}} + + +class TestEPCVnf(unittest.TestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + self.epc_vnf = epc_vnf.EPCVnf(NAME, self.vnfd, self._id) + + def test___init__(self, *args): + _epc_vnf = epc_vnf.EPCVnf(NAME, self.vnfd, self._id) + for x in {'user', 'password', 'ip'}: + self.assertEqual(self.vnfd['mgmt-interface'][x], + _epc_vnf.vnfd_helper.mgmt_interface[x]) + self.assertEqual(NAME, _epc_vnf.name) + self.assertEqual([], _epc_vnf.kpi) + self.assertEqual({}, _epc_vnf.config) + self.assertFalse(_epc_vnf.runs_traffic) + + def test___init__missing_ip(self, *args): + _vnfd = copy.deepcopy(self.vnfd) + _vnfd['mgmt-interface'].pop('ip') + _epc_vnf = epc_vnf.EPCVnf(NAME, _vnfd, self._id) + for x in {'user', 'password'}: + self.assertEqual(_vnfd['mgmt-interface'][x], + _epc_vnf.vnfd_helper.mgmt_interface[x]) + self.assertNotIn('ip', _epc_vnf.vnfd_helper.mgmt_interface) + self.assertEqual(NAME, _epc_vnf.name) + self.assertEqual([], _epc_vnf.kpi) + self.assertEqual({}, _epc_vnf.config) + self.assertFalse(_epc_vnf.runs_traffic) + + def test_instantiate(self): + self.assertIsNone(self.epc_vnf.instantiate({}, {})) + + def test_wait_for_instantiate(self): + self.assertIsNone(self.epc_vnf.wait_for_instantiate()) + + def test_terminate(self): + self.assertIsNone(self.epc_vnf.terminate()) + + def test_scale(self): + self.assertIsNone(self.epc_vnf.scale()) + + def test_collect_kpi(self): + self.assertIsNone(self.epc_vnf.collect_kpi()) + + def test_start_collect(self): + self.assertIsNone(self.epc_vnf.start_collect()) + + def test_stop_collect(self): + self.assertIsNone(self.epc_vnf.stop_collect()) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py new file mode 100644 index 000000000..53439972a --- /dev/null +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py @@ -0,0 +1,1911 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import mock +import requests +import time +import unittest +import uuid + +from yardstick.benchmark.contexts import base as ctx_base +from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.traffic_profile import landslide_profile +from yardstick.network_services.vnf_generic.vnf import sample_vnf +from yardstick.network_services.vnf_generic.vnf import tg_landslide + + +NAME = "tg__0" + +EXAMPLE_URL = 'http://example.com/' +TCL_SUCCESS_RESPONSE = 'ls_ok' + +TEST_SERVERS = [ + {'ip': '192.168.122.101', + 'phySubnets': [ + {'mask': '/24', + 'base': '10.42.32.100', + 'numIps': 20, + 'name': 'eth1'} + ], + 'role': 'SGW_Node', + 'name': 'TestServer_1'}, + {'ip': '192.168.122.102', + 'phySubnets': [ + {'mask': '/24', + 'base': '10.42.32.1', + 'numIps': 100, + 'name': 'eth1' + }, + {'mask': '/24', + 'base': '10.42.33.1', + 'numIps': 100, + 'name': 'eth2'} + ], + 'preResolvedArpAddress': [ + {'NumNodes': 1, + 'StartingAddress': '10.42.33.5'} + ], + 'role': 'SGW_Nodal', + 'name': 'TestServer_2', + 'thread_model': 'Fireball' + } +] + +TS1_SUTS = [ + {'name': 'SGW - C TestNode', + 'role': 'SgwControlAddr', + 'managementIp': '12.0.1.1', + 'ip': '10.42.32.100', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + }, + {'name': 'SGW - U TestNode', + 'role': 'SgwUserAddr', + 'managementIp': '12.0.1.2', + 'ip': '10.42.32.101', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + } +] + +TS2_SUTS = [ + {'name': 'eNodeB TestNode', + 'role': 'EnbUserAddr', + 'managementIp': '12.0.2.1', + 'ip': '10.42.32.2', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + }, + {'name': 'MME TestNode', + 'role': 'MmeControlAddr', + 'managementIp': '12.0.3.1', + 'ip': '10.42.32.1', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + }, + {'name': 'NetHost TestNode', + 'role': 'NetworkHostAddrLocal', + 'managementIp': '12.0.4.1', + 'ip': '10.42.33.1', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + }, + {'name': 'PGW TestNode', + 'role': 'PgwV4Sut', + 'managementIp': '12.0.5.1', + 'ip': '10.42.32.105', + 'phy': 'eth5', + 'nextHop': '10.42.32.5' + }, + {'name': 'SGW - C SUT', + 'role': 'SgwSut', + 'managementIp': '12.0.6.1', + 'ip': '10.42.32.100' + }, + {'name': 'SGW - U SUT', + 'role': 'SgwUserSut', + 'managementIp': '12.0.6.2', + 'ip': '10.42.32.101'} +] + +VNFD = { + 'vnfd:vnfd-catalog': { + 'vnfd': [{ + 'short-name': 'landslide', + 'vdu': [{ + 'description': 'AB client interface details', + 'name': 'abclient-baremetal', + 'id': 'abclient-baremetal', + 'external-interface': []}], + 'description': 'Spirent Landslide traffic generator', + 'config': [{'test_server': TEST_SERVERS[0], 'suts': TS1_SUTS}, + {'test_server': TEST_SERVERS[1], 'suts': TS2_SUTS}], + 'mgmt-interface': { + 'vdu-id': 'landslide-tas', + 'user': 'user', + 'password': 'user', + 'super-user': 'super-user', + 'super-user-password': 'super-user-password', + 'cfguser_password': 'cfguser_password', + 'license': 48, + 'proto': 'http', + 'ip': '1.1.1.1'}, + 'benchmark': { + 'kpi': [ + 'tx_throughput_mbps', + 'rx_throughput_mbps', + 'in_packets', + 'out_packets', + 'activation_rate_sessps', + 'deactivation_rate_sessps']}, + 'id': 'LandslideTrafficGen', + 'name': 'LandslideTrafficGen'}]}} + +TAS_INFO = VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface'] + +DMF_CFG = { + "dmf": { + "library": "test", + "name": "Basic UDP" + }, + "clientPort": { + "clientPort": 2002, + "isClientPortRange": "false" + }, + "dataProtocol": "udp", + "serverPort": 2003 +} + +RESERVATIONS = [ + {'tsName': TEST_SERVERS[0]['name'], + 'phySubnets': TEST_SERVERS[0]['phySubnets'], + 'tsId': TEST_SERVERS[0]['name'], + 'tsIndex': 0}, + {'tsName': TEST_SERVERS[1]['name'], + 'phySubnets': TEST_SERVERS[1]['phySubnets'], + 'tsId': TEST_SERVERS[1]['name'], + 'tsIndex': 1}] + +SESSION_PROFILE = { + 'keywords': '', + 'duration': 60, + 'iterations': 1, + 'description': 'UE default bearer creation test case', + 'name': 'default_bearer_capacity', + 'reportOptions': {'format': 'CSV'}, + 'reservePorts': 'false', + 'tsGroups': [ + { + 'testCases': [{ + 'type': 'SGW_Node', + 'name': '', + 'linked': "false", + 'AssociatedPhys': '', + 'parameters': { + 'SgiPtpTunnelEn': 'false', + 'Gtp2Imsi': '505024101215074', + 'Sessions': '100000', + 'S5Protocol': 'GTPv2', + 'TrafficMtu': '1500', + 'Gtp2Version': '13.6.0', + 'BearerV4AddrPool': '1.0.0.1', + 'Gtp2Imei': '50502410121507', + 'PgwNodeEn': 'true', + 'DedicatedsPerDefaultBearer': '0', + 'DefaultBearers': '1', + 'SgwUserAddr': { + 'numLinksOrNodes': 1, + 'phy': 'eth1', + 'forcedEthInterface': '', + 'ip': 'SGW_USER_IP', + 'class': 'TestNode', + 'ethStatsEnabled': "false", + 'mtu': 1500 + }, + 'SgwControlAddr': { + 'numLinksOrNodes': 1, + 'phy': 'eth1', + 'forcedEthInterface': '', + 'ip': 'SGW_CONTROL_IP', + 'class': 'TestNode', + 'ethStatsEnabled': "false", + 'mtu': 1500, + 'nextHop': 'SGW_CONTROL_NEXT_HOP' + }, + 'BearerAddrPool': '2001::1', + 'TestType': 'SGW-NODE' + } + }], + 'tsId': TEST_SERVERS[0]['name']}, + { + 'testCases': [{ + 'type': 'SGW_Nodal', + 'name': '', + 'parameters': { + 'DataTraffic': 'Continuous', + 'TrafficStartType': 'When All Sessions Established', + 'NetworkHost': 'Local', + 'Gtp2Imsi': '505024101215074', + 'Dmf': { + 'mainflows': [ + { + 'name': 'Basic UDP', + 'library': 'test' + } + ], + 'class': 'Dmf', + 'instanceGroups': [ + { + 'startPaused': "false", + 'rate': 0, + 'mainflowIdx': 0, + 'mixType': '' + } + ] + }, + 'S5Protocol': 'GTPv2', + 'DataUserCfgFileEn': 'false', + 'PgwUserSutEn': 'false', + 'MmeControlAddr': { + 'numLinksOrNodes': 1, + 'phy': 'eth1', + 'forcedEthInterface': '', + 'ip': 'MME_CONTROL_IP', + 'class': 'TestNode', + 'ethStatsEnabled': "false", + 'mtu': 1500 + }, + 'SgwUserSut': { + 'class': 'Sut', + 'name': 'SGW_USER_NAME' + }, + 'TestActivity': 'Capacity Test', + 'NetworkHostAddrLocal': { + 'numLinksOrNodes': 1, + 'phy': 'eth2', + 'forcedEthInterface': '', + 'ip': 'NET_HOST_IP', + 'class': 'TestNode', + 'ethStatsEnabled': "false", + 'mtu': 1500 + }, + 'DedicatedsPerDefaultBearer': '0', + 'DisconnectRate': '1000.0', + 'Sessions': '100000', + 'SgwSut': { + 'class': 'Sut', + 'name': 'SGW_CONTROL_NAME' + }, + 'TrafficMtu': '1500', + 'Gtp2Version': '13.6.0', + 'Gtp2Imei': '50502410121507', + 'PgwNodeEn': 'false', + 'StartRate': '1000.0', + 'PgwV4Sut': { + 'class': 'Sut', + 'name': 'PGW_SUT_NAME' + }, + 'DefaultBearers': '1', + 'EnbUserAddr': { + 'numLinksOrNodes': 1, + 'phy': 'eth1', + 'forcedEthInterface': '', + 'ip': 'ENB_USER_IP', + 'class': 'TestNode', + 'ethStatsEnabled': "false", + 'mtu': 1500 + }, + 'TestType': 'SGW-NODAL' + } + }], + 'tsId': TEST_SERVERS[1]['name'] + } + ] +} + + +class TestLandslideTrafficGen(unittest.TestCase): + SCENARIO_CFG = { + 'session_profile': '/traffic_profiles/landslide/' + 'landslide_session_default_bearer.yaml', + 'task_path': '', + 'runner': { + 'type': 'Iteration', + 'iterations': 1 + }, + 'nodes': { + 'tg__0': 'tg__0.traffic_gen', + 'vnf__0': 'vnf__0.vnf_epc' + }, + 'topology': 'landslide_tg_topology.yaml', + 'type': 'NSPerf', + 'traffic_profile': '../../traffic_profiles/landslide/' + 'landslide_dmf_udp.yaml', + 'options': { + 'test_cases': [ + { + 'BearerAddrPool': '2002::2', + 'type': 'SGW_Node', + 'BearerV4AddrPool': '2.0.0.2', + 'Sessions': '90000' + }, + { + 'StartRate': '900.0', + 'type': 'SGW_Nodal', + 'DisconnectRate': '900.0', + 'Sessions': '90000' + } + ], + 'dmf': + { + 'transactionRate': 1000, + 'packetSize': 512 + } + } + } + + CONTEXT_CFG = { + 'contexts': [ + { + 'type': 'Node', + 'name': 'traffic_gen', + 'file': '/etc/yardstick/nodes/pod_landslide.yaml' + }, + { + 'type': 'Node', + 'name': 'vnf_epc', + 'file': '/etc/yardstick/nodes/pod_vepc_sut.yaml' + } + ] + } + + TRAFFIC_PROFILE = { + "schema": "nsb:traffic_profile:0.1", + "name": "LandslideProfile", + "description": "Spirent Landslide traffic profile", + "traffic_profile": { + "traffic_type": "LandslideProfile" + }, + "dmf_config": { + "dmf": { + "library": "test", + "name": "Basic UDP" + }, + "description": "Basic data flow using UDP/IP", + "keywords": "UDP", + "dataProtocol": "udp" + } + } + + SUCCESS_CREATED_CODE = 201 + SUCCESS_OK_CODE = 200 + SUCCESS_RECORD_ID = 5 + TEST_USER_ID = 11 + + def setUp(self): + self._id = uuid.uuid1().int + + self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi') + self.mock_lsapi.start() + + self.mock_ssh_helper = mock.patch.object(sample_vnf, 'VnfSshHelper') + self.mock_ssh_helper.start() + self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + self.ls_tg = tg_landslide.LandslideTrafficGen( + NAME, self.vnfd, self._id) + self.session_profile = copy.deepcopy(SESSION_PROFILE) + self.ls_tg.session_profile = self.session_profile + + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.mock_lsapi.stop() + self.mock_ssh_helper.stop() + + @mock.patch.object(net_serv_utils, 'get_nsb_option') + def test___init__(self, mock_get_nsb_option, *args): + _path_to_nsb = 'path/to/nsb' + mock_get_nsb_option.return_value = _path_to_nsb + ls_tg = tg_landslide.LandslideTrafficGen(NAME, self.vnfd, self._id) + self.assertIsInstance(ls_tg.resource_helper, + tg_landslide.LandslideResourceHelper) + mock_get_nsb_option.assert_called_once_with('bin_path') + self.assertEqual(_path_to_nsb, ls_tg.bin_path) + self.assertEqual(NAME, ls_tg.name) + self.assertTrue(ls_tg.runs_traffic) + self.assertFalse(ls_tg.traffic_finished) + self.assertIsNone(ls_tg.session_profile) + + def test_listen_traffic(self): + _traffic_profile = {} + self.assertIsNone(self.ls_tg.listen_traffic(_traffic_profile)) + + def test_terminate(self, *args): + self.ls_tg.resource_helper._tcl = mock.Mock() + self.assertIsNone(self.ls_tg.terminate()) + self.ls_tg.resource_helper._tcl.disconnect.assert_called_once() + + @mock.patch.object(ctx_base.Context, 'get_context_from_server', + return_value='fake_context') + def test_instantiate(self, *args): + self.ls_tg._tg_process = mock.Mock() + self.ls_tg._tg_process.start = mock.Mock() + self.ls_tg.resource_helper.connect = mock.Mock() + self.ls_tg.resource_helper.create_test_servers = mock.Mock() + self.ls_tg.resource_helper.create_suts = mock.Mock() + self.ls_tg._load_session_profile = mock.Mock() + self.assertIsNone(self.ls_tg.instantiate(self.SCENARIO_CFG, + self.CONTEXT_CFG)) + self.ls_tg.resource_helper.connect.assert_called_once() + self.ls_tg.resource_helper.create_test_servers.assert_called_once() + _suts_blocks_num = len([item['suts'] for item in self.vnfd['config']]) + self.assertEqual(_suts_blocks_num, + self.ls_tg.resource_helper.create_suts.call_count) + self.ls_tg._load_session_profile.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_run_traffic(self, mock_get_tests, *args): + self.ls_tg.resource_helper._url = EXAMPLE_URL + self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_traffic_profile = mock.Mock( + spec=landslide_profile.LandslideProfile) + mock_traffic_profile.dmf_config = {'keywords': 'UDP', + 'dataProtocol': 'udp'} + mock_traffic_profile.params = self.TRAFFIC_PROFILE + self.ls_tg.resource_helper._user_id = self.TEST_USER_ID + mock_get_tests.return_value = [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'COMPLETE'}] + mock_post = mock.Mock() + mock_post.status_code = self.SUCCESS_CREATED_CODE + mock_post.json.return_value = {'id': self.SUCCESS_RECORD_ID} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value = mock_post + self.ls_tg.resource_helper.session = mock_session + self.ls_tg.resource_helper._tcl = mock.Mock() + _tcl = self.ls_tg.resource_helper._tcl + self.assertIsNone(self.ls_tg.run_traffic(mock_traffic_profile)) + self.assertEqual(self.SUCCESS_RECORD_ID, + self.ls_tg.resource_helper.run_id) + mock_traffic_profile.update_dmf.assert_called_with( + self.ls_tg.scenario_helper.all_options) + _tcl.create_dmf.assert_called_with(mock_traffic_profile.dmf_config) + _tcl.create_test_session.assert_called_with(self.session_profile) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + def test_collect_kpi(self, mock_check_running_test_state, *args): + self.ls_tg.resource_helper.run_id = self.SUCCESS_RECORD_ID + mock_check_running_test_state.return_value = 'COMPLETE' + self.assertEqual({'done': True}, self.ls_tg.collect_kpi()) + mock_check_running_test_state.assert_called_once() + + def test_wait_for_instantiate(self): + self.assertIsNone(self.ls_tg.wait_for_instantiate()) + self.ls_tg.wait_for_instantiate() + + def test__update_session_suts_no_tc_role(self, *args): + _suts = [{'role': 'epc_role'}] + _testcase = {'parameters': {'diff_epc_role': {'class': 'Sut'}}} + res = self.ls_tg._update_session_suts(_suts, _testcase) + self.assertEqual(_testcase, res) + + def test__update_session_suts(self, *args): + + def get_testnode_param(role, key, session_prof): + """ Get value by key from the deep nested dict to avoid calls like: + e.g. session_prof['tsGroups'][0]['testCases'][1]['parameters'][key] + """ + for group in session_prof['tsGroups']: + for tc in group['testCases']: + tc_params = tc['parameters'] + if tc_params.get(role): + return tc_params[role][key] + + def get_sut_param(role, key, suts): + """ Search list of dicts for one with specific role. + Return the value of related dict by key. Expect key presence. + """ + for sut in suts: + if sut.get('role') == role: + return sut[key] + + # TestNode to verify + testnode_role = 'SgwControlAddr' + # SUT to verify + sut_role = 'SgwUserSut' + + config_suts = [config['suts'] for config in self.vnfd['config']] + session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups'] + for _tc in _ts_group['testCases']] + for suts, tc in zip(config_suts, session_tcs): + self.assertEqual(tc, self.ls_tg._update_session_suts(suts, tc)) + + # Verify TestNode class objects keys were updated + for _key in {'ip', 'phy', 'nextHop'}: + self.assertEqual( + get_testnode_param(testnode_role, _key, self.ls_tg.session_profile), + get_sut_param(testnode_role, _key, TS1_SUTS)) + # Verify Sut class objects name was updated + self.assertEqual( + get_testnode_param(sut_role, 'name', self.ls_tg.session_profile), + get_sut_param(sut_role, 'name', TS2_SUTS)) + + def test__update_session_test_servers(self, *args): + for ts_index, ts in enumerate(TEST_SERVERS): + self.assertIsNone( + self.ls_tg._update_session_test_servers(ts, ts_index)) + # Verify preResolvedArpAddress key was added + self.assertTrue(any( + _item.get('preResolvedArpAddress') + for _item in self.ls_tg.session_profile['tsGroups'])) + # Verify reservations key was added to session profile + self.assertEqual(RESERVATIONS, + self.ls_tg.session_profile.get('reservations')) + self.assertEqual('true', + self.ls_tg.session_profile.get('reservePorts')) + + def test__update_session_tc_params_assoc_phys(self): + _tc_options = {'AssociatedPhys': 'eth1'} + _testcase = {} + _testcase_orig = copy.deepcopy(_testcase) + res = self.ls_tg._update_session_tc_params(_tc_options, _testcase) + self.assertNotEqual(_testcase_orig, res) + self.assertEqual(_tc_options, _testcase) + + def test__update_session_tc_params(self, *args): + + def get_session_tc_param_value(param, tc_type, session_prof): + """ Get param value from the deep nested dict to avoid calls like: + session_prof['tsGroups'][0]['testCases'][0]['parameters'][key] + """ + for test_group in session_prof['tsGroups']: + session_tc = test_group['testCases'][0] + if session_tc['type'] == tc_type: + return session_tc['parameters'].get(param) + + session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups'] + for _tc in _ts_group['testCases']] + scenario_tcs = [_tc for _tc in + self.SCENARIO_CFG['options']['test_cases']] + for tc_options, tc in zip(scenario_tcs, session_tcs): + self.assertEqual( + tc, + self.ls_tg._update_session_tc_params(tc_options, tc)) + + # Verify that each test case parameter was updated + # Params been compared are deeply nested. Using loops to ease access. + for _tc in self.SCENARIO_CFG['options']['test_cases']: + for _key, _val in _tc.items(): + if _key != 'type': + self.assertEqual( + _val, + get_session_tc_param_value(_key, _tc.get('type'), + self.ls_tg.session_profile)) + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_test_servers') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_suts') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_tc_params') + def test__load_session_profile(self, mock_upd_ses_tc_params, + mock_upd_ses_suts, mock_upd_ses_ts, + mock_yaml_load, *args): + self.ls_tg.scenario_helper.scenario_cfg = \ + copy.deepcopy(self.SCENARIO_CFG) + mock_yaml_load.return_value = copy.deepcopy(SESSION_PROFILE) + self.assertIsNone(self.ls_tg._load_session_profile()) + self.assertIsNotNone(self.ls_tg.session_profile) + # Number of blocks in configuration files + # Number of test servers, suts and tc params blocks should be equal + _config_files_blocks_num = len([item['test_server'] + for item in self.vnfd['config']]) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_ts.call_count) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_suts.call_count) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_tc_params.call_count) + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + def test__load_session_profile_unequal_num_of_cfg_blocks( + self, mock_yaml_load, *args): + vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0]) + ls_traffic_gen = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id) + ls_traffic_gen.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_yaml_load.return_value = copy.deepcopy(SESSION_PROFILE) + # Delete test_servers item from pod file to make it not valid + ls_traffic_gen.vnfd_helper['config'].pop() + with self.assertRaises(RuntimeError): + ls_traffic_gen._load_session_profile() + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + def test__load_session_profile_test_type_mismatch(self, mock_yaml_load, + *args): + vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0]) + # Swap test servers data in pod file + vnfd['config'] = list(reversed(vnfd['config'])) + ls_tg = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id) + ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_yaml_load.return_value = SESSION_PROFILE + with self.assertRaises(RuntimeError): + ls_tg._load_session_profile() + + +class TestLandslideResourceHelper(unittest.TestCase): + + PROTO_PORT = 8080 + EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':', + str(PROTO_PORT), '/api/']) + SUCCESS_CREATED_CODE = 201 + SUCCESS_OK_CODE = 200 + INVALID_REST_CODE = '400' + NOT_MODIFIED_CODE = 500810 + ERROR_CODE = 500800 + SUCCESS_RECORD_ID = 11 + EXPIRE_DATE = '2020/01/01 12:00 FLE Standard Time' + TEST_USER = 'test' + TEST_TERMINATED = 1 + AUTH_DATA = {'user': TAS_INFO['user'], 'password': TAS_INFO['password']} + TEST_SESSION_NAME = 'default_bearer_capacity' + + USERS_DATA = { + "users": [{ + "url": ''.join([EXAMPLE_URL, 'users/', str(SUCCESS_RECORD_ID)]), + "id": SUCCESS_RECORD_ID, + "level": 1, + "username": TEST_USER + }] + } + + CREATE_USER_DATA = {'username': TAS_INFO['user'], + 'expiresOn': EXPIRE_DATE, + 'level': 1, + 'contactInformation': '', + 'fullName': 'Test User', + 'password': TAS_INFO['password'], + 'isActive': 'true'} + + SUTS_DATA = { + "suts": [ + { + "url": ''.join([EXAMPLE_URL, 'suts/', str(SUCCESS_RECORD_ID)]), + "id": SUCCESS_RECORD_ID, + "name": "10.41.32.1" + }]} + + TEST_SERVERS_DATA = { + "testServers": [ + { + "url": ''.join([EXAMPLE_URL, "testServers/1"]), + "id": 1, + "name": TEST_SERVERS[0]['name'], + "state": "READY", + "version": "16.4.0.10" + }, + { + "url": ''.join([EXAMPLE_URL, "testServers/2"]), + "id": 2, + "name": TEST_SERVERS[1]['name'], + "state": "READY", + "version": "16.4.0.10" + } + + ] + } + + RUN_ID = 3 + + RUNNING_TESTS_DATA = { + "runningTests": [{ + "url": ''.join([EXAMPLE_URL, "runningTests/{}".format(RUN_ID)]), + "measurementsUrl": ''.join( + [EXAMPLE_URL, + "runningTests/{}/measurements".format(RUN_ID)]), + "criteriaUrl": ''.join( + [EXAMPLE_URL, + "runningTests/{}/criteria".format(RUN_ID)]), + "noteToUser": "", + "id": RUN_ID, + "library": SUCCESS_RECORD_ID, + "name": "default_bearer_capacity", + "user": TEST_USER, + "criteriaStatus": "NA", + "testStateOrStep": "COMPLETE" + }]} + + TEST_RESULTS_DATA = { + "interval": 0, + "elapsedTime": 138, + "actualTime": 1521548057296, + "iteration": 1, + "tabs": { + "Test Summary": { + "Start Time": "Tue Mar 20 07:11:55 CDT 2018", + "Actual Dedicated Bearer Session Connects": "100", + "Actual Dedicated Bearer Session Disconnects": "100", + "Actual Disconnect Rate(Sessions / Second)(P - I)": "164.804", + "Average Session Disconnect Time(P - I)": "5.024 s", + "Total Data Sent + Received Packets / Sec(P - I)": "1,452.294" + }}} + + def setUp(self): + self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi') + self.mock_lsapi.start() + + mock_env_helper = mock.Mock() + self.res_helper = tg_landslide.LandslideResourceHelper(mock_env_helper) + self.res_helper._url = EXAMPLE_URL + + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.mock_lsapi.stop() + self.res_helper._url = None + + def test___init__(self, *args): + self.assertIsInstance(self.res_helper, + tg_landslide.LandslideResourceHelper) + self.assertEqual({}, self.res_helper._result) + self.assertIsNone(self.res_helper.run_id) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests_no_running_tests(self, mock_get_tests, + mock_stop_tests, *args): + tests_data = [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'COMPLETE'}] + mock_get_tests.return_value = tests_data + self.assertIsNone(self.res_helper.abort_running_tests()) + mock_stop_tests.assert_not_called() + + @mock.patch.object(time, 'sleep') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests(self, mock_get_tests, mock_stop_tests, *args): + test_states_seq = iter(['RUNNING', 'COMPLETE']) + + def configure_mock(*args): + return [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': next(test_states_seq)}] + + mock_get_tests.side_effect = configure_mock + self.assertIsNone(self.res_helper.abort_running_tests()) + mock_stop_tests.assert_called_once_with( + running_test_id=self.SUCCESS_RECORD_ID, + force=True) + self.assertEqual(2, mock_get_tests.call_count) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests_error(self, mock_get_tests, mock_stop_tests, + *args): + tests_data = {'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'RUNNING'} + mock_get_tests.return_value = [tests_data] + with self.assertRaises(RuntimeError): + self.res_helper.abort_running_tests(timeout=1, delay=1) + mock_stop_tests.assert_called_with( + running_test_id=self.SUCCESS_RECORD_ID, + force=True) + + def test__build_url(self, *args): + resource = 'users' + action = {'action': 'userCreate'} + expected_url = ''.join([EXAMPLE_URL, 'users?action=userCreate']) + self.assertEqual(expected_url, + self.res_helper._build_url(resource, action)) + + def test__build_url_error(self, *args): + resource = '' + action = {'action': 'userCreate'} + + with self.assertRaises(ValueError): + self.res_helper._build_url(resource, action) + + def test_get_response_params(self, *args): + method = 'get' + resource = 'users' + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.USERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + resp = self.res_helper.get_response_params(method, resource) + self.assertTrue(resp) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, '_get_users') + @mock.patch.object(time, 'time') + def test__create_user(self, mock_time, mock_get_users, *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertEqual(self.SUCCESS_RECORD_ID, + self.res_helper._create_user(self.AUTH_DATA)) + mock_get_users.assert_not_called() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, '_modify_user') + @mock.patch.object(time, 'time') + def test__create_user_username_exists(self, mock_time, mock_modify_user, + *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + mock_modify_user.return_value = {'id': self.SUCCESS_RECORD_ID, + 'result': 'No changes requested'} + post_resp_data = { + 'status_code': self.ERROR_CODE, + 'json.return_value': {'id': self.SUCCESS_OK_CODE, + 'apiCode': self.NOT_MODIFIED_CODE}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + res = self.res_helper._create_user(self.AUTH_DATA) + mock_modify_user.assert_called_once_with(TAS_INFO['user'], + {'isActive': 'true'}) + self.assertEqual(self.SUCCESS_RECORD_ID, res) + + @mock.patch.object(time, 'time') + def test__create_user_error(self, mock_time, *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'apiCode': self.ERROR_CODE}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper._create_user(self.AUTH_DATA) + + def test__modify_user(self, *args): + post_data = {'username': 'test_user'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + res = self.res_helper._modify_user(username=self.TEST_USER, + fields=post_data) + self.assertEqual(self.SUCCESS_RECORD_ID, res['id']) + + def test__modify_user_rest_resp_fail(self, *args): + post_data = {'non-existing-key': ''} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.ok = False + self.res_helper.session = mock_session + self.assertRaises(exceptions.RestApiError, + self.res_helper._modify_user, + username=self.TEST_USER, fields=post_data) + mock_session.post.assert_called_once() + + def test__delete_user(self, *args): + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper._delete_user( + username=self.TEST_USER)) + + def test__get_users(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.USERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertEqual(self.USERS_DATA['users'], + self.res_helper._get_users()) + + def test_exec_rest_request(self, *args): + resource = 'testServers' + action = {'action': 'modify'} + expected_url = ''.join([EXAMPLE_URL, 'testServers?action=modify']) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.res_helper.exec_rest_request('post', resource, action) + self.res_helper.session.post.assert_called_once_with(expected_url, + json={}) + + def test_exec_rest_request_unsupported_method_error(self, *args): + resource = 'testServers' + action = {'action': 'modify'} + with self.assertRaises(ValueError): + self.res_helper.exec_rest_request('patch', resource, action) + + def test_exec_rest_request_missed_action_arg(self, *args): + resource = 'testServers' + with self.assertRaises(ValueError): + self.res_helper.exec_rest_request('post', resource) + + def test_exec_rest_request_raise_exc(self): + resource = 'users' + action = {'action': 'modify'} + post_resp_data = {'status_code': self.ERROR_CODE, + 'json.return_value': { + 'status_code': self.ERROR_CODE}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.assertRaises(exceptions.RestApiError, + self.res_helper.exec_rest_request, + 'post', resource, action, raise_exc=True) + + @mock.patch.object(time, 'time') + def test_connect(self, mock_time, *args): + vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + mock_time.strftime.return_value = self.EXPIRE_DATE + self.res_helper.vnfd_helper = vnfd + + self.res_helper._tcl = mock.Mock() + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session, headers={}) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.connect(), requests.Session) + self.res_helper._tcl.connect.assert_called_once_with( + TAS_INFO['ip'], + TAS_INFO['user'], + TAS_INFO['password']) + + def test_disconnect(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.disconnect()) + self.assertIsNone(self.res_helper.session) + self.res_helper._tcl.disconnect.assert_called_once() + + def test_terminate(self, *args): + self.assertIsNone(self.res_helper.terminate()) + self.assertEqual(self.TEST_TERMINATED, + self.res_helper._terminated.value) + + def test_create_dmf(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.create_dmf(DMF_CFG)) + self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG) + + def test_create_dmf_as_list(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.create_dmf([DMF_CFG])) + self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG) + + def test_delete_dmf(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.delete_dmf(DMF_CFG)) + self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG) + + def test_delete_dmf_as_list(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.delete_dmf([DMF_CFG])) + self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut') + def test_create_suts(self, mock_configure_sut, *args): + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.create_suts(TS1_SUTS)) + mock_configure_sut.assert_not_called() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut') + def test_create_suts_sut_exists(self, mock_configure_sut, *args): + sut_name = 'test_sut' + suts = [ + {'name': sut_name, + 'role': 'SgwControlAddr', + 'managementIp': '12.0.1.1', + 'ip': '10.42.32.100' + } + ] + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.create_suts(suts)) + mock_configure_sut.assert_called_once_with( + sut_name=sut_name, + json_data={k: v for k, v in suts[0].items() + if k not in {'phy', 'nextHop', 'role', 'name'}}) + + def test_get_suts(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.get_suts(), list) + + def test_get_suts_single_id(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA['suts'][0]} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.get_suts(suts_id=2), dict) + + def test_configure_sut(self, *args): + post_data = {'managementIp': '2.2.2.2'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.configure_sut('test_name', + post_data)) + mock_session.post.assert_called_once() + + def test_configure_sut_error(self, *args): + post_data = {'managementIp': '2.2.2.2'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper.configure_sut('test_name', post_data) + + def test_delete_suts(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA} + delete_resp_data = {'status_code': self.SUCCESS_OK_CODE} + mock_session.get.return_value.configure_mock(**get_resp_data) + mock_session.delete.return_value.configure_mock(**delete_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_suts()) + mock_session.delete.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_test_servers') + def test__check_test_servers_state(self, mock_get_test_servers, *args): + mock_get_test_servers.return_value = \ + self.TEST_SERVERS_DATA['testServers'] + self.res_helper._check_test_servers_state() + mock_get_test_servers.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_test_servers') + def test__check_test_servers_state_server_not_ready( + self, mock_get_test_servers, *args): + test_servers_not_ready = [ + { + "url": ''.join([EXAMPLE_URL, "testServers/1"]), + "id": 1, + "name": "TestServer_1", + "state": "NOT_READY", + "version": "16.4.0.10" + } + ] + + mock_get_test_servers.return_value = test_servers_not_ready + with self.assertRaises(RuntimeError): + self.res_helper._check_test_servers_state(timeout=1, delay=0) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + '_check_test_servers_state') + def test_create_test_servers(self, mock_check_ts_state, *args): + test_servers_ids = [ + ts['id'] for ts in self.TEST_SERVERS_DATA['testServers']] + + self.res_helper.license_data['lic_id'] = TAS_INFO['license'] + self.res_helper._tcl.create_test_server = mock.Mock() + self.res_helper._tcl.create_test_server.side_effect = test_servers_ids + self.assertIsNone(self.res_helper.create_test_servers(TEST_SERVERS)) + mock_check_ts_state.assert_called_once_with(test_servers_ids) + + @mock.patch.object(tg_landslide.LandslideTclClient, + 'resolve_test_server_name') + @mock.patch.object(tg_landslide.LsTclHandler, 'execute') + def test_create_test_servers_error(self, mock_execute, + mock_resolve_ts_name, *args): + self.res_helper.license_data['lic_id'] = TAS_INFO['license'] + # Return message for case test server wasn't created + mock_execute.return_value = 'TS not found' + # Return message for case test server name wasn't resolved + mock_resolve_ts_name.return_value = 'TS not found' + with self.assertRaises(RuntimeError): + self.res_helper.create_test_servers(TEST_SERVERS) + + def test_get_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_servers() + self.assertEqual(self.TEST_SERVERS_DATA['testServers'], res) + + def test_get_test_servers_by_id(self, *args): + mock_session = mock.Mock(spec=requests.Session) + + _ts = self.TEST_SERVERS_DATA['testServers'][0] + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': _ts} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_servers(test_server_ids=[_ts['id']]) + self.assertEqual([_ts], res) + + def test_configure_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.configure_test_servers( + action={'action': 'recycle'}) + self.assertEqual( + [x['id'] for x in self.TEST_SERVERS_DATA['testServers']], + res) + self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), + mock_session.post.call_count) + + def test_delete_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_test_servers()) + self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), + mock_session.delete.call_count) + + def test_create_test_session_res_helper(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper._tcl = mock.Mock() + test_session = {'name': 'test'} + self.assertIsNone(self.res_helper.create_test_session(test_session)) + self.res_helper._tcl.create_test_session.assert_called_once_with( + {'name': 'test', 'library': self.SUCCESS_RECORD_ID}) + + @mock.patch.object(tg_landslide.LandslideTclClient, + 'resolve_test_server_name', + return_value='Not Found') + def test_create_test_session_ts_name_not_found(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + test_session = { + 'duration': 60, + 'description': 'UE default bearer creation test case', + 'name': 'default_bearer_capacity', + 'tsGroups': [{'testCases': [{'type': 'SGW_Node', + 'name': ''}], + 'tsId': 'TestServer_3'}] + } + with self.assertRaises(RuntimeError): + self.res_helper.create_test_session(test_session) + + def test_get_test_session(self, *args): + test_session = {"name": self.TEST_SESSION_NAME} + self.res_helper._user_id = self.SUCCESS_RECORD_ID + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': test_session} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_session(self.TEST_SESSION_NAME) + self.assertEqual(test_session, res) + + def test_configure_test_session(self, *args): + test_session = {'name': self.TEST_SESSION_NAME} + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format( + self.res_helper.test_session_uri) + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper.configure_test_session(self.TEST_SESSION_NAME, + test_session) + self.assertIsNotNone(res) + mock_session.post.assert_called_once() + + def test_delete_test_session(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format( + self.res_helper.test_session_uri) + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper.delete_test_session(self.TEST_SESSION_NAME) + self.assertIsNotNone(res) + mock_session.delete.assert_called_once() + + def test_create_running_tests(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + test_session = {'id': self.SUCCESS_RECORD_ID} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': test_session} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.res_helper.create_running_tests(self.TEST_SESSION_NAME) + self.assertEqual(self.SUCCESS_RECORD_ID, self.res_helper.run_id) + + def test_create_running_tests_error(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper.create_running_tests(self.TEST_SESSION_NAME) + + def test_get_running_tests(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_running_tests() + self.assertEqual(self.RUNNING_TESTS_DATA['runningTests'], res) + + def test_delete_running_tests(self, *args): + mock_session = mock.Mock(spec=requests.Session) + delete_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA} + mock_session.delete.return_value.configure_mock(**delete_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_running_tests()) + + def test__running_tests_action(self, *args): + action = 'abort' + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper._running_tests_action(self.SUCCESS_RECORD_ID, + action) + self.assertIsNone(res) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + '_running_tests_action') + def test_stop_running_tests(self, mock_tests_action, *args): + res = self.res_helper.stop_running_tests(self.SUCCESS_RECORD_ID) + self.assertIsNone(res) + mock_tests_action.assert_called_once() + + def test_check_running_test_state(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = { + 'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA["runningTests"][0]} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.check_running_test_state(self.SUCCESS_RECORD_ID) + self.assertEqual( + self.RUNNING_TESTS_DATA["runningTests"][0]['testStateOrStep'], + res) + + def test_get_running_tests_results(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_RESULTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_running_tests_results( + self.SUCCESS_RECORD_ID) + self.assertEqual(self.TEST_RESULTS_DATA, res) + + def test__write_results(self, *args): + res = self.res_helper._write_results(self.TEST_RESULTS_DATA) + exp_res = { + "Test Summary::Actual Dedicated Bearer Session Connects": 100.0, + "Test Summary::Actual Dedicated Bearer Session Disconnects": 100.0, + "Test Summary::Actual Disconnect Rate(Sessions / Second)(P - I)": 164.804, + "Test Summary::Average Session Disconnect Time(P - I)": 5.024, + "Test Summary::Total Data Sent + Received Packets / Sec(P - I)": 1452.294 + } + self.assertEqual(exp_res, res) + + def test__write_results_no_tabs(self, *args): + _res_data = copy.deepcopy(self.TEST_RESULTS_DATA) + del _res_data['tabs'] + # Return None if tabs not found in test results dict + self.assertIsNone(self.res_helper._write_results(_res_data)) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests_results') + def test_collect_kpi_test_running(self, mock_tests_results, + mock_tests_state, *args): + self.res_helper.run_id = self.SUCCESS_RECORD_ID + mock_tests_state.return_value = 'RUNNING' + mock_tests_results.return_value = self.TEST_RESULTS_DATA + res = self.res_helper.collect_kpi() + self.assertNotIn('done', res) + mock_tests_state.assert_called_once_with(self.res_helper.run_id) + mock_tests_results.assert_called_once_with(self.res_helper.run_id) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests_results') + def test_collect_kpi_test_completed(self, mock_tests_results, + mock_tests_state, *args): + self.res_helper.run_id = self.SUCCESS_RECORD_ID + mock_tests_state.return_value = 'COMPLETE' + res = self.res_helper.collect_kpi() + self.assertIsNotNone(res) + mock_tests_state.assert_called_once_with(self.res_helper.run_id) + mock_tests_results.assert_not_called() + self.assertDictContainsSubset({'done': True}, res) + + +class TestLandslideTclClient(unittest.TestCase): + def setUp(self): + self.mock_tcl_handler = mock.Mock(spec=tg_landslide.LsTclHandler) + self.ls_res_helper = mock.Mock( + spec=tg_landslide.LandslideResourceHelper) + self.ls_tcl_client = tg_landslide.LandslideTclClient( + self.mock_tcl_handler, + self.ls_res_helper) + + def test___init__(self, *args): + self.ls_tcl_client = tg_landslide.LandslideTclClient( + self.mock_tcl_handler, + self.ls_res_helper) + self.assertIsNone(self.ls_tcl_client.tcl_server_ip) + self.assertIsNone(self.ls_tcl_client._user) + self.assertIsNone(self.ls_tcl_client._library_id) + self.assertIsNone(self.ls_tcl_client._basic_library_id) + self.assertEqual(set(), self.ls_tcl_client.ts_ids) + self.assertIsInstance(self.ls_tcl_client._tc_types, set) + self.assertIsNotNone(self.ls_tcl_client._tc_types) + + def test_connect_login_success(self, *args): + lib_id = '123' + exec_responses = ['java0x2', lib_id, lib_id] + auth = ('user', 'password') + self.mock_tcl_handler.execute.side_effect = exec_responses + self.ls_tcl_client.connect(TAS_INFO['ip'], *auth) + self.assertEqual(lib_id, self.ls_tcl_client._library_id) + self.assertEqual(lib_id, self.ls_tcl_client._basic_library_id) + self.assertEqual(TAS_INFO['ip'], self.ls_tcl_client.tcl_server_ip) + self.assertEqual(auth[0], self.ls_tcl_client._user) + self.assertEqual(len(exec_responses), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call("ls::login 1.1.1.1 user password"), + mock.call("ls::get [ls::query LibraryInfo -userLibraryName user] -Id"), + ]) + + def test_connect_login_failed(self, *args): + exec_responses = ['Login failed'] + auth = ('user', 'password') + self.mock_tcl_handler.execute.side_effect = exec_responses + self.assertRaises(exceptions.LandslideTclException, + self.ls_tcl_client.connect, + TAS_INFO['ip'], + *auth) + self.assertIsNone(self.ls_tcl_client._library_id) + self.assertIsNone(self.ls_tcl_client._basic_library_id) + self.assertIsNone(self.ls_tcl_client.tcl_server_ip) + self.assertIsNone(self.ls_tcl_client._user) + self.assertEqual(len(exec_responses), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_with( + "ls::login 1.1.1.1 user password") + + def test_disconnect(self, *args): + self.ls_tcl_client.disconnect() + self.mock_tcl_handler.execute.assert_called_once_with("ls::logout") + self.assertIsNone(self.ls_tcl_client.tcl_server_ip) + self.assertIsNone(self.ls_tcl_client._user) + self.assertIsNone(self.ls_tcl_client._library_id) + self.assertIsNone(self.ls_tcl_client._basic_library_id) + + def test_create_test_server(self, *args): + return_value = '2' + self.ls_tcl_client._ts_context.vnfd_helper = \ + VNFD['vnfd:vnfd-catalog']['vnfd'][0] + self.ls_tcl_client._ts_context.license_data = {'lic_id': return_value} + self.mock_tcl_handler.execute.return_value = return_value + self.ls_tcl_client._set_thread_model = mock.Mock() + res = self.ls_tcl_client.create_test_server(TEST_SERVERS[1]) + self.assertEqual(3, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::query TsId TestServer_2'), + mock.call('set ts [ls::retrieve TsInfo -Name "TestServer_2"]'), + mock.call('ls::get $ts -RequestedLicense'), + ]) + self.ls_tcl_client._set_thread_model.assert_called_once_with( + TEST_SERVERS[1]['name'], + TEST_SERVERS[1]['thread_model']) + self.assertEqual(int(return_value), res) + + def test_create_test_server_fail_limit_reach(self, *args): + self.mock_tcl_handler.execute.side_effect = ['TS not found', + 'Add failed'] + self.assertRaises(RuntimeError, + self.ls_tcl_client.create_test_server, + TEST_SERVERS[0]) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::query TsId TestServer_1'), + mock.call('ls::perform AddTs -Name "TestServer_1" ' + '-Ip "192.168.122.101"'), + ]) + + def test__add_test_server(self): + ts_id = '2' + self.mock_tcl_handler.execute.side_effect = ['TS not found', ts_id] + self.assertEqual(ts_id, + self.ls_tcl_client._add_test_server('name', 'ip')) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::query TsId name'), + mock.call('ls::perform AddTs -Name "name" -Ip "ip"'), + ]) + + def test__add_test_server_failed(self): + self.mock_tcl_handler.execute.side_effect = ['TS not found', + 'Add failed'] + self.assertRaises(RuntimeError, self.ls_tcl_client._add_test_server, + 'name', 'ip') + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::query TsId name'), + mock.call('ls::perform AddTs -Name "name" -Ip "ip"'), + ]) + + def test__update_license(self): + curr_lic_id = '111' + new_lic_id = '222' + exec_resp = ['java0x4', + curr_lic_id, + TCL_SUCCESS_RESPONSE, + TCL_SUCCESS_RESPONSE] + self.ls_tcl_client._ts_context.license_data = {'lic_id': new_lic_id} + self.mock_tcl_handler.execute.side_effect = exec_resp + self.ls_tcl_client._update_license('name') + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set ts [ls::retrieve TsInfo -Name "name"]'), + mock.call('ls::get $ts -RequestedLicense'), + mock.call('ls::config $ts -RequestedLicense 222'), + mock.call('ls::perform ModifyTs $ts'), + ]) + + def test__update_license_same_as_current(self): + curr_lic_id = '111' + new_lic_id = '111' + exec_resp = ['java0x4', curr_lic_id] + self.ls_tcl_client._ts_context.license_data = {'lic_id': new_lic_id} + self.mock_tcl_handler.execute.side_effect = exec_resp + self.ls_tcl_client._update_license('name') + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set ts [ls::retrieve TsInfo -Name "name"]'), + mock.call('ls::get $ts -RequestedLicense'), + ]) + + def test__set_thread_model_update_needed(self): + self.ls_tcl_client._ts_context.vnfd_helper = { + 'mgmt-interface': { + 'cfguser_password': 'cfguser_password' + } + } + exec_resp = ['java0x4', 'V0', '', ''] + self.mock_tcl_handler.execute.side_effect = exec_resp + self.ls_tcl_client._set_thread_model('name', 'Fireball') + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set tsc [ls::perform RetrieveTsConfiguration ' + '-name "name" cfguser_password]'), + mock.call('ls::get $tsc -ThreadModel'), + mock.call('ls::config $tsc -ThreadModel "V1_FB3"'), + mock.call('ls::perform ApplyTsConfiguration $tsc cfguser_password'), + ]) + + def test__set_thread_model_no_update_needed(self): + self.ls_tcl_client._ts_context.vnfd_helper = { + 'mgmt-interface': { + 'cfguser_password': 'cfguser_password' + } + } + exec_resp = ['java0x4', 'V0'] + self.mock_tcl_handler.execute.side_effect = exec_resp + self.ls_tcl_client._set_thread_model('name', 'Legacy') + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set tsc [ls::perform RetrieveTsConfiguration ' + '-name "name" cfguser_password]'), + mock.call('ls::get $tsc -ThreadModel'), + ]) + + @mock.patch.object(tg_landslide.LandslideTclClient, + 'resolve_test_server_name', side_effect=['4', '2']) + def test_create_test_session(self, *args): + _session_profile = copy.deepcopy(SESSION_PROFILE) + _session_profile['reservations'] = RESERVATIONS + self.ls_tcl_client._save_test_session = mock.Mock() + self.ls_tcl_client._configure_ts_group = mock.Mock() + self.ls_tcl_client._library_id = 42 + self.ls_tcl_client.create_test_session(_session_profile) + self.assertEqual(17, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set test_ [ls::create TestSession]'), + mock.call('ls::config $test_ -Library 42 ' + '-Name "default_bearer_capacity"'), + mock.call('ls::config $test_ -Description ' \ + '"UE default bearer creation test case"'), + mock.call('ls::config $test_ -Keywords ""'), + mock.call('ls::config $test_ -Duration "60"'), + mock.call('ls::config $test_ -Iterations "1"'), + # _configure_reservation + mock.call('set reservation_ [ls::create Reservation -under $test_]'), + mock.call('ls::config $reservation_ -TsIndex 0 ' + '-TsId 4 -TsName "TestServer_1"'), + mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'), + mock.call('ls::config $physubnet_ -Name "eth1" -Base "10.42.32.100" ' + '-Mask "/24" -NumIps 20'), + # _configure_reservation + mock.call('set reservation_ [ls::create Reservation -under $test_]'), + mock.call('ls::config $reservation_ -TsIndex 1 ' + '-TsId 2 -TsName "TestServer_2"'), + mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'), + mock.call('ls::config $physubnet_ -Name "eth1" -Base "10.42.32.1" ' + '-Mask "/24" -NumIps 100'), + mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'), + mock.call('ls::config $physubnet_ -Name "eth2" -Base "10.42.33.1" ' + '-Mask "/24" -NumIps 100'), + # _configure_report_options + mock.call('ls::config $test_.ReportOptions -Format 1 -Ts -3 -Tc -3'), + ]) + + def test_create_dmf(self): + self.mock_tcl_handler.execute.return_value = '2' + self.ls_tcl_client._save_dmf = mock.Mock() + self.ls_tcl_client.create_dmf(copy.deepcopy(DMF_CFG)) + self.assertEqual(6, self.mock_tcl_handler.execute.call_count) + # This is needed because the dictionary is unordered and the arguments + # can come in either order + call1 = mock.call( + 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"') + call2 = mock.call( + 'ls::config $dmf_ -isClientPortRange "false" -clientPort 2002') + self.assertTrue( + call1 in self.mock_tcl_handler.execute.mock_calls or + call2 in self.mock_tcl_handler.execute.mock_calls) + + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set dmf_ [ls::create Dmf]'), + mock.call( + 'ls::get [ls::query LibraryInfo -systemLibraryName test] -Id'), + mock.call('ls::config $dmf_ -Library 2 -Name "Basic UDP"'), + mock.call('ls::config $dmf_ -dataProtocol "udp"'), + # mock.call( + # 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"'), + mock.call('ls::config $dmf_ -serverPort 2003'), + ], any_order=True) + + def test_configure_dmf(self): + self.mock_tcl_handler.execute.return_value = '2' + self.ls_tcl_client._save_dmf = mock.Mock() + self.ls_tcl_client.configure_dmf(DMF_CFG) + self.assertEqual(6, self.mock_tcl_handler.execute.call_count) + # This is need because the dictionary is unordered and the arguments + # can come in either order + call1 = mock.call( + 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"') + call2 = mock.call( + 'ls::config $dmf_ -isClientPortRange "false" -clientPort 2002') + self.assertTrue( + call1 in self.mock_tcl_handler.execute.mock_calls or + call2 in self.mock_tcl_handler.execute.mock_calls) + + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set dmf_ [ls::create Dmf]'), + mock.call( + 'ls::get [ls::query LibraryInfo -systemLibraryName test] -Id'), + mock.call('ls::config $dmf_ -Library 2 -Name "Basic UDP"'), + mock.call('ls::config $dmf_ -dataProtocol "udp"'), + # mock.call( + # 'ls::config $dmf_ -clientPort 2002 -isClientPortRange "false"'), + mock.call('ls::config $dmf_ -serverPort 2003'), + ], any_order=True) + + def test_delete_dmf(self): + self.assertRaises(NotImplementedError, + self.ls_tcl_client.delete_dmf, + DMF_CFG) + + def test__save_dmf_valid(self): + exec_resp = [TCL_SUCCESS_RESPONSE, TCL_SUCCESS_RESPONSE] + self.mock_tcl_handler.execute.side_effect = exec_resp + self.ls_tcl_client._save_dmf() + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::perform Validate -Dmf $dmf_'), + mock.call('ls::save $dmf_ -overwrite'), + ]) + + def test__save_dmf_invalid(self): + exec_resp = ['Invalid', 'List of errors and warnings'] + self.mock_tcl_handler.execute.side_effect = exec_resp + self.assertRaises(exceptions.LandslideTclException, + self.ls_tcl_client._save_dmf) + self.assertEqual(len(exec_resp), + self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::perform Validate -Dmf $dmf_'), + mock.call('ls::get $dmf_ -ErrorsAndWarnings'), + ]) + + def test__configure_report_options(self): + _options = {'format': 'CSV', 'PerInterval': 'false'} + self.ls_tcl_client._configure_report_options(_options) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::config $test_.ReportOptions -Format 1 -Ts -3 -Tc -3'), + mock.call('ls::config $test_.ReportOptions -PerInterval false'), + ], + any_order=True) + + def test___configure_ts_group(self, *args): + _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]) + self.ls_tcl_client._configure_tc_type = mock.Mock() + self.ls_tcl_client._configure_preresolved_arp = mock.Mock() + self.ls_tcl_client.resolve_test_server_name = mock.Mock( + return_value='2') + self.ls_tcl_client._configure_ts_group(_ts_group, 0) + self.mock_tcl_handler.execute.assert_called_once_with( + 'set tss_ [ls::create TsGroup -under $test_ -tsId 2 ]') + + def test___configure_ts_group_resolve_ts_fail(self, *args): + _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]) + self.ls_tcl_client._configure_tc_type = mock.Mock() + self.ls_tcl_client._configure_preresolved_arp = mock.Mock() + self.ls_tcl_client.resolve_test_server_name = mock.Mock( + return_value='TS Not Found') + self.assertRaises(RuntimeError, self.ls_tcl_client._configure_ts_group, + _ts_group, 0) + self.mock_tcl_handler.execute.assert_not_called() + + def test__configure_tc_type(self): + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) + self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE + self.ls_tcl_client._configure_parameters = mock.Mock() + self.ls_tcl_client._configure_tc_type(_tc, 0) + self.assertEqual(7, self.mock_tcl_handler.execute.call_count) + + def test__configure_tc_type_optional_param_omitted(self): + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) + del _tc['linked'] + self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE + self.ls_tcl_client._configure_parameters = mock.Mock() + self.ls_tcl_client._configure_tc_type(_tc, 0) + self.assertEqual(6, self.mock_tcl_handler.execute.call_count) + + def test__configure_tc_type_wrong_type(self): + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) + _tc['type'] = 'not_supported' + self.ls_tcl_client._configure_parameters = mock.Mock() + self.assertRaises(RuntimeError, + self.ls_tcl_client._configure_tc_type, + _tc, 0) + self.mock_tcl_handler.assert_not_called() + + def test__configure_tc_type_not_found_basic_lib(self): + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) + self.ls_tcl_client._configure_parameters = mock.Mock() + self.mock_tcl_handler.execute.return_value = 'Invalid' + self.assertRaises(RuntimeError, + self.ls_tcl_client._configure_tc_type, + _tc, 0) + + def test__configure_parameters(self): + _params = copy.deepcopy( + SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters']) + self.ls_tcl_client._configure_parameters(_params) + self.assertEqual(16, self.mock_tcl_handler.execute.call_count) + + def test__configure_array_param(self): + _array = {"class": "Array", + "array": ["0"]} + self.ls_tcl_client._configure_array_param('name', _array) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::create -Array-name -under $p_ ;'), + mock.call('ls::create ArrayItem -under $p_.name -Value "0"'), + ]) + + def test__configure_test_node_param(self): + _params = copy.deepcopy( + SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters']) + self.ls_tcl_client._configure_test_node_param('SgwUserAddr', + _params['SgwUserAddr']) + cmd = ('ls::create -TestNode-SgwUserAddr -under $p_ -Type "eth" ' + '-Phy "eth1" -Ip "SGW_USER_IP" -NumLinksOrNodes 1 ' + '-NextHop "SGW_CONTROL_NEXT_HOP" -Mac "" -MTU 1500 ' + '-ForcedEthInterface "" -EthStatsEnabled false -VlanId 0 ' + '-VlanUserPriority 0 -NumVlan 1 -UniqueVlanAddr false;') + self.mock_tcl_handler.execute.assert_called_once_with(cmd) + + def test__configure_sut_param(self): + _params = {'name': 'name'} + self.ls_tcl_client._configure_sut_param('name', _params) + self.mock_tcl_handler.execute.assert_called_once_with( + 'ls::create -Sut-name -under $p_ -Name "name";') + + def test__configure_dmf_param(self): + _params = {"mainflows": [{"library": '111', + "name": "Basic UDP"}], + "instanceGroups": [{ + "mainflowIdx": 0, + "mixType": "", + "rate": 0.0, + "rows": [{ + "clientPort": 0, + "context": 0, + "node": 0, + "overridePort": "false", + "ratingGroup": 0, + "role": 0, + "serviceId": 0, + "transport": "Any"}] + }]} + self.ls_tcl_client._get_library_id = mock.Mock(return_value='111') + res = self.ls_tcl_client._configure_dmf_param('name', _params) + self.assertEqual(5, self.mock_tcl_handler.execute.call_count) + self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::create -Dmf-name -under $p_ ;'), + mock.call('ls::perform AddDmfMainflow $p_.Dmf 111 "Basic UDP"'), + mock.call('ls::config $p_.Dmf.InstanceGroup(0) -mixType '), + mock.call('ls::config $p_.Dmf.InstanceGroup(0) -rate 0.0'), + mock.call('ls::config $p_.Dmf.InstanceGroup(0).Row(0) -Node 0 ' + '-OverridePort false -ClientPort 0 -Context 0 -Role 0 ' + '-PreferredTransport Any -RatingGroup 0 ' + '-ServiceID 0'), + ]) + + def test__configure_dmf_param_no_instance_groups(self): + _params = {"mainflows": [{"library": '111', + "name": "Basic UDP"}]} + self.ls_tcl_client._get_library_id = mock.Mock(return_value='111') + res = self.ls_tcl_client._configure_dmf_param('name', _params) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::create -Dmf-name -under $p_ ;'), + mock.call('ls::perform AddDmfMainflow $p_.Dmf 111 "Basic UDP"'), + ]) + + def test__configure_reservation(self): + _reservation = copy.deepcopy(RESERVATIONS[0]) + self.ls_tcl_client.resolve_test_server_name = mock.Mock( + return_value='4') + res = self.ls_tcl_client._configure_reservation(_reservation) + self.assertIsNone(res) + self.assertEqual(4, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('set reservation_ [ls::create Reservation -under $test_]'), + mock.call('ls::config $reservation_ -TsIndex 0 -TsId 4 ' + \ + '-TsName "TestServer_1"'), + mock.call('set physubnet_ [ls::create PhySubnet -under $reservation_]'), + mock.call('ls::config $physubnet_ -Name "eth1" ' + \ + '-Base "10.42.32.100" -Mask "/24" -NumIps 20'), + ]) + + def test__configure_preresolved_arp(self): + _arp = [{'StartingAddress': '10.81.1.10', + 'NumNodes': 1}] + res = self.ls_tcl_client._configure_preresolved_arp(_arp) + self.mock_tcl_handler.execute.assert_called_once() + self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_called_once_with( + 'ls::create PreResolvedArpAddress -under $tss_ ' + \ + '-StartingAddress "10.81.1.10" -NumNodes 1') + + def test__configure_preresolved_arp_none(self): + res = self.ls_tcl_client._configure_preresolved_arp(None) + self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_not_called() + + def test_delete_test_session(self): + self.assertRaises(NotImplementedError, + self.ls_tcl_client.delete_test_session, {}) + + def test__save_test_session(self): + self.mock_tcl_handler.execute.side_effect = [TCL_SUCCESS_RESPONSE, + TCL_SUCCESS_RESPONSE] + res = self.ls_tcl_client._save_test_session() + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::perform Validate -TestSession $test_'), + mock.call('ls::save $test_ -overwrite'), + ]) + + def test__save_test_session_invalid(self): + self.mock_tcl_handler.execute.side_effect = ['Invalid', 'Errors'] + self.assertRaises(exceptions.LandslideTclException, + self.ls_tcl_client._save_test_session) + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call('ls::perform Validate -TestSession $test_'), + mock.call('ls::get $test_ -ErrorsAndWarnings'), + ]) + + def test__get_library_id_system_lib(self): + self.mock_tcl_handler.execute.return_value = '111' + res = self.ls_tcl_client._get_library_id('name') + self.mock_tcl_handler.execute.assert_called_once() + self.assertEqual('111', res) + self.mock_tcl_handler.execute.assert_called_with( + 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id') + + def test__get_library_id_user_lib(self): + self.mock_tcl_handler.execute.side_effect = ['Not found', '222'] + res = self.ls_tcl_client._get_library_id('name') + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.assertEqual('222', res) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call( + 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id'), + mock.call( + 'ls::get [ls::query LibraryInfo -userLibraryName name] -Id'), + ]) + + def test__get_library_id_exception(self): + self.mock_tcl_handler.execute.side_effect = ['Not found', 'Not found'] + self.assertRaises(exceptions.LandslideTclException, + self.ls_tcl_client._get_library_id, + 'name') + self.assertEqual(2, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_has_calls([ + mock.call( + 'ls::get [ls::query LibraryInfo -systemLibraryName name] -Id'), + mock.call( + 'ls::get [ls::query LibraryInfo -userLibraryName name] -Id'), + ]) + + +class TestLsTclHandler(unittest.TestCase): + + def setUp(self): + self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi') + self.mock_lsapi.start() + + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.mock_lsapi.stop() + + def test___init__(self, *args): + self.ls_tcl_handler = tg_landslide.LsTclHandler() + self.assertEqual({}, self.ls_tcl_handler.tcl_cmds) + self.ls_tcl_handler._ls.tcl.assert_called_once() + + def test_execute(self, *args): + self.ls_tcl_handler = tg_landslide.LsTclHandler() + self.ls_tcl_handler.execute('command') + self.assertIn('command', self.ls_tcl_handler.tcl_cmds) |