summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmma Foley <emma.l.foley@intel.com>2018-08-30 13:54:57 +0100
committerOrest Voznyy <orestx.voznyy@intel.com>2018-09-12 15:29:21 +0300
commit4bcfaec22bd4c9ece155986ed2c0bd91262afd8e (patch)
treead31898be2e5047fb266e8af605a66c17707be79
parentdea3a485f5f196d7ee18ccc0c64a35ab5560385f (diff)
Add Spirent Landslide TG API
New TG class "LandslideTrafficGen" represents an interface to use Spirent Landslide API to configure and execute vEPC test cases on hardware/software Spirent environment. For that purpose this class and its helper classes use Spirent Landslide REST API calls and Landslide TCL API calls via library module lsapi.py. Change-Id: Ib6560d5cd2483c6c9f5c95568345ac39bfebbd4d Signed-off-by: Emma Foley <emma.l.foley@intel.com> Signed-off-by: Orest Voznyy <orestx.voznyy@intel.com>
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_landslide.py182
-rw-r--r--yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py405
2 files changed, 547 insertions, 40 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
index 0ccffeeb1..d4222c136 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import logging
import requests
import six
import time
-from collections import Mapping
from yardstick.common import exceptions
from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
from yardstick.network_services import utils as net_serv_utils
from yardstick.network_services.vnf_generic.vnf import sample_vnf
@@ -31,6 +32,181 @@ except ImportError:
LOG = logging.getLogger(__name__)
+class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
+ APP_NAME = 'LandslideTG'
+
+ def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+ resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = LandslideResourceHelper
+ super(LandslideTrafficGen, self).__init__(name, vnfd, task_id,
+ setup_env_helper_type,
+ resource_helper_type)
+
+ self.bin_path = net_serv_utils.get_nsb_option('bin_path')
+ self.name = name
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.session_profile = None
+
+ def listen_traffic(self, traffic_profile):
+ pass
+
+ def terminate(self):
+ self.resource_helper.disconnect()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
+ self.resource_helper.connect()
+
+ # Create test servers
+ test_servers = [x['test_server'] for x in self.vnfd_helper['config']]
+ self.resource_helper.create_test_servers(test_servers)
+
+ # Create SUTs
+ [self.resource_helper.create_suts(x['suts']) for x in
+ self.vnfd_helper['config']]
+
+ # Fill in test session based on session profile and test case options
+ self._load_session_profile()
+
+ def run_traffic(self, traffic_profile):
+ self.resource_helper.abort_running_tests()
+ # Update DMF profile with related test case options
+ traffic_profile.update_dmf(self.scenario_helper.all_options)
+ # Create DMF in test user library
+ self.resource_helper.create_dmf(traffic_profile.dmf_config)
+ # Create/update test session in test user library
+ self.resource_helper.create_test_session(self.session_profile)
+ # Start test session
+ self.resource_helper.create_running_tests(self.session_profile['name'])
+
+ def collect_kpi(self):
+ return self.resource_helper.collect_kpi()
+
+ def wait_for_instantiate(self):
+ pass
+
+ @staticmethod
+ def _update_session_suts(suts, testcase):
+ """ Create SUT entry. Update related EPC block in session profile. """
+ for sut in suts:
+ # Update session profile EPC element with SUT info from pod file
+ tc_role = testcase['parameters'].get(sut['role'])
+ if tc_role:
+ _param = {}
+ if tc_role['class'] == 'Sut':
+ _param['name'] = sut['name']
+ elif tc_role['class'] == 'TestNode':
+ _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'}
+ if x in sut and sut[x]})
+ testcase['parameters'][sut['role']].update(_param)
+ else:
+ LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
+ return testcase
+
+ def _update_session_test_servers(self, test_server, _tsgroup_index):
+ """ Update tsId, reservations, pre-resolved ARP in session profile """
+ # Update test server name
+ test_groups = self.session_profile['tsGroups']
+ test_groups[_tsgroup_index]['tsId'] = test_server['name']
+
+ # Update preResolvedArpAddress
+ arp_key = 'preResolvedArpAddress'
+ _preresolved_arp = test_server.get(arp_key) # list of dicts
+ if _preresolved_arp:
+ test_groups[_tsgroup_index][arp_key] = _preresolved_arp
+
+ # Update reservations
+ if 'phySubnets' in test_server:
+ reservation = {'tsId': test_server['name'],
+ 'tsIndex': _tsgroup_index,
+ 'tsName': test_server['name'],
+ 'phySubnets': test_server['phySubnets']}
+ if 'reservations' in self.session_profile:
+ self.session_profile['reservations'].append(reservation)
+ else:
+ self.session_profile['reservePorts'] = 'true'
+ self.session_profile['reservations'] = [reservation]
+
+ @staticmethod
+ def _update_session_tc_params(tc_options, testcase):
+ for _param_key in tc_options:
+ if _param_key == 'AssociatedPhys':
+ testcase[_param_key] = tc_options[_param_key]
+ continue
+ testcase['parameters'][_param_key] = tc_options[_param_key]
+ return testcase
+
+ def _load_session_profile(self):
+
+ with common_utils.open_relative_file(
+ self.scenario_helper.scenario_cfg['session_profile'],
+ self.scenario_helper.task_path) as stream:
+ self.session_profile = yaml_loader.yaml_load(stream)
+
+ # Raise exception if number of entries differs in following files,
+ _config_files = ['pod file', 'session_profile file', 'test_case file']
+ # Count testcases number in all tsGroups of session profile
+ session_tests_num = [xx for x in self.session_profile['tsGroups']
+ for xx in x['testCases']]
+ # Create a set containing number of list elements in each structure
+ _config_files_blocks_num = [
+ len(x) for x in
+ (self.vnfd_helper['config'], # test_servers and suts info
+ session_tests_num,
+ self.scenario_helper.all_options['test_cases'])] # test case file
+
+ if len(set(_config_files_blocks_num)) != 1:
+ raise RuntimeError('Unequal number of elements. {}'.format(
+ dict(six.moves.zip_longest(_config_files,
+ _config_files_blocks_num))))
+
+ ts_names = set()
+ _tsgroup_idx = -1
+ _testcase_idx = 0
+
+ # Iterate over data structures to overwrite session profile defaults
+ # _config: single list element holding test servers and SUTs info
+ # _tc_options: single test case parameters
+ for _config, tc_options in zip(
+ self.vnfd_helper['config'], # test servers and SUTS
+ self.scenario_helper.all_options['test_cases']): # testcase
+
+ _ts_config = _config['test_server']
+
+ # Calculate test group/test case indexes based on test server name
+ if _ts_config['name'] in ts_names:
+ _testcase_idx += 1
+ else:
+ _tsgroup_idx += 1
+ _testcase_idx = 0
+
+ _testcase = \
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx]
+
+ if _testcase['type'] != _ts_config['role']:
+ raise RuntimeError(
+ 'Test type mismatch in TC#{} of test server {}'.format(
+ _testcase_idx, _ts_config['name']))
+
+ # Fill session profile with test servers parameters
+ if _ts_config['name'] not in ts_names:
+ self._update_session_test_servers(_ts_config, _tsgroup_idx)
+ ts_names.add(_ts_config['name'])
+
+ # Fill session profile with suts parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_suts(_config['suts'], _testcase))
+
+ # Update test case parameters
+ self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
+ _testcase_idx].update(
+ self._update_session_tc_params(tc_options, _testcase))
+
+
class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
"""Landslide TG helper class"""
@@ -228,7 +404,7 @@ class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
raise ValueError("Method '{}' not supported".format(_method))
if method == 'post' and not action:
- if not (json_data and isinstance(json_data, Mapping)):
+ if not (json_data and isinstance(json_data, collections.Mapping)):
raise ValueError(
'JSON data missing in {} request'.format(_method))
@@ -731,7 +907,7 @@ class LandslideTclClient(object):
self._tcl.execute('ls::config $test_ -Iterations "{}"'.format(
test_session['iterations']))
if 'reservePorts' in test_session:
- if test_session['reservePorts']:
+ if test_session['reservePorts'] == 'true':
self._tcl.execute('ls::config $test_ -Reserve Ports')
if 'reservations' in test_session:
diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
index 9f0e69b60..d47061c6e 100644
--- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
+++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py
@@ -11,17 +11,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
import copy
import mock
import requests
import time
import unittest
+import uuid
+from yardstick.benchmark.contexts import base as ctx_base
from yardstick.common import exceptions
+from yardstick.common import utils as common_utils
+from yardstick.common import yaml_loader
+from yardstick.network_services import utils as net_serv_utils
+from yardstick.network_services.traffic_profile import landslide_profile
+from yardstick.network_services.vnf_generic.vnf import sample_vnf
from yardstick.network_services.vnf_generic.vnf import tg_landslide
+
NAME = "tg__0"
EXAMPLE_URL = 'http://example.com/'
@@ -164,6 +171,16 @@ DMF_CFG = {
"serverPort": 2003
}
+RESERVATIONS = [
+ {'tsName': TEST_SERVERS[0]['name'],
+ 'phySubnets': TEST_SERVERS[0]['phySubnets'],
+ 'tsId': TEST_SERVERS[0]['name'],
+ 'tsIndex': 0},
+ {'tsName': TEST_SERVERS[1]['name'],
+ 'phySubnets': TEST_SERVERS[1]['phySubnets'],
+ 'tsId': TEST_SERVERS[1]['name'],
+ 'tsIndex': 1}]
+
SESSION_PROFILE = {
'keywords': '',
'duration': 60,
@@ -171,33 +188,13 @@ SESSION_PROFILE = {
'description': 'UE default bearer creation test case',
'name': 'default_bearer_capacity',
'reportOptions': {'format': 'CSV'},
- "reservePorts": "true",
- "reservations": [
- {"tsId": 4,
- "tsIndex": 0,
- "tsName": TEST_SERVERS[0]['name'],
- "phySubnets": [
- {"base": "10.42.32.100", "mask": "/24", "name": "eth5",
- "numIps": 20},
- {"base": "10.42.33.100", "mask": "/24", "name": "eth6",
- "numIps": 20}
- ]},
- {"tsId": 2,
- "tsIndex": 1,
- "tsName": TEST_SERVERS[1]['name'],
- "phySubnets": [
- {"base": "10.42.32.1", "mask": "/24", "name": "eth5",
- "numIps": 100},
- {"base": "10.42.33.1", "mask": "/24", "name": "eth6",
- "numIps": 100}
- ]}
- ],
+ 'reservePorts': 'false',
'tsGroups': [
{
'testCases': [{
'type': 'SGW_Node',
'name': '',
- 'linked': False,
+ 'linked': "false",
'AssociatedPhys': '',
'parameters': {
'SgiPtpTunnelEn': 'false',
@@ -217,7 +214,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'SGW_USER_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'SgwControlAddr': {
@@ -226,7 +223,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'SGW_CONTROL_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500,
'nextHop': 'SGW_CONTROL_NEXT_HOP'
},
@@ -254,7 +251,7 @@ SESSION_PROFILE = {
'class': 'Dmf',
'instanceGroups': [
{
- 'startPaused': False,
+ 'startPaused': "false",
'rate': 0,
'mainflowIdx': 0,
'mixType': ''
@@ -270,7 +267,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'MME_CONTROL_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'SgwUserSut': {
@@ -284,7 +281,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'NET_HOST_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'DedicatedsPerDefaultBearer': '0',
@@ -310,7 +307,7 @@ SESSION_PROFILE = {
'forcedEthInterface': '',
'ip': 'ENB_USER_IP',
'class': 'TestNode',
- 'ethStatsEnabled': False,
+ 'ethStatsEnabled': "false",
'mtu': 1500
},
'TestType': 'SGW-NODAL'
@@ -322,7 +319,340 @@ SESSION_PROFILE = {
}
+class TestLandslideTrafficGen(unittest.TestCase):
+ SCENARIO_CFG = {
+ 'session_profile': '/traffic_profiles/landslide/'
+ 'landslide_session_default_bearer.yaml',
+ 'task_path': '',
+ 'runner': {
+ 'type': 'Iteration',
+ 'iterations': 1
+ },
+ 'nodes': {
+ 'tg__0': 'tg__0.traffic_gen',
+ 'vnf__0': 'vnf__0.vnf_epc'
+ },
+ 'topology': 'landslide_tg_topology.yaml',
+ 'type': 'NSPerf',
+ 'traffic_profile': '../../traffic_profiles/landslide/'
+ 'landslide_dmf_udp.yaml',
+ 'options': {
+ 'test_cases': [
+ {
+ 'BearerAddrPool': '2002::2',
+ 'type': 'SGW_Node',
+ 'BearerV4AddrPool': '2.0.0.2',
+ 'Sessions': '90000'
+ },
+ {
+ 'StartRate': '900.0',
+ 'type': 'SGW_Nodal',
+ 'DisconnectRate': '900.0',
+ 'Sessions': '90000'
+ }
+ ],
+ 'dmf':
+ {
+ 'transactionRate': 1000,
+ 'packetSize': 512
+ }
+ }
+ }
+
+ CONTEXT_CFG = {
+ 'contexts': [
+ {
+ 'type': 'Node',
+ 'name': 'traffic_gen',
+ 'file': '/etc/yardstick/nodes/pod_landslide.yaml'
+ },
+ {
+ 'type': 'Node',
+ 'name': 'vnf_epc',
+ 'file': '/etc/yardstick/nodes/pod_vepc_sut.yaml'
+ }
+ ]
+ }
+
+ TRAFFIC_PROFILE = {
+ "schema": "nsb:traffic_profile:0.1",
+ "name": "LandslideProfile",
+ "description": "Spirent Landslide traffic profile",
+ "traffic_profile": {
+ "traffic_type": "LandslideProfile"
+ },
+ "dmf_config": {
+ "dmf": {
+ "library": "test",
+ "name": "Basic UDP"
+ },
+ "description": "Basic data flow using UDP/IP",
+ "keywords": "UDP",
+ "dataProtocol": "udp"
+ }
+ }
+
+ SUCCESS_CREATED_CODE = 201
+ SUCCESS_OK_CODE = 200
+ SUCCESS_RECORD_ID = 5
+ TEST_USER_ID = 11
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+
+ self.mock_lsapi = mock.patch.object(tg_landslide, 'LsApi')
+ self.mock_lsapi.start()
+
+ self.mock_ssh_helper = mock.patch.object(sample_vnf, 'VnfSshHelper')
+ self.mock_ssh_helper.start()
+ self.vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ self.ls_tg = tg_landslide.LandslideTrafficGen(
+ NAME, self.vnfd, self._id)
+ self.session_profile = copy.deepcopy(SESSION_PROFILE)
+ self.ls_tg.session_profile = self.session_profile
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self.mock_lsapi.stop()
+ self.mock_ssh_helper.stop()
+
+ @mock.patch.object(net_serv_utils, 'get_nsb_option')
+ def test___init__(self, mock_get_nsb_option, *args):
+ _path_to_nsb = 'path/to/nsb'
+ mock_get_nsb_option.return_value = _path_to_nsb
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, self.vnfd, self._id)
+ self.assertIsInstance(ls_tg.resource_helper,
+ tg_landslide.LandslideResourceHelper)
+ mock_get_nsb_option.assert_called_once_with('bin_path')
+ self.assertEqual(_path_to_nsb, ls_tg.bin_path)
+ self.assertEqual(NAME, ls_tg.name)
+ self.assertTrue(ls_tg.runs_traffic)
+ self.assertFalse(ls_tg.traffic_finished)
+ self.assertIsNone(ls_tg.session_profile)
+
+ def test_listen_traffic(self):
+ _traffic_profile = {}
+ self.assertIsNone(self.ls_tg.listen_traffic(_traffic_profile))
+
+ def test_terminate(self, *args):
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ self.assertIsNone(self.ls_tg.terminate())
+ self.ls_tg.resource_helper._tcl.disconnect.assert_called_once()
+
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server',
+ return_value='fake_context')
+ def test_instantiate(self, *args):
+ self.ls_tg._tg_process = mock.Mock()
+ self.ls_tg._tg_process.start = mock.Mock()
+ self.ls_tg.resource_helper.connect = mock.Mock()
+ self.ls_tg.resource_helper.create_test_servers = mock.Mock()
+ self.ls_tg.resource_helper.create_suts = mock.Mock()
+ self.ls_tg._load_session_profile = mock.Mock()
+ self.assertIsNone(self.ls_tg.instantiate(self.SCENARIO_CFG,
+ self.CONTEXT_CFG))
+ self.ls_tg.resource_helper.connect.assert_called_once()
+ self.ls_tg.resource_helper.create_test_servers.assert_called_once()
+ _suts_blocks_num = len([item['suts'] for item in self.vnfd['config']])
+ self.assertEqual(_suts_blocks_num,
+ self.ls_tg.resource_helper.create_suts.call_count)
+ self.ls_tg._load_session_profile.assert_called_once()
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'get_running_tests')
+ def test_run_traffic(self, mock_get_tests, *args):
+ self.ls_tg.resource_helper._url = EXAMPLE_URL
+ self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_traffic_profile = mock.Mock(
+ spec=landslide_profile.LandslideProfile)
+ mock_traffic_profile.dmf_config = {'keywords': 'UDP',
+ 'dataProtocol': 'udp'}
+ mock_traffic_profile.params = self.TRAFFIC_PROFILE
+ self.ls_tg.resource_helper._user_id = self.TEST_USER_ID
+ mock_get_tests.return_value = [{'id': self.SUCCESS_RECORD_ID,
+ 'testStateOrStep': 'COMPLETE'}]
+ mock_post = mock.Mock()
+ mock_post.status_code = self.SUCCESS_CREATED_CODE
+ mock_post.json.return_value = {'id': self.SUCCESS_RECORD_ID}
+ mock_session = mock.Mock(spec=requests.Session)
+ mock_session.post.return_value = mock_post
+ self.ls_tg.resource_helper.session = mock_session
+ self.ls_tg.resource_helper._tcl = mock.Mock()
+ _tcl = self.ls_tg.resource_helper._tcl
+ self.assertIsNone(self.ls_tg.run_traffic(mock_traffic_profile))
+ self.assertEqual(self.SUCCESS_RECORD_ID,
+ self.ls_tg.resource_helper.run_id)
+ mock_traffic_profile.update_dmf.assert_called_with(
+ self.ls_tg.scenario_helper.all_options)
+ _tcl.create_dmf.assert_called_with(mock_traffic_profile.dmf_config)
+ _tcl.create_test_session.assert_called_with(self.session_profile)
+
+ @mock.patch.object(tg_landslide.LandslideResourceHelper,
+ 'check_running_test_state')
+ def test_collect_kpi(self, mock_check_running_test_state, *args):
+ self.ls_tg.resource_helper.run_id = self.SUCCESS_RECORD_ID
+ mock_check_running_test_state.return_value = 'COMPLETE'
+ self.assertEqual({'done': True}, self.ls_tg.collect_kpi())
+ mock_check_running_test_state.assert_called_once()
+
+ def test_wait_for_instantiate(self):
+ self.assertIsNone(self.ls_tg.wait_for_instantiate())
+ self.ls_tg.wait_for_instantiate()
+
+ def test__update_session_suts_no_tc_role(self, *args):
+ _suts = [{'role': 'epc_role'}]
+ _testcase = {'parameters': {'diff_epc_role': {'class': 'Sut'}}}
+ res = self.ls_tg._update_session_suts(_suts, _testcase)
+ self.assertEqual(_testcase, res)
+
+ def test__update_session_suts(self, *args):
+
+ def get_testnode_param(role, key, session_prof):
+ """ Get value by key from the deep nested dict to avoid calls like:
+ e.g. session_prof['tsGroups'][0]['testCases'][1]['parameters'][key]
+ """
+ for group in session_prof['tsGroups']:
+ for tc in group['testCases']:
+ tc_params = tc['parameters']
+ if tc_params.get(role):
+ return tc_params[role][key]
+
+ def get_sut_param(role, key, suts):
+ """ Search list of dicts for one with specific role.
+ Return the value of related dict by key. Expect key presence.
+ """
+ for sut in suts:
+ if sut.get('role') == role:
+ return sut[key]
+
+ # TestNode to verify
+ testnode_role = 'SgwControlAddr'
+ # SUT to verify
+ sut_role = 'SgwUserSut'
+
+ config_suts = [config['suts'] for config in self.vnfd['config']]
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ for suts, tc in zip(config_suts, session_tcs):
+ self.assertEqual(tc, self.ls_tg._update_session_suts(suts, tc))
+
+ # Verify TestNode class objects keys were updated
+ for _key in {'ip', 'phy', 'nextHop'}:
+ self.assertEqual(
+ get_testnode_param(testnode_role, _key, self.ls_tg.session_profile),
+ get_sut_param(testnode_role, _key, TS1_SUTS))
+ # Verify Sut class objects name was updated
+ self.assertEqual(
+ get_testnode_param(sut_role, 'name', self.ls_tg.session_profile),
+ get_sut_param(sut_role, 'name', TS2_SUTS))
+
+ def test__update_session_test_servers(self, *args):
+ for ts_index, ts in enumerate(TEST_SERVERS):
+ self.assertIsNone(
+ self.ls_tg._update_session_test_servers(ts, ts_index))
+ # Verify preResolvedArpAddress key was added
+ self.assertTrue(any(
+ _item.get('preResolvedArpAddress')
+ for _item in self.ls_tg.session_profile['tsGroups']))
+ # Verify reservations key was added to session profile
+ self.assertEqual(RESERVATIONS,
+ self.ls_tg.session_profile.get('reservations'))
+ self.assertEqual('true',
+ self.ls_tg.session_profile.get('reservePorts'))
+
+ def test__update_session_tc_params_assoc_phys(self):
+ _tc_options = {'AssociatedPhys': 'eth1'}
+ _testcase = {}
+ _testcase_orig = copy.deepcopy(_testcase)
+ res = self.ls_tg._update_session_tc_params(_tc_options, _testcase)
+ self.assertNotEqual(_testcase_orig, res)
+ self.assertEqual(_tc_options, _testcase)
+
+ def test__update_session_tc_params(self, *args):
+
+ def get_session_tc_param_value(param, tc_type, session_prof):
+ """ Get param value from the deep nested dict to avoid calls like:
+ session_prof['tsGroups'][0]['testCases'][0]['parameters'][key]
+ """
+ for test_group in session_prof['tsGroups']:
+ session_tc = test_group['testCases'][0]
+ if session_tc['type'] == tc_type:
+ return session_tc['parameters'].get(param)
+
+ session_tcs = [_tc for _ts_group in self.ls_tg.session_profile['tsGroups']
+ for _tc in _ts_group['testCases']]
+ scenario_tcs = [_tc for _tc in
+ self.SCENARIO_CFG['options']['test_cases']]
+ for tc_options, tc in zip(scenario_tcs, session_tcs):
+ self.assertEqual(
+ tc,
+ self.ls_tg._update_session_tc_params(tc_options, tc))
+
+ # Verify that each test case parameter was updated
+ # Params been compared are deeply nested. Using loops to ease access.
+ for _tc in self.SCENARIO_CFG['options']['test_cases']:
+ for _key, _val in _tc.items():
+ if _key != 'type':
+ self.assertEqual(
+ _val,
+ get_session_tc_param_value(_key, _tc.get('type'),
+ self.ls_tg.session_profile))
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_test_servers')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_suts')
+ @mock.patch.object(tg_landslide.LandslideTrafficGen,
+ '_update_session_tc_params')
+ def test__load_session_profile(self, mock_upd_ses_tc_params,
+ mock_upd_ses_suts, mock_upd_ses_ts,
+ mock_yaml_load, *args):
+ self.ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ self.assertIsNone(self.ls_tg._load_session_profile())
+ self.assertIsNotNone(self.ls_tg.session_profile)
+ # Number of blocks in configuration files
+ # Number of test servers, suts and tc params blocks should be equal
+ _config_files_blocks_num = len([item['test_server']
+ for item in self.vnfd['config']])
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_ts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_suts.call_count)
+ self.assertEqual(_config_files_blocks_num,
+ mock_upd_ses_tc_params.call_count)
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_unequal_num_of_cfg_blocks(
+ self, mock_yaml_load, *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ ls_traffic_gen = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_traffic_gen.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ # Delete test_servers item from pod file to make it not valid
+ ls_traffic_gen.vnfd_helper['config'].pop()
+ with self.assertRaises(RuntimeError):
+ ls_traffic_gen._load_session_profile()
+
+ @mock.patch.object(common_utils, 'open_relative_file')
+ @mock.patch.object(yaml_loader, 'yaml_load')
+ def test__load_session_profile_test_type_mismatch(self, mock_yaml_load,
+ *args):
+ vnfd = copy.deepcopy(VNFD['vnfd:vnfd-catalog']['vnfd'][0])
+ # Swap test servers data in pod file
+ vnfd['config'] = list(reversed(vnfd['config']))
+ ls_tg = tg_landslide.LandslideTrafficGen(NAME, vnfd, self._id)
+ ls_tg.scenario_helper.scenario_cfg = self.SCENARIO_CFG
+ mock_yaml_load.return_value = SESSION_PROFILE
+ with self.assertRaises(RuntimeError):
+ ls_tg._load_session_profile()
+
+
class TestLandslideResourceHelper(unittest.TestCase):
+
PROTO_PORT = 8080
EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':',
str(PROTO_PORT), '/api/'])
@@ -829,8 +1159,8 @@ class TestLandslideResourceHelper(unittest.TestCase):
res = self.res_helper.configure_test_servers(
action={'action': 'recycle'})
self.assertEqual(
- {x['id'] for x in self.TEST_SERVERS_DATA['testServers']},
- set(res))
+ [x['id'] for x in self.TEST_SERVERS_DATA['testServers']],
+ res)
self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
mock_session.post.call_count)
@@ -844,7 +1174,7 @@ class TestLandslideResourceHelper(unittest.TestCase):
self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']),
mock_session.delete.call_count)
- def test_create_test_session(self, *args):
+ def test_create_test_session_res_helper(self, *args):
self.res_helper._user_id = self.SUCCESS_RECORD_ID
self.res_helper._tcl = mock.Mock()
test_session = {'name': 'test'}
@@ -976,7 +1306,6 @@ class TestLandslideResourceHelper(unittest.TestCase):
self.assertEqual(self.TEST_RESULTS_DATA, res)
def test__write_results(self, *args):
- self.maxDiff = None
res = self.res_helper._write_results(self.TEST_RESULTS_DATA)
exp_res = {
"Test Summary::Actual Dedicated Bearer Session Connects": 100.0,
@@ -1166,10 +1495,11 @@ class TestLandslideTclClient(unittest.TestCase):
'resolve_test_server_name', return_value='2')
def test_create_test_session(self, *args):
_session_profile = copy.deepcopy(SESSION_PROFILE)
+ _session_profile['reservations'] = RESERVATIONS
self.ls_tcl_client._save_test_session = mock.Mock()
self.ls_tcl_client._configure_ts_group = mock.Mock()
self.ls_tcl_client.create_test_session(_session_profile)
- self.assertEqual(20, self.mock_tcl_handler.execute.call_count)
+ self.assertEqual(17, self.mock_tcl_handler.execute.call_count)
def test_create_dmf(self):
self.mock_tcl_handler.execute.return_value = '2'
@@ -1293,7 +1623,7 @@ class TestLandslideTclClient(unittest.TestCase):
"clientPort": 0,
"context": 0,
"node": 0,
- "overridePort": False,
+ "overridePort": "false",
"ratingGroup": 0,
"role": 0,
"serviceId": 0,
@@ -1313,12 +1643,12 @@ class TestLandslideTclClient(unittest.TestCase):
self.assertIsNone(res)
def test__configure_reservation(self):
- _reservation = copy.deepcopy(SESSION_PROFILE['reservations'][0])
+ _reservation = copy.deepcopy(RESERVATIONS[0])
self.ls_tcl_client.resolve_test_server_name = mock.Mock(
return_value='2')
res = self.ls_tcl_client._configure_reservation(_reservation)
self.assertIsNone(res)
- self.assertEqual(6, self.mock_tcl_handler.execute.call_count)
+ self.assertEqual(4, self.mock_tcl_handler.execute.call_count)
def test__configure_preresolved_arp(self):
_arp = [{'StartingAddress': '10.81.1.10',
@@ -1330,6 +1660,7 @@ class TestLandslideTclClient(unittest.TestCase):
def test__configure_preresolved_arp_none(self):
res = self.ls_tcl_client._configure_preresolved_arp(None)
self.assertIsNone(res)
+ self.mock_tcl_handler.execute.assert_not_called()
def test_delete_test_session(self):
self.assertRaises(NotImplementedError,