summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_landslide.py182
-rw-r--r--yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py405
2 files changed, 547 insertions, 40 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
index 0ccffeeb1..d4222c136 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import logging
import requests
import six
import time
-from collections import Mapping
from yardstick.common import exceptions
from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
from yardstick.network_services import utils as net_serv_utils
from yardstick.network_services.vnf_generic.vnf import sample_vnf
@@ -31,6 +32,181 @@ except ImportError:
LOG = logging.getLogger(__name__)
+class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
+ APP_NAME = 'LandslideTG'
+
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = LandslideResourceHelper
+ super(LandslideTrafficGen, self).__init__(name, vnfd, task_id,
+ setup_env_helper_type,
+ resource_helper_type)
+
+ self.bin_path = net_serv_utils.get_nsb_option('bin_path')
+ self.name = name
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.session_profile = None
+
+ def listen_traffic(self, traffic_profile):
+ pass
+
+ def terminate(self):
+ self.resource_helper.disconnect()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
+ self.resource_helper.connect()
+
+ # Create test servers
+ test_servers = [x['test_server'] for x in self.vnfd_helper['config']]
+ self.resource_helper.create_test_servers(test_servers)
+
+ # Create SUTs
+ [self.resource_helper.create_suts(x['suts']) for x in
+ self.vnfd_helper['config']]
+
+ # Fill in test session based on session profile and test case options
+ self._load_session_profile()
+
+ def run_traffic(self, traffic_profile):
+ self.resource_helper.abort_running_tests()
+ # Update DMF profile with related test case options
+ traffic_profile.update_dmf(self.scenario_helper.all_options)
+ # Create DMF in test user library
+ self.resource_helper.create_dmf(traffic_profile.dmf_config)
+ # Create/update test session in test user library
+ self.resource_helper.create_test_session(self.session_profile)
+ # Start test session
+ self.resource_helper.create_running_tests(self.session_profile['name'])
+
+ def collect_kpi(self):
+ return self.resource_helper.collect_kpi()
+
+ def wait_for_instantiate(self):
+ pass
+
+ @staticmethod
+ def _update_session_suts(suts, testcase):
+ """ Create SUT entry. Update related EPC block in session profile. """
+ for sut in suts:
+ # Update session profile EPC element with SUT info from pod file
+ tc_role = testcase['parameters'].get(sut['role'])
+ if tc_role:
+ _param = {}
+ if tc_role['class'] == 'Sut':
+ _param['name'] = sut['name']
+ elif tc_role['class'] == 'TestNode':
+ _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'}
+ if x in sut and sut[x]})
+ testcase['parameters'][sut['role']].update(_param)
+ else:
+ LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
+ return testcase
+
+ def _update_session_test_servers(self, test_server, _tsgroup_index):
+ """ Update tsId, reservations, pre-resolved ARP in session profile """
+ # Update test server name
+ test_groups = self.session_profile['tsGroups']
+ test_groups[_tsgroup_index]['tsId'] = test_server['name']
+
+ # Update preResolvedArpAddress
+ arp_key = 'preResolvedArpAddress'
+ _preresolved_arp = test_server.get(arp_key) # list of dicts
+ if _preresolved_arp:
+ test_groups[_tsgroup_index][arp_key] = _preresolved_arp
+
+ # Update reservations
+ if 'phySubnets' in test_server:
+ reservation = {'tsId': test_server['name'],
+ 'tsIndex': _tsgroup_index,
+ 'tsName': test_server['name'],
+ 'phySubnets': test_server['phySubnets']}
+ if 'reservations' in self.session_profile:
+ self.session_profile['reservations'].append(reservation)
+ else:
+ self.session_profile['reservePorts'] = 'true'
+ self.session_profile['reservations'] = [reservation]
+
+ @staticmethod
+ def _update_session_tc_params(tc_options, testcase):
+ for _param_key in tc_options:
+ if _param_key == 'AssociatedPhys':
+ testcase[_param_key] = tc_options[_param_key]
+ continue
+ testcase['parameters'][_param_key] = tc_options[_param_key]
+ return testcase
+
+ def _load_session_profile(self):
+
+ with common_utils.open_relative_file(
+ self.scenario_helper.scenario_cfg['session_profile'],
+ self.scenario_helper.task_path) as stream:
+ self.session_profile = yaml_loader.yaml_load(stream)
+
+ # Raise exception if number of entries differs in following files,
+ _config_files = ['pod file', 'session_profile file', 'test_case file']
+ # Count testcases number in all tsGroups of session profile
+ session_tests_num = [xx for x in self.session_profile['tsGroups']
+ for xx in x['testCases']]
+ # Create a set containing number of list elements in each structure
+ _config_files_blocks_num = [
+ len(x) for x in
+ (self.vnfd_helper['config'], # test_servers and suts info
+ session_tests_num,
+ self.scenario_helper.all_options['test_cases'])] # test case file
+
+ if len(set(_config_files_blocks_num)) != 1:
+ raise RuntimeError('Unequal number of elements. {}'.format(
+ dict(six.moves.zip_longest(_config_files,
+ _config_files_blocks_num))))
+
+ ts_names = set()
+ _tsgroup_idx = -1
+ _testcase_idx = 0
+
+ # Iterate over data structures to overwrite session profile defaults
+ # _config: single list element holding test servers and SUTs info
+ # _tc_options: single test case parameters
+ for _config, tc_options in zip(
+ self.vnfd_helper['config'], # test servers and SUTS
+ self.scenario_helper.all_options['test_cases']): # testcase
+
+ _ts_config = _config['test_server']
+
+ # Calculate test group/test case indexes based on test server name
+ if _ts_config['name'] in ts_names:
+ _testcase_idx += 1
+ else:
+ _tsgroup_idx += 1
+ _testcase_idx = 0
+
+ _testcase = \
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx]
+
+ if _testcase['type'] != _ts_config['role']:
+ raise RuntimeError(
+ 'Test type mismatch in TC#{} of test server {}'.format(
+ _testcase_idx, _ts_config['name']))
+
+ # Fill session profile with test servers parameters
+ if _ts_config['name'] not in ts_names:
+ self._update_session_test_servers(_ts_config, _tsgroup_idx)
+ ts_names.add(_ts_config['name'])
+
+ # Fill session profile with suts parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_suts(_config['suts'], _testcase))
+
+ # Update test case parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_tc_params(tc_options, _testcase))
+
+
class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
"""Landslide TG helper class"""
@@ -228,7 +404,7 @@ class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
raise ValueError("Method '{}' not supported".format(_method))
if method == 'post' and not action:
- if not (json_data and isinstance(json_data, Mapping)):
+ if not (json_data and isinstance(json_data, collections.Mapping)):
raise ValueError(
'JSON data missing in {} request'.format(_method))
@@ -731,7 +907,7 @@ class LandslideTclClient(object):
self._tcl.execute('ls::config $test_ -Iterations "{}"'.format(
test_session['iterations']))
if 'reservePorts' in test_session:
- if test_session['reservePorts']:
+ if test_session['reservePorts'] == 'true':
self._tcl.execute('ls::config $test_ -Reserve Ports')
if 'reservations' in test_session:
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
index 9f0e69b60..d47061c6e 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
@@ -11,17 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import copy
import mock
import requests
import time
import unittest
+import uuid
+from yardstick.benchmark.contexts import base as ctx_base
from yardstick.common import exceptions
+from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import utils as net_serv_utils
+from yardstick.network_services.traffic_profile import landslide_profile
+from yardstick.network_services.vnf_generic.vnf import sample_vnf
from yardstick.network_services.vnf_generic.vnf import tg_landslide
+
NAME = "tg__0"
EXAMPLE_URL = 'http://example.com/'
@@ -164,6 +171,16 @@ DMF_CFG = {
"serverPort": 2003
}
+RESERVATIONS = [
+ {'tsName': TEST_SERVERS[0]['name'],
+ 'phySubnets': TEST_SERVERS[0]['phySubnets'],
+ 'tsId': TEST_SERVERS[0]['name'],
+ 'tsIndex': 0},
+ {'tsName': TEST_SERVERS[1]['name'],
+ 'phySubnets': TEST_SERVERS[1]['phySubnets'],
+ 'tsId': TEST_SERVERS[1]['name'],
+ 'tsIndex': 1}]
+
SESSION_PROFILE = {
'keywords': '',
'duration': 60,
@@ -171,33 +188,13 @@ SESSION_PROFILE = {
'description': 'UE default bearer creation test case',
'name': 'default_bearer_capacity',
'reportOptions': {'format': 'CSV'},
- "reservePorts": "true",
- "reservations": [
- {"tsId": 4,
- "tsIndex": 0,
- "tsName": TEST_SERVERS[0]['name'],
- "phySubnets": [
- {"base": "10.42.32.100", "mask": "/24", "name": "eth5",
- "numIps": 20},
- {"base": "10.42.33.100", "mask": "/24", "name": "eth6",
- "numIps": 20}
- ]},
- {"tsId": 2,
- "tsIndex": 1,
- "tsName": TEST_SERVERS[1]['name'],
- "phySubnets": [
- {"base": "10.42.32.1", "mask": "/24", "name": "eth5",
- "numIps": 100},
- {"base": "10.42.33.1", "mask": "/24", "name": "eth6",
- "numIps": 100}
- ]}
- ],
+ 'reservePorts': 'false',
'tsGroups': [
{
'testCases': [{
'type': 'SGW_Node',
'name': '',
- 'linked': False,
+ 'linked': "false",
'AssociatedPhys': '',
'parameters': {
'SgiPtpTunnelEn': 'false',
@@ -217,7 +214,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'SGW_USER_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'SgwControlAddr': {
@@ -226,7 +223,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'SGW_CONTROL_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500,
'nextHop': 'SGW_CONTROL_NEXT_HOP'
},
@@ -254,7 +251,7 @@ SESSION_PROFILE = {
'class': 'Dmf',
'instanceGroups': [
{
- 'startPaused': False,
+ 'startPaused': "false",
'rate': 0,
'mainflowIdx': 0,
'mixType': ''
@@ -270,7 +267,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'MME_CONTROL_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'SgwUserSut': {
@@ -284,7 +281,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'NET_HOST_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'DedicatedsPerDefaultBearer': '0',
@@ -310,7 +307,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'ENB_USER_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'TestType': 'SGW-NODAL'
@@ -322,7 +319,340 @@ SESSION_PROFILE = {
}
+class TestLandslideTrafficGen(unittest.TestCase):
+ SCENARIO_CFG = {
+ 'session_profile': '/traffic_profiles/landslide/'
+ 'landslide_session_default_bearer.yaml',
+ 'task_path': '',
+ 'runner': {
+ 'type': 'Iteration',
+ 'iterations': 1
+ },
+ 'nodes': {
+ 'tg__0': 'tg__0.traffic_gen',
+ 'vnf__0': 'vnf__0.vnf_epc'
+ },
+ 'topology': 'landslide_tg_topology.yaml',
+ 'type': 'NSPerf',
+ 'traffic_profile': '../../traffic_profiles/landslide/'
+ 'landslide_dmf_udp.yaml',
+ 'options': {
+ 'test_cases': [
+ {
+ 'BearerAddrPool': '2002::2',
+ 'type': 'SGW_Node',
+ 'BearerV4AddrPool': '2.0.0.2',
+ 'Sessions': '90000'
+ },
+ {
+ 'StartRate': '900.0',
+ 'type': 'SGW_Nodal',
+ 'DisconnectRate': '900.0',
+ 'Sessions': '90000'
+ }
+ ],
+ 'dmf':
+ {
+ 'transactionRate': 1000,
+ 'packetSize': 512
+ }
+ }
+ }
+
+ CONTEXT_CFG = {
+ 'contexts': [
+ {
+ 'type': 'Node',
+ 'name': 'traffic_gen',
+ 'file': '/etc/yardstick/nodes/pod_landslide.yaml'
+ },
+ {
+ 'type': 'Node',
+ 'name': 'vnf_epc',
+ 'file': '/etc/yardstick/nodes/pod_vepc_sut.yaml'
+ }
+ ]
+ }
+
+ TRAFFIC_PROFILE = {
+ "schema": "nsb:traffic_profile:0.1",
+ "name": "LandslideProfile",
+ "description": "Spirent Landslide traffic profile",
+ "traffic_profile": {
+ "traffic_type": "LandslideProfile"
+ },
+ "dmf_config": {
+ "dmf": {
+ "library": "test",
+ "name": "Basic UDP"
+ },
+ "description": "Basic data flow using UDP/IP",
+ "keywords": "UDP",
+ "dataProtocol": "udp"
+ }
+ }
+
+ SUCCESS_CREATED_CODE = 201
+ SUCCESS_OK_CODE = 200
+ SUCCESS_RECORD_ID = 5
+ TEST_USER_ID = 11
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+
+ self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi')
+ self.mock_lsapi.start()
+
+ self.mock_ssh_helper = mock.patch.object(sample_vnf, 'VnfSshHelper')
+ self.mock_ssh_helper.start()
+ self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ self.ls_tg = tg_landslide.LandslideTrafficGen(
+ NAME, self.vnfd, self._id)
+ self.session_profile = copy.deepcopy(SESSION_PROFILE)
+ self.ls_tg.session_profile = self.session_profile
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self.mock_lsapi.stop()
+ self.mock_ssh_helper.stop()
+
+ @mock.patch.object(net_serv_utils, 'get_nsb_option')
+ def test___init__(self, mock_get_nsb_option, *args):
+ _path_to_nsb = 'path/to/nsb'
+ mock_get_nsb_option.return_value = _path_to_nsb
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, self.vnfd, self._id)
+ self.assertIsInstance(ls_tg.resource_helper,
+ tg_landslide.LandslideResourceHelper)
+ mock_get_nsb_option.assert_called_once_with('bin_path')
+ self.assertEqual(_path_to_nsb, ls_tg.bin_path)
+ self.assertEqual(NAME, ls_tg.name)
+ self.assertTrue(ls_tg.runs_traffic)
+ self.assertFalse(ls_tg.traffic_finished)
+ self.assertIsNone(ls_tg.session_profile)
+
+ def test_listen_traffic(self):
+ _traffic_profile = {}
+ self.assertIsNone(self.ls_tg.listen_traffic(_traffic_profile))
+
+ def test_terminate(self, *args):
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ self.assertIsNone(self.ls_tg.terminate())
+ self.ls_tg.resource_helper._tcl.disconnect.assert_called_once()
+
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server',
+ return_value='fake_context')
+ def test_instantiate(self, *args):
+ self.ls_tg._tg_process = mock.Mock()
+ self.ls_tg._tg_process.start = mock.Mock()
+ self.ls_tg.resource_helper.connect = mock.Mock()
+ self.ls_tg.resource_helper.create_test_servers = mock.Mock()
+ self.ls_tg.resource_helper.create_suts = mock.Mock()
+ self.ls_tg._load_session_profile = mock.Mock()
+ self.assertIsNone(self.ls_tg.instantiate(self.SCENARIO_CFG,
+ self.CONTEXT_CFG))
+ self.ls_tg.resource_helper.connect.assert_called_once()
+ self.ls_tg.resource_helper.create_test_servers.assert_called_once()
+ _suts_blocks_num = len([item['suts'] for item in self.vnfd['config']])
+ self.assertEqual(_suts_blocks_num,
+ self.ls_tg.resource_helper.create_suts.call_count)
+ self.ls_tg._load_session_profile.assert_called_once()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_run_traffic(self, mock_get_tests, *args):
+ self.ls_tg.resource_helper._url = EXAMPLE_URL
+ self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_traffic_profile = mock.Mock(
+ spec=landslide_profile.LandslideProfile)
+ mock_traffic_profile.dmf_config = {'keywords': 'UDP',
+ 'dataProtocol': 'udp'}
+ mock_traffic_profile.params = self.TRAFFIC_PROFILE
+ self.ls_tg.resource_helper._user_id = self.TEST_USER_ID
+ mock_get_tests.return_value = [{'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': 'COMPLETE'}]
+ mock_post = mock.Mock()
+ mock_post.status_code = self.SUCCESS_CREATED_CODE
+ mock_post.json.return_value = {'id': self.SUCCESS_RECORD_ID}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value = mock_post
+ self.ls_tg.resource_helper.session = mock_session
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ _tcl = self.ls_tg.resource_helper._tcl
+ self.assertIsNone(self.ls_tg.run_traffic(mock_traffic_profile))
+ self.assertEqual(self.SUCCESS_RECORD_ID,
+ self.ls_tg.resource_helper.run_id)
+ mock_traffic_profile.update_dmf.assert_called_with(
+ self.ls_tg.scenario_helper.all_options)
+ _tcl.create_dmf.assert_called_with(mock_traffic_profile.dmf_config)
+ _tcl.create_test_session.assert_called_with(self.session_profile)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'check_running_test_state')
+ def test_collect_kpi(self, mock_check_running_test_state, *args):
+ self.ls_tg.resource_helper.run_id = self.SUCCESS_RECORD_ID
+ mock_check_running_test_state.return_value = 'COMPLETE'
+ self.assertEqual({'done': True}, self.ls_tg.collect_kpi())
+ mock_check_running_test_state.assert_called_once()
+
+ def test_wait_for_instantiate(self):
+ self.assertIsNone(self.ls_tg.wait_for_instantiate())
+ self.ls_tg.wait_for_instantiate()
+
+ def test__update_session_suts_no_tc_role(self, *args):
+ _suts = [{'role': 'epc_role'}]
+ _testcase = {'parameters': {'diff_epc_role': {'class': 'Sut'}}}
+ res = self.ls_tg._update_session_suts(_suts, _testcase)
+ self.assertEqual(_testcase, res)
+
+ def test__update_session_suts(self, *args):
+
+ def get_testnode_param(role, key, session_prof):
+ """ Get value by key from the deep nested dict to avoid calls like:
+ e.g. session_prof['tsGroups'][0]['testCases'][1]['parameters'][key]
+ """
+ for group in session_prof['tsGroups']:
+ for tc in group['testCases']:
+ tc_params = tc['parameters']
+ if tc_params.get(role):
+ return tc_params[role][key]
+
+ def get_sut_param(role, key, suts):
+ """ Search list of dicts for one with specific role.
+ Return the value of related dict by key. Expect key presence.
+ """
+ for sut in suts:
+ if sut.get('role') == role:
+ return sut[key]
+
+ # TestNode to verify
+ testnode_role = 'SgwControlAddr'
+ # SUT to verify
+ sut_role = 'SgwUserSut'
+
+ config_suts = [config['suts'] for config in self.vnfd['config']]
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ for suts, tc in zip(config_suts, session_tcs):
+ self.assertEqual(tc, self.ls_tg._update_session_suts(suts, tc))
+
+ # Verify TestNode class objects keys were updated
+ for _key in {'ip', 'phy', 'nextHop'}:
+ self.assertEqual(
+ get_testnode_param(testnode_role, _key, self.ls_tg.session_profile),
+ get_sut_param(testnode_role, _key, TS1_SUTS))
+ # Verify Sut class objects name was updated
+ self.assertEqual(
+ get_testnode_param(sut_role, 'name', self.ls_tg.session_profile),
+ get_sut_param(sut_role, 'name', TS2_SUTS))
+
+ def test__update_session_test_servers(self, *args):
+ for ts_index, ts in enumerate(TEST_SERVERS):
+ self.assertIsNone(
+ self.ls_tg._update_session_test_servers(ts, ts_index))
+ # Verify preResolvedArpAddress key was added
+ self.assertTrue(any(
+ _item.get('preResolvedArpAddress')
+ for _item in self.ls_tg.session_profile['tsGroups']))
+ # Verify reservations key was added to session profile
+ self.assertEqual(RESERVATIONS,
+ self.ls_tg.session_profile.get('reservations'))
+ self.assertEqual('true',
+ self.ls_tg.session_profile.get('reservePorts'))
+
+ def test__update_session_tc_params_assoc_phys(self):
+ _tc_options = {'AssociatedPhys': 'eth1'}
+ _testcase = {}
+ _testcase_orig = copy.deepcopy(_testcase)
+ res = self.ls_tg._update_session_tc_params(_tc_options, _testcase)
+ self.assertNotEqual(_testcase_orig, res)
+ self.assertEqual(_tc_options, _testcase)
+
+ def test__update_session_tc_params(self, *args):
+
+ def get_session_tc_param_value(param, tc_type, session_prof):
+ """ Get param value from the deep nested dict to avoid calls like:
+ session_prof['tsGroups'][0]['testCases'][0]['parameters'][key]
+ """
+ for test_group in session_prof['tsGroups']:
+ session_tc = test_group['testCases'][0]
+ if session_tc['type'] == tc_type:
+ return session_tc['parameters'].get(param)
+
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ scenario_tcs = [_tc for _tc in
+ self.SCENARIO_CFG['options']['test_cases']]
+ for tc_options, tc in zip(scenario_tcs, session_tcs):
+ self.assertEqual(
+ tc,
+ self.ls_tg._update_session_tc_params(tc_options, tc))
+
+ # Verify that each test case parameter was updated
+ # Params been compared are deeply nested. Using loops to ease access.
+ for _tc in self.SCENARIO_CFG['options']['test_cases']:
+ for _key, _val in _tc.items():
+ if _key != 'type':
+ self.assertEqual(
+ _val,
+ get_session_tc_param_value(_key, _tc.get('type'),
+ self.ls_tg.session_profile))
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_test_servers')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_suts')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_tc_params')
+ def test__load_session_profile(self, mock_upd_ses_tc_params,
+ mock_upd_ses_suts, mock_upd_ses_ts,
+ mock_yaml_load, *args):
+ self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ self.assertIsNone(self.ls_tg._load_session_profile())
+ self.assertIsNotNone(self.ls_tg.session_profile)
+ # Number of blocks in configuration files
+ # Number of test servers, suts and tc params blocks should be equal
+ _config_files_blocks_num = len([item['test_server']
+ for item in self.vnfd['config']])
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_ts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_suts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_tc_params.call_count)
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_unequal_num_of_cfg_blocks(
+ self, mock_yaml_load, *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ ls_traffic_gen = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_traffic_gen.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ # Delete test_servers item from pod file to make it not valid
+ ls_traffic_gen.vnfd_helper['config'].pop()
+ with self.assertRaises(RuntimeError):
+ ls_traffic_gen._load_session_profile()
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_test_type_mismatch(self, mock_yaml_load,
+ *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ # Swap test servers data in pod file
+ vnfd['config'] = list(reversed(vnfd['config']))
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ with self.assertRaises(RuntimeError):
+ ls_tg._load_session_profile()
+
+
class TestLandslideResourceHelper(unittest.TestCase):
+
PROTO_PORT = 8080
EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':',
str(PROTO_PORT), '/api/'])
@@ -829,8 +1159,8 @@ class TestLandslideResourceHelper(unittest.TestCase):
res = self.res_helper.configure_test_servers(
action={'action': 'recycle'})
self.assertEqual(
- {x['id'] for x in self.TEST_SERVERS_DATA['testServers']},
- set(res))
+ [x['id'] for x in self.TEST_SERVERS_DATA['testServers']],
+ res)
self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
mock_session.post.call_count)
@@ -844,7 +1174,7 @@ class TestLandslideResourceHelper(unittest.TestCase):
self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
mock_session.delete.call_count)
- def test_create_test_session(self, *args):
+ def test_create_test_session_res_helper(self, *args):
self.res_helper._user_id = self.SUCCESS_RECORD_ID
self.res_helper._tcl = mock.Mock()
test_session = {'name': 'test'}
@@ -976,7 +1306,6 @@ class TestLandslideResourceHelper(unittest.TestCase):
self.assertEqual(self.TEST_RESULTS_DATA, res)
def test__write_results(self, *args):
- self.maxDiff = None
res = self.res_helper._write_results(self.TEST_RESULTS_DATA)
exp_res = {
"Test Summary::Actual Dedicated Bearer Session Connects": 100.0,
@@ -1166,10 +1495,11 @@ class TestLandslideTclClient(unittest.TestCase):
'resolve_test_server_name', return_value='2')
def test_create_test_session(self, *args):
_session_profile = copy.deepcopy(SESSION_PROFILE)
+ _session_profile['reservations'] = RESERVATIONS
self.ls_tcl_client._save_test_session = mock.Mock()
self.ls_tcl_client._configure_ts_group = mock.Mock()
self.ls_tcl_client.create_test_session(_session_profile)
- self.assertEqual(20, self.mock_tcl_handler.execute.call_count)
+ self.assertEqual(17, self.mock_tcl_handler.execute.call_count)
def test_create_dmf(self):
self.mock_tcl_handler.execute.return_value = '2'
@@ -1293,7 +1623,7 @@ class TestLandslideTclClient(unittest.TestCase):
"clientPort": 0,
"context": 0,
"node": 0,
- "overridePort": False,
+ "overridePort": "false",
"ratingGroup": 0,
"role": 0,
"serviceId": 0,
@@ -1313,12 +1643,12 @@ class TestLandslideTclClient(unittest.TestCase):
self.assertIsNone(res)
def test__configure_reservation(self):
- _reservation = copy.deepcopy(SESSION_PROFILE['reservations'][0])
+ _reservation = copy.deepcopy(RESERVATIONS[0])
self.ls_tcl_client.resolve_test_server_name = mock.Mock(
return_value='2')
res = self.ls_tcl_client._configure_reservation(_reservation)
self.assertIsNone(res)
- self.assertEqual(6, self.mock_tcl_handler.execute.call_count)
+ self.assertEqual(4, self.mock_tcl_handler.execute.call_count)
def test__configure_preresolved_arp(self):
_arp = [{'StartingAddress': '10.81.1.10',
@@ -1330,6 +1660,7 @@ class TestLandslideTclClient(unittest.TestCase):
def test__configure_preresolved_arp_none(self):
res = self.ls_tcl_client._configure_preresolved_arp(None)
self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_not_called()
def test_delete_test_session(self):
self.assertRaises(NotImplementedError,