From 4bcfaec22bd4c9ece155986ed2c0bd91262afd8e Mon Sep 17 00:00:00 2001 From: Emma Foley Date: Thu, 30 Aug 2018 13:54:57 +0100 Subject: Add Spirent Landslide TG API New TG class "LandslideTrafficGen" represents an interface to use Spirent Landslide API to configure and execute vEPC test cases on hardware/software Spirent environment. For that purpose this class and its helper classes use Spirent Landslide REST API calls and Landslide TCL API calls via library module lsapi.py. Change-Id: Ib6560d5cd2483c6c9f5c95568345ac39bfebbd4d Signed-off-by: Emma Foley Signed-off-by: Orest Voznyy --- .../vnf_generic/vnf/tg_landslide.py | 182 ++++++++- .../vnf_generic/vnf/test_tg_landslide.py | 405 +++++++++++++++++++-- 2 files changed, 547 insertions(+), 40 deletions(-) diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py index 0ccffeeb1..d4222c136 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import logging import requests import six import time -from collections import Mapping from yardstick.common import exceptions from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader from yardstick.network_services import utils as net_serv_utils from yardstick.network_services.vnf_generic.vnf import sample_vnf @@ -31,6 +32,181 @@ except ImportError: LOG = logging.getLogger(__name__) +class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): + APP_NAME = 'LandslideTG' + + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = LandslideResourceHelper + super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + setup_env_helper_type, + resource_helper_type) + + self.bin_path = net_serv_utils.get_nsb_option('bin_path') + self.name = name + self.runs_traffic = True + self.traffic_finished = False + self.session_profile = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + self.resource_helper.disconnect() + + def instantiate(self, scenario_cfg, context_cfg): + super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg) + self.resource_helper.connect() + + # Create test servers + test_servers = [x['test_server'] for x in self.vnfd_helper['config']] + self.resource_helper.create_test_servers(test_servers) + + # Create SUTs + [self.resource_helper.create_suts(x['suts']) for x in + self.vnfd_helper['config']] + + # Fill in test session based on session profile and test case options + self._load_session_profile() + + def run_traffic(self, traffic_profile): + self.resource_helper.abort_running_tests() + # Update DMF profile with related test case options + traffic_profile.update_dmf(self.scenario_helper.all_options) + # Create DMF in test user library + self.resource_helper.create_dmf(traffic_profile.dmf_config) + # Create/update test session in test user library + self.resource_helper.create_test_session(self.session_profile) + # Start test session + self.resource_helper.create_running_tests(self.session_profile['name']) + + def collect_kpi(self): + return self.resource_helper.collect_kpi() + + def wait_for_instantiate(self): + pass + + @staticmethod + def _update_session_suts(suts, testcase): + """ Create SUT entry. Update related EPC block in session profile. """ + for sut in suts: + # Update session profile EPC element with SUT info from pod file + tc_role = testcase['parameters'].get(sut['role']) + if tc_role: + _param = {} + if tc_role['class'] == 'Sut': + _param['name'] = sut['name'] + elif tc_role['class'] == 'TestNode': + _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'} + if x in sut and sut[x]}) + testcase['parameters'][sut['role']].update(_param) + else: + LOG.info('Unexpected SUT role in pod file: "%s".', sut['role']) + return testcase + + def _update_session_test_servers(self, test_server, _tsgroup_index): + """ Update tsId, reservations, pre-resolved ARP in session profile """ + # Update test server name + test_groups = self.session_profile['tsGroups'] + test_groups[_tsgroup_index]['tsId'] = test_server['name'] + + # Update preResolvedArpAddress + arp_key = 'preResolvedArpAddress' + _preresolved_arp = test_server.get(arp_key) # list of dicts + if _preresolved_arp: + test_groups[_tsgroup_index][arp_key] = _preresolved_arp + + # Update reservations + if 'phySubnets' in test_server: + reservation = {'tsId': test_server['name'], + 'tsIndex': _tsgroup_index, + 'tsName': test_server['name'], + 'phySubnets': test_server['phySubnets']} + if 'reservations' in self.session_profile: + self.session_profile['reservations'].append(reservation) + else: + self.session_profile['reservePorts'] = 'true' + self.session_profile['reservations'] = [reservation] + + @staticmethod + def _update_session_tc_params(tc_options, testcase): + for _param_key in tc_options: + if _param_key == 'AssociatedPhys': + testcase[_param_key] = tc_options[_param_key] + continue + testcase['parameters'][_param_key] = tc_options[_param_key] + return testcase + + def _load_session_profile(self): + + with common_utils.open_relative_file( + self.scenario_helper.scenario_cfg['session_profile'], + self.scenario_helper.task_path) as stream: + self.session_profile = yaml_loader.yaml_load(stream) + + # Raise exception if number of entries differs in following files, + _config_files = ['pod file', 'session_profile file', 'test_case file'] + # Count testcases number in all tsGroups of session profile + session_tests_num = [xx for x in self.session_profile['tsGroups'] + for xx in x['testCases']] + # Create a set containing number of list elements in each structure + _config_files_blocks_num = [ + len(x) for x in + (self.vnfd_helper['config'], # test_servers and suts info + session_tests_num, + self.scenario_helper.all_options['test_cases'])] # test case file + + if len(set(_config_files_blocks_num)) != 1: + raise RuntimeError('Unequal number of elements. {}'.format( + dict(six.moves.zip_longest(_config_files, + _config_files_blocks_num)))) + + ts_names = set() + _tsgroup_idx = -1 + _testcase_idx = 0 + + # Iterate over data structures to overwrite session profile defaults + # _config: single list element holding test servers and SUTs info + # _tc_options: single test case parameters + for _config, tc_options in zip( + self.vnfd_helper['config'], # test servers and SUTS + self.scenario_helper.all_options['test_cases']): # testcase + + _ts_config = _config['test_server'] + + # Calculate test group/test case indexes based on test server name + if _ts_config['name'] in ts_names: + _testcase_idx += 1 + else: + _tsgroup_idx += 1 + _testcase_idx = 0 + + _testcase = \ + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx] + + if _testcase['type'] != _ts_config['role']: + raise RuntimeError( + 'Test type mismatch in TC#{} of test server {}'.format( + _testcase_idx, _ts_config['name'])) + + # Fill session profile with test servers parameters + if _ts_config['name'] not in ts_names: + self._update_session_test_servers(_ts_config, _tsgroup_idx) + ts_names.add(_ts_config['name']) + + # Fill session profile with suts parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_suts(_config['suts'], _testcase)) + + # Update test case parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_tc_params(tc_options, _testcase)) + + class LandslideResourceHelper(sample_vnf.ClientResourceHelper): """Landslide TG helper class""" @@ -228,7 +404,7 @@ class LandslideResourceHelper(sample_vnf.ClientResourceHelper): raise ValueError("Method '{}' not supported".format(_method)) if method == 'post' and not action: - if not (json_data and isinstance(json_data, Mapping)): + if not (json_data and isinstance(json_data, collections.Mapping)): raise ValueError( 'JSON data missing in {} request'.format(_method)) @@ -731,7 +907,7 @@ class LandslideTclClient(object): self._tcl.execute('ls::config $test_ -Iterations "{}"'.format( test_session['iterations'])) if 'reservePorts' in test_session: - if test_session['reservePorts']: + if test_session['reservePorts'] == 'true': self._tcl.execute('ls::config $test_ -Reserve Ports') if 'reservations' in test_session: diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py index 9f0e69b60..d47061c6e 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py @@ -11,17 +11,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# import copy import mock import requests import time import unittest +import uuid +from yardstick.benchmark.contexts import base as ctx_base from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.traffic_profile import landslide_profile +from yardstick.network_services.vnf_generic.vnf import sample_vnf from yardstick.network_services.vnf_generic.vnf import tg_landslide + NAME = "tg__0" EXAMPLE_URL = 'http://example.com/' @@ -164,6 +171,16 @@ DMF_CFG = { "serverPort": 2003 } +RESERVATIONS = [ + {'tsName': TEST_SERVERS[0]['name'], + 'phySubnets': TEST_SERVERS[0]['phySubnets'], + 'tsId': TEST_SERVERS[0]['name'], + 'tsIndex': 0}, + {'tsName': TEST_SERVERS[1]['name'], + 'phySubnets': TEST_SERVERS[1]['phySubnets'], + 'tsId': TEST_SERVERS[1]['name'], + 'tsIndex': 1}] + SESSION_PROFILE = { 'keywords': '', 'duration': 60, @@ -171,33 +188,13 @@ SESSION_PROFILE = { 'description': 'UE default bearer creation test case', 'name': 'default_bearer_capacity', 'reportOptions': {'format': 'CSV'}, - "reservePorts": "true", - "reservations": [ - {"tsId": 4, - "tsIndex": 0, - "tsName": TEST_SERVERS[0]['name'], - "phySubnets": [ - {"base": "10.42.32.100", "mask": "/24", "name": "eth5", - "numIps": 20}, - {"base": "10.42.33.100", "mask": "/24", "name": "eth6", - "numIps": 20} - ]}, - {"tsId": 2, - "tsIndex": 1, - "tsName": TEST_SERVERS[1]['name'], - "phySubnets": [ - {"base": "10.42.32.1", "mask": "/24", "name": "eth5", - "numIps": 100}, - {"base": "10.42.33.1", "mask": "/24", "name": "eth6", - "numIps": 100} - ]} - ], + 'reservePorts': 'false', 'tsGroups': [ { 'testCases': [{ 'type': 'SGW_Node', 'name': '', - 'linked': False, + 'linked': "false", 'AssociatedPhys': '', 'parameters': { 'SgiPtpTunnelEn': 'false', @@ -217,7 +214,7 @@ SESSION_PROFILE = { 'forcedEthInterface': '', 'ip': 'SGW_USER_IP', 'class': 'TestNode', - 'ethStatsEnabled': False, + 'ethStatsEnabled': "false", 'mtu': 1500 }, 'SgwControlAddr': { @@ -226,7 +223,7 @@ SESSION_PROFILE = { 'forcedEthInterface': '', 'ip': 'SGW_CONTROL_IP', 'class': 'TestNode', - 'ethStatsEnabled': False, + 'ethStatsEnabled': "false", 'mtu': 1500, 'nextHop': 'SGW_CONTROL_NEXT_HOP' }, @@ -254,7 +251,7 @@ SESSION_PROFILE = { 'class': 'Dmf', 'instanceGroups': [ { - 'startPaused': False, + 'startPaused': "false", 'rate': 0, 'mainflowIdx': 0, 'mixType': '' @@ -270,7 +267,7 @@ SESSION_PROFILE = { 'forcedEthInterface': '', 'ip': 'MME_CONTROL_IP', 'class': 'TestNode', - 'ethStatsEnabled': False, + 'ethStatsEnabled': "false", 'mtu': 1500 }, 'SgwUserSut': { @@ -284,7 +281,7 @@ SESSION_PROFILE = { 'forcedEthInterface': '', 'ip': 'NET_HOST_IP', 'class': 'TestNode', - 'ethStatsEnabled': False, + 'ethStatsEnabled': "false", 'mtu': 1500 }, 'DedicatedsPerDefaultBearer': '0', @@ -310,7 +307,7 @@ SESSION_PROFILE = { 'forcedEthInterface': '', 'ip': 'ENB_USER_IP', 'class': 'TestNode', - 'ethStatsEnabled': False, + 'ethStatsEnabled': "false", 'mtu': 1500 }, 'TestType': 'SGW-NODAL' @@ -322,7 +319,340 @@ SESSION_PROFILE = { } +class TestLandslideTrafficGen(unittest.TestCase): + SCENARIO_CFG = { + 'session_profile': '/traffic_profiles/landslide/' + 'landslide_session_default_bearer.yaml', + 'task_path': '', + 'runner': { + 'type': 'Iteration', + 'iterations': 1 + }, + 'nodes': { + 'tg__0': 'tg__0.traffic_gen', + 'vnf__0': 'vnf__0.vnf_epc' + }, + 'topology': 'landslide_tg_topology.yaml', + 'type': 'NSPerf', + 'traffic_profile': '../../traffic_profiles/landslide/' + 'landslide_dmf_udp.yaml', + 'options': { + 'test_cases': [ + { + 'BearerAddrPool': '2002::2', + 'type': 'SGW_Node', + 'BearerV4AddrPool': '2.0.0.2', + 'Sessions': '90000' + }, + { + 'StartRate': '900.0', + 'type': 'SGW_Nodal', + 'DisconnectRate': '900.0', + 'Sessions': '90000' + } + ], + 'dmf': + { + 'transactionRate': 1000, + 'packetSize': 512 + } + } + } + + CONTEXT_CFG = { + 'contexts': [ + { + 'type': 'Node', + 'name': 'traffic_gen', + 'file': '/etc/yardstick/nodes/pod_landslide.yaml' + }, + { + 'type': 'Node', + 'name': 'vnf_epc', + 'file': '/etc/yardstick/nodes/pod_vepc_sut.yaml' + } + ] + } + + TRAFFIC_PROFILE = { + "schema": "nsb:traffic_profile:0.1", + "name": "LandslideProfile", + "description": "Spirent Landslide traffic profile", + "traffic_profile": { + "traffic_type": "LandslideProfile" + }, + "dmf_config": { + "dmf": { + "library": "test", + "name": "Basic UDP" + }, + "description": "Basic data flow using UDP/IP", + "keywords": "UDP", + "dataProtocol": "udp" + } + } + + SUCCESS_CREATED_CODE = 201 + SUCCESS_OK_CODE = 200 + SUCCESS_RECORD_ID = 5 + TEST_USER_ID = 11 + + def setUp(self): + self._id = uuid.uuid1().int + + self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi') + self.mock_lsapi.start() + + self.mock_ssh_helper = mock.patch.object(sample_vnf, 'VnfSshHelper') + self.mock_ssh_helper.start() + self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + self.ls_tg = tg_landslide.LandslideTrafficGen( + NAME, self.vnfd, self._id) + self.session_profile = copy.deepcopy(SESSION_PROFILE) + self.ls_tg.session_profile = self.session_profile + + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.mock_lsapi.stop() + self.mock_ssh_helper.stop() + + @mock.patch.object(net_serv_utils, 'get_nsb_option') + def test___init__(self, mock_get_nsb_option, *args): + _path_to_nsb = 'path/to/nsb' + mock_get_nsb_option.return_value = _path_to_nsb + ls_tg = tg_landslide.LandslideTrafficGen(NAME, self.vnfd, self._id) + self.assertIsInstance(ls_tg.resource_helper, + tg_landslide.LandslideResourceHelper) + mock_get_nsb_option.assert_called_once_with('bin_path') + self.assertEqual(_path_to_nsb, ls_tg.bin_path) + self.assertEqual(NAME, ls_tg.name) + self.assertTrue(ls_tg.runs_traffic) + self.assertFalse(ls_tg.traffic_finished) + self.assertIsNone(ls_tg.session_profile) + + def test_listen_traffic(self): + _traffic_profile = {} + self.assertIsNone(self.ls_tg.listen_traffic(_traffic_profile)) + + def test_terminate(self, *args): + self.ls_tg.resource_helper._tcl = mock.Mock() + self.assertIsNone(self.ls_tg.terminate()) + self.ls_tg.resource_helper._tcl.disconnect.assert_called_once() + + @mock.patch.object(ctx_base.Context, 'get_context_from_server', + return_value='fake_context') + def test_instantiate(self, *args): + self.ls_tg._tg_process = mock.Mock() + self.ls_tg._tg_process.start = mock.Mock() + self.ls_tg.resource_helper.connect = mock.Mock() + self.ls_tg.resource_helper.create_test_servers = mock.Mock() + self.ls_tg.resource_helper.create_suts = mock.Mock() + self.ls_tg._load_session_profile = mock.Mock() + self.assertIsNone(self.ls_tg.instantiate(self.SCENARIO_CFG, + self.CONTEXT_CFG)) + self.ls_tg.resource_helper.connect.assert_called_once() + self.ls_tg.resource_helper.create_test_servers.assert_called_once() + _suts_blocks_num = len([item['suts'] for item in self.vnfd['config']]) + self.assertEqual(_suts_blocks_num, + self.ls_tg.resource_helper.create_suts.call_count) + self.ls_tg._load_session_profile.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_run_traffic(self, mock_get_tests, *args): + self.ls_tg.resource_helper._url = EXAMPLE_URL + self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_traffic_profile = mock.Mock( + spec=landslide_profile.LandslideProfile) + mock_traffic_profile.dmf_config = {'keywords': 'UDP', + 'dataProtocol': 'udp'} + mock_traffic_profile.params = self.TRAFFIC_PROFILE + self.ls_tg.resource_helper._user_id = self.TEST_USER_ID + mock_get_tests.return_value = [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'COMPLETE'}] + mock_post = mock.Mock() + mock_post.status_code = self.SUCCESS_CREATED_CODE + mock_post.json.return_value = {'id': self.SUCCESS_RECORD_ID} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value = mock_post + self.ls_tg.resource_helper.session = mock_session + self.ls_tg.resource_helper._tcl = mock.Mock() + _tcl = self.ls_tg.resource_helper._tcl + self.assertIsNone(self.ls_tg.run_traffic(mock_traffic_profile)) + self.assertEqual(self.SUCCESS_RECORD_ID, + self.ls_tg.resource_helper.run_id) + mock_traffic_profile.update_dmf.assert_called_with( + self.ls_tg.scenario_helper.all_options) + _tcl.create_dmf.assert_called_with(mock_traffic_profile.dmf_config) + _tcl.create_test_session.assert_called_with(self.session_profile) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + def test_collect_kpi(self, mock_check_running_test_state, *args): + self.ls_tg.resource_helper.run_id = self.SUCCESS_RECORD_ID + mock_check_running_test_state.return_value = 'COMPLETE' + self.assertEqual({'done': True}, self.ls_tg.collect_kpi()) + mock_check_running_test_state.assert_called_once() + + def test_wait_for_instantiate(self): + self.assertIsNone(self.ls_tg.wait_for_instantiate()) + self.ls_tg.wait_for_instantiate() + + def test__update_session_suts_no_tc_role(self, *args): + _suts = [{'role': 'epc_role'}] + _testcase = {'parameters': {'diff_epc_role': {'class': 'Sut'}}} + res = self.ls_tg._update_session_suts(_suts, _testcase) + self.assertEqual(_testcase, res) + + def test__update_session_suts(self, *args): + + def get_testnode_param(role, key, session_prof): + """ Get value by key from the deep nested dict to avoid calls like: + e.g. session_prof['tsGroups'][0]['testCases'][1]['parameters'][key] + """ + for group in session_prof['tsGroups']: + for tc in group['testCases']: + tc_params = tc['parameters'] + if tc_params.get(role): + return tc_params[role][key] + + def get_sut_param(role, key, suts): + """ Search list of dicts for one with specific role. + Return the value of related dict by key. Expect key presence. + """ + for sut in suts: + if sut.get('role') == role: + return sut[key] + + # TestNode to verify + testnode_role = 'SgwControlAddr' + # SUT to verify + sut_role = 'SgwUserSut' + + config_suts = [config['suts'] for config in self.vnfd['config']] + session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups'] + for _tc in _ts_group['testCases']] + for suts, tc in zip(config_suts, session_tcs): + self.assertEqual(tc, self.ls_tg._update_session_suts(suts, tc)) + + # Verify TestNode class objects keys were updated + for _key in {'ip', 'phy', 'nextHop'}: + self.assertEqual( + get_testnode_param(testnode_role, _key, self.ls_tg.session_profile), + get_sut_param(testnode_role, _key, TS1_SUTS)) + # Verify Sut class objects name was updated + self.assertEqual( + get_testnode_param(sut_role, 'name', self.ls_tg.session_profile), + get_sut_param(sut_role, 'name', TS2_SUTS)) + + def test__update_session_test_servers(self, *args): + for ts_index, ts in enumerate(TEST_SERVERS): + self.assertIsNone( + self.ls_tg._update_session_test_servers(ts, ts_index)) + # Verify preResolvedArpAddress key was added + self.assertTrue(any( + _item.get('preResolvedArpAddress') + for _item in self.ls_tg.session_profile['tsGroups'])) + # Verify reservations key was added to session profile + self.assertEqual(RESERVATIONS, + self.ls_tg.session_profile.get('reservations')) + self.assertEqual('true', + self.ls_tg.session_profile.get('reservePorts')) + + def test__update_session_tc_params_assoc_phys(self): + _tc_options = {'AssociatedPhys': 'eth1'} + _testcase = {} + _testcase_orig = copy.deepcopy(_testcase) + res = self.ls_tg._update_session_tc_params(_tc_options, _testcase) + self.assertNotEqual(_testcase_orig, res) + self.assertEqual(_tc_options, _testcase) + + def test__update_session_tc_params(self, *args): + + def get_session_tc_param_value(param, tc_type, session_prof): + """ Get param value from the deep nested dict to avoid calls like: + session_prof['tsGroups'][0]['testCases'][0]['parameters'][key] + """ + for test_group in session_prof['tsGroups']: + session_tc = test_group['testCases'][0] + if session_tc['type'] == tc_type: + return session_tc['parameters'].get(param) + + session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups'] + for _tc in _ts_group['testCases']] + scenario_tcs = [_tc for _tc in + self.SCENARIO_CFG['options']['test_cases']] + for tc_options, tc in zip(scenario_tcs, session_tcs): + self.assertEqual( + tc, + self.ls_tg._update_session_tc_params(tc_options, tc)) + + # Verify that each test case parameter was updated + # Params been compared are deeply nested. Using loops to ease access. + for _tc in self.SCENARIO_CFG['options']['test_cases']: + for _key, _val in _tc.items(): + if _key != 'type': + self.assertEqual( + _val, + get_session_tc_param_value(_key, _tc.get('type'), + self.ls_tg.session_profile)) + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_test_servers') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_suts') + @mock.patch.object(tg_landslide.LandslideTrafficGen, + '_update_session_tc_params') + def test__load_session_profile(self, mock_upd_ses_tc_params, + mock_upd_ses_suts, mock_upd_ses_ts, + mock_yaml_load, *args): + self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_yaml_load.return_value = SESSION_PROFILE + self.assertIsNone(self.ls_tg._load_session_profile()) + self.assertIsNotNone(self.ls_tg.session_profile) + # Number of blocks in configuration files + # Number of test servers, suts and tc params blocks should be equal + _config_files_blocks_num = len([item['test_server'] + for item in self.vnfd['config']]) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_ts.call_count) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_suts.call_count) + self.assertEqual(_config_files_blocks_num, + mock_upd_ses_tc_params.call_count) + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + def test__load_session_profile_unequal_num_of_cfg_blocks( + self, mock_yaml_load, *args): + vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0]) + ls_traffic_gen = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id) + ls_traffic_gen.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_yaml_load.return_value = SESSION_PROFILE + # Delete test_servers item from pod file to make it not valid + ls_traffic_gen.vnfd_helper['config'].pop() + with self.assertRaises(RuntimeError): + ls_traffic_gen._load_session_profile() + + @mock.patch.object(common_utils, 'open_relative_file') + @mock.patch.object(yaml_loader, 'yaml_load') + def test__load_session_profile_test_type_mismatch(self, mock_yaml_load, + *args): + vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0]) + # Swap test servers data in pod file + vnfd['config'] = list(reversed(vnfd['config'])) + ls_tg = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id) + ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG + mock_yaml_load.return_value = SESSION_PROFILE + with self.assertRaises(RuntimeError): + ls_tg._load_session_profile() + + class TestLandslideResourceHelper(unittest.TestCase): + PROTO_PORT = 8080 EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':', str(PROTO_PORT), '/api/']) @@ -829,8 +1159,8 @@ class TestLandslideResourceHelper(unittest.TestCase): res = self.res_helper.configure_test_servers( action={'action': 'recycle'}) self.assertEqual( - {x['id'] for x in self.TEST_SERVERS_DATA['testServers']}, - set(res)) + [x['id'] for x in self.TEST_SERVERS_DATA['testServers']], + res) self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), mock_session.post.call_count) @@ -844,7 +1174,7 @@ class TestLandslideResourceHelper(unittest.TestCase): self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), mock_session.delete.call_count) - def test_create_test_session(self, *args): + def test_create_test_session_res_helper(self, *args): self.res_helper._user_id = self.SUCCESS_RECORD_ID self.res_helper._tcl = mock.Mock() test_session = {'name': 'test'} @@ -976,7 +1306,6 @@ class TestLandslideResourceHelper(unittest.TestCase): self.assertEqual(self.TEST_RESULTS_DATA, res) def test__write_results(self, *args): - self.maxDiff = None res = self.res_helper._write_results(self.TEST_RESULTS_DATA) exp_res = { "Test Summary::Actual Dedicated Bearer Session Connects": 100.0, @@ -1166,10 +1495,11 @@ class TestLandslideTclClient(unittest.TestCase): 'resolve_test_server_name', return_value='2') def test_create_test_session(self, *args): _session_profile = copy.deepcopy(SESSION_PROFILE) + _session_profile['reservations'] = RESERVATIONS self.ls_tcl_client._save_test_session = mock.Mock() self.ls_tcl_client._configure_ts_group = mock.Mock() self.ls_tcl_client.create_test_session(_session_profile) - self.assertEqual(20, self.mock_tcl_handler.execute.call_count) + self.assertEqual(17, self.mock_tcl_handler.execute.call_count) def test_create_dmf(self): self.mock_tcl_handler.execute.return_value = '2' @@ -1293,7 +1623,7 @@ class TestLandslideTclClient(unittest.TestCase): "clientPort": 0, "context": 0, "node": 0, - "overridePort": False, + "overridePort": "false", "ratingGroup": 0, "role": 0, "serviceId": 0, @@ -1313,12 +1643,12 @@ class TestLandslideTclClient(unittest.TestCase): self.assertIsNone(res) def test__configure_reservation(self): - _reservation = copy.deepcopy(SESSION_PROFILE['reservations'][0]) + _reservation = copy.deepcopy(RESERVATIONS[0]) self.ls_tcl_client.resolve_test_server_name = mock.Mock( return_value='2') res = self.ls_tcl_client._configure_reservation(_reservation) self.assertIsNone(res) - self.assertEqual(6, self.mock_tcl_handler.execute.call_count) + self.assertEqual(4, self.mock_tcl_handler.execute.call_count) def test__configure_preresolved_arp(self): _arp = [{'StartingAddress': '10.81.1.10', @@ -1330,6 +1660,7 @@ class TestLandslideTclClient(unittest.TestCase): def test__configure_preresolved_arp_none(self): res = self.ls_tcl_client._configure_preresolved_arp(None) self.assertIsNone(res) + self.mock_tcl_handler.execute.assert_not_called() def test_delete_test_session(self): self.assertRaises(NotImplementedError, -- cgit 1.2.3-korg