diff options
Diffstat (limited to 'yardstick/network_services/vnf_generic/vnf/tg_landslide.py')
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/tg_landslide.py | 1203 |
1 files changed, 1203 insertions, 0 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py new file mode 100644 index 000000000..a146b72ca --- /dev/null +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -0,0 +1,1203 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import requests +import six +import time + +from yardstick.common import exceptions +from yardstick.common import utils as common_utils +from yardstick.common import yaml_loader +from yardstick.network_services import utils as net_serv_utils +from yardstick.network_services.vnf_generic.vnf import sample_vnf + +try: + from lsapi import LsApi +except ImportError: + LsApi = common_utils.ErrorClass + +LOG = logging.getLogger(__name__) + + +class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen): + APP_NAME = 'LandslideTG' + + def __init__(self, name, vnfd, task_id, setup_env_helper_type=None, + resource_helper_type=None): + if resource_helper_type is None: + resource_helper_type = LandslideResourceHelper + super(LandslideTrafficGen, self).__init__(name, vnfd, task_id, + setup_env_helper_type, + resource_helper_type) + + self.bin_path = net_serv_utils.get_nsb_option('bin_path') + self.name = name + self.runs_traffic = True + self.traffic_finished = False + self.session_profile = None + + def listen_traffic(self, traffic_profile): + pass + + def terminate(self): + self.resource_helper.disconnect() + + def instantiate(self, scenario_cfg, context_cfg): + super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg) + self.resource_helper.connect() + + # Create test servers + test_servers = [x['test_server'] for x in self.vnfd_helper['config']] + self.resource_helper.create_test_servers(test_servers) + + # Create SUTs + [self.resource_helper.create_suts(x['suts']) for x in + self.vnfd_helper['config']] + + # Fill in test session based on session profile and test case options + self._load_session_profile() + + def run_traffic(self, traffic_profile): + self.resource_helper.abort_running_tests() + # Update DMF profile with related test case options + traffic_profile.update_dmf(self.scenario_helper.all_options) + # Create DMF in test user library + self.resource_helper.create_dmf(traffic_profile.dmf_config) + # Create/update test session in test user library + self.resource_helper.create_test_session(self.session_profile) + # Start test session + self.resource_helper.create_running_tests(self.session_profile['name']) + + def collect_kpi(self): + return self.resource_helper.collect_kpi() + + def wait_for_instantiate(self): + pass + + @staticmethod + def _update_session_suts(suts, testcase): + """ Create SUT entry. Update related EPC block in session profile. """ + for sut in suts: + # Update session profile EPC element with SUT info from pod file + tc_role = testcase['parameters'].get(sut['role']) + if tc_role: + _param = {} + if tc_role['class'] == 'Sut': + _param['name'] = sut['name'] + elif tc_role['class'] == 'TestNode': + _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'} + if x in sut and sut[x]}) + testcase['parameters'][sut['role']].update(_param) + else: + LOG.info('Unexpected SUT role in pod file: "%s".', sut['role']) + return testcase + + def _update_session_test_servers(self, test_server, _tsgroup_index): + """ Update tsId, reservations, pre-resolved ARP in session profile """ + # Update test server name + test_groups = self.session_profile['tsGroups'] + test_groups[_tsgroup_index]['tsId'] = test_server['name'] + + # Update preResolvedArpAddress + arp_key = 'preResolvedArpAddress' + _preresolved_arp = test_server.get(arp_key) # list of dicts + if _preresolved_arp: + test_groups[_tsgroup_index][arp_key] = _preresolved_arp + + # Update reservations + if 'phySubnets' in test_server: + reservation = {'tsId': test_server['name'], + 'tsIndex': _tsgroup_index, + 'tsName': test_server['name'], + 'phySubnets': test_server['phySubnets']} + if 'reservations' in self.session_profile: + self.session_profile['reservations'].append(reservation) + else: + self.session_profile['reservePorts'] = 'true' + self.session_profile['reservations'] = [reservation] + + @staticmethod + def _update_session_tc_params(tc_options, testcase): + for _param_key in tc_options: + if _param_key == 'AssociatedPhys': + testcase[_param_key] = tc_options[_param_key] + continue + testcase['parameters'][_param_key] = tc_options[_param_key] + return testcase + + def _load_session_profile(self): + + with common_utils.open_relative_file( + self.scenario_helper.scenario_cfg['session_profile'], + self.scenario_helper.task_path) as stream: + self.session_profile = yaml_loader.yaml_load(stream) + + # Raise exception if number of entries differs in following files, + _config_files = ['pod file', 'session_profile file', 'test_case file'] + # Count testcases number in all tsGroups of session profile + session_tests_num = [xx for x in self.session_profile['tsGroups'] + for xx in x['testCases']] + # Create a set containing number of list elements in each structure + _config_files_blocks_num = [ + len(x) for x in + (self.vnfd_helper['config'], # test_servers and suts info + session_tests_num, + self.scenario_helper.all_options['test_cases'])] # test case file + + if len(set(_config_files_blocks_num)) != 1: + raise RuntimeError('Unequal number of elements. {}'.format( + dict(six.moves.zip_longest(_config_files, + _config_files_blocks_num)))) + + ts_names = set() + _tsgroup_idx = -1 + _testcase_idx = 0 + + # Iterate over data structures to overwrite session profile defaults + # _config: single list element holding test servers and SUTs info + # _tc_options: single test case parameters + for _config, tc_options in zip( + self.vnfd_helper['config'], # test servers and SUTS + self.scenario_helper.all_options['test_cases']): # testcase + + _ts_config = _config['test_server'] + + # Calculate test group/test case indexes based on test server name + if _ts_config['name'] in ts_names: + _testcase_idx += 1 + else: + _tsgroup_idx += 1 + _testcase_idx = 0 + + _testcase = \ + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx] + + if _testcase['type'] != _ts_config['role']: + raise RuntimeError( + 'Test type mismatch in TC#{} of test server {}'.format( + _testcase_idx, _ts_config['name'])) + + # Fill session profile with test servers parameters + if _ts_config['name'] not in ts_names: + self._update_session_test_servers(_ts_config, _tsgroup_idx) + ts_names.add(_ts_config['name']) + + # Fill session profile with suts parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_suts(_config['suts'], _testcase)) + + # Update test case parameters + self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][ + _testcase_idx].update( + self._update_session_tc_params(tc_options, _testcase)) + + +class LandslideResourceHelper(sample_vnf.ClientResourceHelper): + """Landslide TG helper class""" + + REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409} + REST_API_CODES = {'NOT MODIFIED': 500810} + + def __init__(self, setup_helper): + super(LandslideResourceHelper, self).__init__(setup_helper) + self._result = {} + self.vnfd_helper = setup_helper.vnfd_helper + self.scenario_helper = setup_helper.scenario_helper + + # TAS Manager config initialization + self._url = None + self._user_id = None + self.session = None + self.license_data = {} + + # TCL session initialization + self._tcl = LandslideTclClient(LsTclHandler(), self) + + self.session = requests.Session() + self.running_tests_uri = 'runningTests' + self.test_session_uri = 'testSessions' + self.test_serv_uri = 'testServers' + self.suts_uri = 'suts' + self.users_uri = 'users' + self.user_lib_uri = None + self.run_id = None + + def abort_running_tests(self, timeout=60, delay=5): + """ Abort running test sessions, if any """ + _start_time = time.time() + while time.time() < _start_time + timeout: + run_tests_states = {x['id']: x['testStateOrStep'] + for x in self.get_running_tests()} + if not set(run_tests_states.values()).difference( + {'COMPLETE', 'COMPLETE_ERROR'}): + break + else: + [self.stop_running_tests(running_test_id=_id, force=True) + for _id, _state in run_tests_states.items() + if 'COMPLETE' not in _state] + time.sleep(delay) + else: + raise RuntimeError( + 'Some test runs not stopped during {} seconds'.format(timeout)) + + def _build_url(self, resource, action=None): + """ Build URL string + + :param resource: REST API resource name + :type resource: str + :param action: actions name and value + :type action: dict('name': <str>, 'value': <str>) + :returns str: REST API resource name with optional action info + """ + # Action is optional and accepted only in presence of resource param + if action and not resource: + raise ValueError("Resource name not provided") + # Concatenate actions + _action = ''.join(['?{}={}'.format(k, v) for k, v in + action.items()]) if action else '' + + return ''.join([self._url, resource, _action]) + + def get_response_params(self, method, resource, params=None): + """ Retrieve params from JSON response of specific resource URL + + :param method: one of supported REST API methods + :type method: str + :param resource: URI, requested resource name + :type resource: str + :param params: attributes to be found in JSON response + :type params: list(str) + """ + _res = [] + params = params if params else [] + response = self.exec_rest_request(method, resource) + # Get substring between last slash sign and question mark (if any) + url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0] + _response_json = response.json() + # Expect dict(), if URL last part and top dict key don't match + # Else, if they match, expect list() + k, v = list(_response_json.items())[0] + if k != url_last_part: + v = [v] # v: list(dict(str: str)) + # Extract params, or whole list of dicts (without top level key) + for x in v: + _res.append({param: x[param] for param in params} if params else x) + return _res + + def _create_user(self, auth, level=1): + """ Create new user + + :param auth: data to create user account on REST server + :type auth: dict + :param level: Landslide user permissions level + :type level: int + :returns int: user id + """ + # Set expiration date in two years since account creation date + _exp_date = time.strftime( + '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2)) + _username = auth['user'] + _fields = {"contactInformation": "", "expiresOn": _exp_date, + "fullName": "Test User", + "isActive": "true", "level": level, + "password": auth['password'], + "username": _username} + _response = self.exec_rest_request('post', self.users_uri, + json_data=_fields, raise_exc=False) + _resp_json = _response.json() + if _response.status_code == self.REST_STATUS_CODES['CREATED']: + # New user created + _id = _resp_json['id'] + LOG.info("New user created: username='%s', id='%s'", _username, + _id) + elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']: + # User already exists + LOG.info("Account '%s' already exists.", _username) + # Get user id + _id = self._modify_user(_username, {"isActive": "true"})['id'] + else: + raise exceptions.RestApiError( + 'Error during new user "{}" creation'.format(_username)) + return _id + + def _modify_user(self, username, fields): + """ Modify information about existing user + + :param username: user name of account to be modified + :type username: str + :param fields: data to modify user account on REST server + :type fields: dict + :returns dict: user info + """ + _response = self.exec_rest_request('post', self.users_uri, + action={'username': username}, + json_data=fields, raise_exc=False) + if _response.status_code == self.REST_STATUS_CODES['OK']: + _response = _response.json() + else: + raise exceptions.RestApiError( + 'Error during user "{}" data update: {}'.format( + username, + _response.status_code)) + LOG.info("User account '%s' modified: '%s'", username, _response) + return _response + + def _delete_user(self, username): + """ Delete user account + + :param username: username field + :type username: str + :returns bool: True if succeeded + """ + self.exec_rest_request('delete', self.users_uri, + action={'username': username}) + + def _get_users(self, username=None): + """ Get user records from REST server + + :param username: username field + :type username: None|str + :returns list(dict): empty list, or user record, or list of all users + """ + _response = self.get_response_params('get', self.users_uri) + _res = [u for u in _response if + u['username'] == username] if username else _response + return _res + + def exec_rest_request(self, method, resource, action=None, json_data=None, + logs=True, raise_exc=True): + """ Execute REST API request, return response object + + :param method: one of supported requests ('post', 'get', 'delete') + :type method: str + :param resource: URL of resource + :type resource: str + :param action: data used to provide URI located after question mark + :type action: dict + :param json_data: mandatory only for 'post' method + :type json_data: dict + :param logs: debug logs display flag + :type raise_exc: bool + :param raise_exc: if True, raise exception on REST API call error + :returns requests.Response(): REST API call response object + """ + json_data = json_data if json_data else {} + action = action if action else {} + _method = method.upper() + method = method.lower() + if method not in ('post', 'get', 'delete'): + raise ValueError("Method '{}' not supported".format(_method)) + + if method == 'post' and not action: + if not (json_data and isinstance(json_data, collections.Mapping)): + raise ValueError( + 'JSON data missing in {} request'.format(_method)) + + r = getattr(self.session, method)(self._build_url(resource, action), + json=json_data) + if raise_exc and not r.ok: + msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format( + method, self._build_url(resource, action), r.reason) + raise exceptions.RestApiError(msg) + + if logs: + LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method, + r.request.url) + LOG.debug("Response: %s", r.json()) + return r + + def connect(self): + """Connect to RESTful server using test user account""" + tas_info = self.vnfd_helper['mgmt-interface'] + # Supported REST Server ports: HTTP - 8080, HTTPS - 8181 + _port = '8080' if tas_info['proto'] == 'http' else '8181' + tas_info.update({'port': _port}) + self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info) + self.session.headers.update({'Accept': 'application/json', + 'Content-type': 'application/json'}) + # Login with super user to create test user + self.session.auth = ( + tas_info['super-user'], tas_info['super-user-password']) + LOG.info("Connect using superuser: server='%s'", self._url) + auth = {x: tas_info[x] for x in ('user', 'password')} + self._user_id = self._create_user(auth) + # Login with test user + self.session.auth = auth['user'], auth['password'] + # Test user validity + self.exec_rest_request('get', '') + + self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri) + LOG.info("Login with test user: server='%s'", self._url) + # Read existing license + self.license_data['lic_id'] = tas_info['license'] + + # Tcl client init + self._tcl.connect(tas_info['ip'], *self.session.auth) + + return self.session + + def disconnect(self): + self.session = None + self._tcl.disconnect() + + def terminate(self): + self._terminated.value = 1 + + def create_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.create_dmf(_dmf) + else: + self._tcl.create_dmf(dmf) + + def delete_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.delete_dmf(_dmf) + else: + self._tcl.delete_dmf(dmf) + + def create_suts(self, suts): + # Keep only supported keys in suts object + for _sut in suts: + sut_entry = {k: v for k, v in _sut.items() + if k not in {'phy', 'nextHop', 'role'}} + _response = self.exec_rest_request( + 'post', self.suts_uri, json_data=sut_entry, + logs=False, raise_exc=False) + if _response.status_code != self.REST_STATUS_CODES['CREATED']: + LOG.info(_response.reason) # Failed to create + _name = sut_entry.pop('name') + # Modify existing SUT + self.configure_sut(sut_name=_name, json_data=sut_entry) + else: + LOG.info("SUT created: %s", sut_entry) + + def get_suts(self, suts_id=None): + if suts_id: + _suts = self.exec_rest_request( + 'get', '{}/{}'.format(self.suts_uri, suts_id)).json() + else: + _suts = self.get_response_params('get', self.suts_uri) + + return _suts + + def configure_sut(self, sut_name, json_data): + """ Modify information of specific SUTs + + :param sut_name: name of existing SUT + :type sut_name: str + :param json_data: SUT settings + :type json_data: dict() + """ + LOG.info("Modifying SUT information...") + _response = self.exec_rest_request('post', + self.suts_uri, + action={'name': sut_name}, + json_data=json_data, + raise_exc=False) + if _response.status_code not in {self.REST_STATUS_CODES[x] for x in + {'OK', 'NO CHANGE'}}: + raise exceptions.RestApiError(_response.reason) + + LOG.info("Modified SUT: %s", sut_name) + + def delete_suts(self, suts_ids=None): + if not suts_ids: + _curr_suts = self.get_response_params('get', self.suts_uri) + suts_ids = [x['id'] for x in _curr_suts] + LOG.info("Deleting SUTs with following IDs: %s", suts_ids) + for _id in suts_ids: + self.exec_rest_request('delete', + '{}/{}'.format(self.suts_uri, _id)) + LOG.info("\tDone for SUT id: %s", _id) + + def _check_test_servers_state(self, test_servers_ids=None, delay=10, + timeout=300): + LOG.info("Waiting for related test servers state change to READY...") + # Wait on state change + _start_time = time.time() + while time.time() - _start_time < timeout: + ts_ids_not_ready = {x['id'] for x in + self.get_test_servers(test_servers_ids) + if x['state'] != 'READY'} + if ts_ids_not_ready == set(): + break + time.sleep(delay) + else: + raise RuntimeError( + 'Test servers not in READY state after {} seconds.'.format( + timeout)) + + def create_test_servers(self, test_servers): + """ Create test servers + + :param test_servers: input data for test servers creation + mandatory fields: managementIp + optional fields: name + :type test_servers: list(dict) + """ + _ts_ids = [] + for _ts in test_servers: + _msg = 'Created test server "%(name)s"' + _ts_ids.append(self._tcl.create_test_server(_ts)) + if _ts.get('thread_model'): + _msg += ' in mode: "%(thread_model)s"' + LOG.info(_msg, _ts) + + self._check_test_servers_state(_ts_ids) + + def get_test_servers(self, test_server_ids=None): + if not test_server_ids: # Get all test servers info + _test_servers = self.exec_rest_request( + 'get', self.test_serv_uri).json()[self.test_serv_uri] + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + _test_servers = [] + for _id in test_server_ids: + _test_servers.append(self.exec_rest_request( + 'get', '{}/{}'.format(self.test_serv_uri, _id)).json()) + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + def configure_test_servers(self, action, json_data=None, + test_server_ids=None): + if not test_server_ids: + test_server_ids = [x['id'] for x in self.get_test_servers()] + elif isinstance(test_server_ids, int): + test_server_ids = [test_server_ids] + for _id in test_server_ids: + self.exec_rest_request('post', + '{}/{}'.format(self.test_serv_uri, _id), + action=action, json_data=json_data) + LOG.info("Test server (id: %s) configuration done: %s", _id, + action) + return test_server_ids + + def delete_test_servers(self, test_servers_ids=None): + # Delete test servers + for _ts in self.get_test_servers(test_servers_ids): + self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri, + _ts['id'])) + LOG.info("Deleted test server: %s", _ts['name']) + + def create_test_session(self, test_session): + # Use tcl client to create session + test_session['library'] = self._user_id + LOG.debug("Creating session='%s'", test_session['name']) + self._tcl.create_test_session(test_session) + + def get_test_session(self, test_session_name=None): + if test_session_name: + uri = 'libraries/{}/{}/{}'.format(self._user_id, + self.test_session_uri, + test_session_name) + else: + uri = self.user_lib_uri.format(self._user_id) + _test_sessions = self.exec_rest_request('get', uri).json() + return _test_sessions + + def configure_test_session(self, template_name, test_session): + # Override specified test session parameters + LOG.info('Update test session parameters: %s', test_session['name']) + test_session.update({'library': self._user_id}) + return self.exec_rest_request( + method='post', + action={'action': 'overrideAndSaveAs'}, + json_data=test_session, + resource='{}/{}'.format(self.user_lib_uri.format(self._user_id), + template_name)) + + def delete_test_session(self, test_session): + return self.exec_rest_request('delete', '{}/{}'.format( + self.user_lib_uri.format(self._user_id), test_session)) + + def create_running_tests(self, test_session_name): + r = self.exec_rest_request('post', + self.running_tests_uri, + json_data={'library': self._user_id, + 'name': test_session_name}) + if r.status_code != self.REST_STATUS_CODES['CREATED']: + raise exceptions.RestApiError('Failed to start test session.') + self.run_id = r.json()['id'] + + def get_running_tests(self, running_test_id=None): + """Get JSON structure of specified running test entity + + :param running_test_id: ID of created running test entity + :type running_test_id: int + :returns list: running tests entity + """ + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + _res = self.exec_rest_request('get', _res_name, logs=False).json() + # If no run_id specified, skip top level key in response dict. + # Else return JSON as list + return _res.get('runningTests', [_res]) + + def delete_running_tests(self, running_test_id=None): + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.get_response_params('delete', _res_name) + LOG.info("Deleted running test with id: %s", running_test_id) + + def _running_tests_action(self, running_test_id, action, json_data=None): + if not json_data: + json_data = {} + # Supported actions: + # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.exec_rest_request('post', _res_name, {'action': action}, + json_data) + LOG.debug("Executed action: '%s' on running test id: %s", action, + running_test_id) + + def stop_running_tests(self, running_test_id, json_data=None, force=False): + _action = 'abort' if force else 'stop' + self._running_tests_action(running_test_id, _action, + json_data=json_data) + LOG.info('Performed action: "%s" to test run with id: %s', _action, + running_test_id) + + def check_running_test_state(self, run_id): + r = self.exec_rest_request('get', + '{}/{}'.format(self.running_tests_uri, + run_id)) + return r.json().get("testStateOrStep") + + def get_running_tests_results(self, run_id): + _res = self.exec_rest_request( + 'get', + '{}/{}/{}'.format(self.running_tests_uri, + run_id, + 'measurements')).json() + return _res + + def _write_results(self, results): + # Avoid None value at test session start + _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0 + + _res_tabs = results.get('tabs') + # Avoid parsing 'tab' dict key initially (missing or empty) + if not _res_tabs: + return + + # Flatten nested dict holding Landslide KPIs of current test run + flat_kpis_dict = {} + for _tab, _kpis in six.iteritems(_res_tabs): + for _kpi, _value in six.iteritems(_kpis): + # Combine table name and KPI name using delimiter "::" + _key = '::'.join([_tab, _kpi]) + try: + # Cast value from str to float + # Remove comma and/or measure units, e.g. "us" + flat_kpis_dict[_key] = float( + _value.split(' ')[0].replace(',', '')) + except ValueError: # E.g. if KPI represents datetime + pass + LOG.info("Polling test results of test run id: %s. Elapsed time: %s " + "seconds", self.run_id, _elapsed_time) + return flat_kpis_dict + + def collect_kpi(self): + if 'COMPLETE' in self.check_running_test_state(self.run_id): + self._result.update({'done': True}) + return self._result + _res = self.get_running_tests_results(self.run_id) + _kpis = self._write_results(_res) + if _kpis: + _kpis.update({'run_id': int(self.run_id)}) + _kpis.update({'iteration': _res['iteration']}) + self._result.update(_kpis) + return self._result + + +class LandslideTclClient(object): + """Landslide TG TCL client class""" + + DEFAULT_TEST_NODE = { + 'ethStatsEnabled': True, + 'forcedEthInterface': '', + 'innerVlanId': 0, + 'ip': '', + 'mac': '', + 'mtu': 1500, + 'nextHop': '', + 'numLinksOrNodes': 1, + 'numVlan': 1, + 'phy': '', + 'uniqueVlanAddr': False, + 'vlanDynamic': 0, + 'vlanId': 0, + 'vlanUserPriority': 0, + 'vlanTagType': 0 + } + + TEST_NODE_CMD = \ + 'ls::create -TestNode-{} -under $p_ -Type "eth"' \ + ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \ + ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \ + ' -ForcedEthInterface "{forcedEthInterface}"' \ + ' -EthStatsEnabled {ethStatsEnabled}' \ + ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \ + ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \ + ';' + + def __init__(self, tcl_handler, ts_context): + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + self._tcl = tcl_handler + self._ts_context = ts_context + self.ts_ids = set() + + # Test types names expected in session profile, test case and pod files + self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node", + "PCRF_Node"} + + self._class_param_config_handler = { + "Array": self._configure_array_param, + "TestNode": self._configure_test_node_param, + "Sut": self._configure_sut_param, + "Dmf": self._configure_dmf_param + } + + def connect(self, tcl_server_ip, username, password): + """ Connect to TCL server with username and password + + :param tcl_server_ip: TCL server IP address + :type tcl_server_ip: str + :param username: existing username on TCL server + :type username: str + :param password: password related to username on TCL server + :type password: str + """ + LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username) + res = self._tcl.execute( + "ls::login {} {} {}".format(tcl_server_ip, username, password)) + if 'java0x' not in res: # handle assignment reflects login success + raise exceptions.LandslideTclException( + "connect: login failed ='{}'.".format(res)) + self._library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + username)) + self._basic_library_id = self._get_library_id('Basic') + self.tcl_server_ip = tcl_server_ip + self._user = username + LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user, + self._library_id, + self._basic_library_id) + + def disconnect(self): + """ Disconnect from TCL server. Drop TCL connection configuration """ + LOG.info("disconnect: server='%s' user='%s'", + self.tcl_server_ip, self._user) + self._tcl.execute("ls::logout") + self.tcl_server_ip = None + self._user = None + self._library_id = None + self._basic_library_id = None + + def _add_test_server(self, name, ip): + try: + # Check if test server exists with name equal to _ts_name + ts_id = int(self.resolve_test_server_name(name)) + except ValueError: + # Such test server does not exist. Attempt to create it + ts_id = self._tcl.execute( + 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip)) + try: + int(ts_id) + except ValueError: + # Failed to create test server, e.g. limit reached + raise RuntimeError( + 'Failed to create test server: "{}". {}'.format(name, + ts_id)) + return ts_id + + def _update_license(self, name): + """ Setup/update test server license + + :param name: test server name + :type name: str + """ + # Retrieve current TsInfo configuration, result stored in handle "ts" + self._tcl.execute( + 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name)) + + # Set license ID, if it differs from current one, update test server + _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense') + if _curr_lic_id != self._ts_context.license_data['lic_id']: + self._tcl.execute('ls::config $ts -RequestedLicense {}'.format( + self._ts_context.license_data['lic_id'])) + self._tcl.execute('ls::perform ModifyTs $ts') + + def _set_thread_model(self, name, thread_model): + # Retrieve test server configuration, store it in handle "tsc" + _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][ + 'cfguser_password'] + self._tcl.execute( + 'set tsc [ls::perform RetrieveTsConfiguration ' + '-name "{}" {}]'.format(name, _cfguser_password)) + # Configure ThreadModel, if it differs from current one + thread_model_map = {'Legacy': 'V0', + 'Max': 'V1', + 'Fireball': 'V1_FB3'} + _model = thread_model_map[thread_model] + _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel') + if _curr_model != _model: + self._tcl.execute( + 'ls::config $tsc -ThreadModel "{}"'.format(_model)) + self._tcl.execute( + 'ls::perform ApplyTsConfiguration $tsc {}'.format( + _cfguser_password)) + + def create_test_server(self, test_server): + _ts_thread_model = test_server.get('thread_model') + _ts_name = test_server['name'] + + ts_id = self._add_test_server(_ts_name, test_server['ip']) + + self._update_license(_ts_name) + + # Skip below code modifying thread_model if it is not defined + if _ts_thread_model: + self._set_thread_model(_ts_name, _ts_thread_model) + + return ts_id + + def create_test_session(self, test_session): + """ Create, configure and save Landslide test session object. + + :param test_session: Landslide TestSession object + :type test_session: dict + """ + LOG.info("create_test_session: name='%s'", test_session['name']) + self._tcl.execute('set test_ [ls::create TestSession]') + self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format( + self._library_id, test_session['name'])) + self._tcl.execute('ls::config $test_ -Description "{}"'.format( + test_session['description'])) + if 'keywords' in test_session: + self._tcl.execute('ls::config $test_ -Keywords "{}"'.format( + test_session['keywords'])) + if 'duration' in test_session: + self._tcl.execute('ls::config $test_ -Duration "{}"'.format( + test_session['duration'])) + if 'iterations' in test_session: + self._tcl.execute('ls::config $test_ -Iterations "{}"'.format( + test_session['iterations'])) + if 'reservePorts' in test_session: + if test_session['reservePorts'] == 'true': + self._tcl.execute('ls::config $test_ -Reserve Ports') + + if 'reservations' in test_session: + for _reservation in test_session['reservations']: + self._configure_reservation(_reservation) + + if 'reportOptions' in test_session: + self._configure_report_options(test_session['reportOptions']) + + for _index, _group in enumerate(test_session['tsGroups']): + self._configure_ts_group(_group, _index) + + self._save_test_session() + + def create_dmf(self, dmf): + """ Create, configure and save Landslide Data Message Flow object. + + :param dmf: Landslide Data Message Flow object + :type: dmf: dict + """ + self._tcl.execute('set dmf_ [ls::create Dmf]') + _lib_id = self._get_library_id(dmf['dmf']['library']) + self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format( + _lib_id, + dmf['dmf']['name'])) + for _param_key in dmf: + if _param_key == 'dmf': + continue + _param_value = dmf[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + _tcl_cmd = 'ls::config $dmf_' + for _sub_param_key in _param_value: + _sub_param_value = _param_value[_sub_param_key] + if isinstance(_sub_param_value, str): + _tcl_cmd += ' -{} "{}"'.format(_sub_param_key, + _sub_param_value) + else: + _tcl_cmd += ' -{} {}'.format(_sub_param_key, + _sub_param_value) + + self._tcl.execute(_tcl_cmd) + else: + # Configure simple parameter + if isinstance(_param_value, str): + self._tcl.execute( + 'ls::config $dmf_ -{} "{}"'.format(_param_key, + _param_value)) + else: + self._tcl.execute( + 'ls::config $dmf_ -{} {}'.format(_param_key, + _param_value)) + self._save_dmf() + + def configure_dmf(self, dmf): + # Use create to reconfigure and overwrite existing dmf + self.create_dmf(dmf) + + def delete_dmf(self, dmf): + raise NotImplementedError + + def _save_dmf(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -Dmf $dmf_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings') + LOG.error("_save_dmf: %s", res) + raise exceptions.LandslideTclException("_save_dmf: {}".format(res)) + else: + res = self._tcl.execute('ls::save $dmf_ -overwrite') + LOG.debug("_save_dmf: result (%s)", res) + + def _configure_report_options(self, options): + for _option_key in options: + _option_value = options[_option_key] + if _option_key == 'format': + _format = 0 + if _option_value == 'CSV': + _format = 1 + self._tcl.execute( + 'ls::config $test_.ReportOptions -Format {} ' + '-Ts -3 -Tc -3'.format(_format)) + else: + self._tcl.execute( + 'ls::config $test_.ReportOptions -{} {}'.format( + _option_key, + _option_value)) + + def _configure_ts_group(self, ts_group, ts_group_index): + try: + _ts_id = int(self.resolve_test_server_name(ts_group['tsId'])) + except ValueError: + raise RuntimeError('Test server name "{}" does not exist.'.format( + ts_group['tsId'])) + if _ts_id not in self.ts_ids: + self._tcl.execute( + 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format( + _ts_id)) + self.ts_ids.add(_ts_id) + for _case in ts_group.get('testCases', []): + self._configure_tc_type(_case, ts_group_index) + + self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress')) + + def _configure_tc_type(self, tc, ts_group_index): + if tc['type'] not in self._tc_types: + raise RuntimeError('Test type {} not supported.'.format( + tc['type'])) + tc['type'] = tc['type'].replace('_', ' ') + res = self._tcl.execute( + 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format( + self._basic_library_id, tc['type'])) + if 'Invalid' in res: + raise RuntimeError('Test type {} not found in "Basic" ' + 'library.'.format(tc['type'])) + self._tcl.execute( + 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format( + ts_group_index)) + self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format( + self._basic_library_id, tc['name'])) + self._tcl.execute( + 'ls::config $tc_ -Description "{}"'.format(tc['type'])) + self._tcl.execute( + 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type'])) + if 'linked' in tc: + self._tcl.execute( + 'ls::config $tc_ -Linked {}'.format(tc['linked'])) + if 'AssociatedPhys' in tc: + self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format( + tc['AssociatedPhys'])) + if 'parameters' in tc: + self._configure_parameters(tc['parameters']) + + def _configure_parameters(self, params): + self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]') + for _param_key in sorted(params): + _param_value = params[_param_key] + if isinstance(_param_value, dict): + # Configure complex parameter + if _param_value['class'] in self._class_param_config_handler: + self._class_param_config_handler[_param_value['class']]( + _param_key, + _param_value) + else: + # Configure simple parameter + self._tcl.execute( + 'ls::create {} -under $p_ -Value "{}"'.format( + _param_key, + _param_value)) + + def _configure_array_param(self, name, params): + self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name)) + for param in params['array']: + self._tcl.execute( + 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name, + param)) + + def _configure_test_node_param(self, name, params): + _params = self.DEFAULT_TEST_NODE + _params.update(params) + + # TCL command expects lower case 'true' or 'false' + _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower() + _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower() + + cmd = self.TEST_NODE_CMD.format(name, **_params) + self._tcl.execute(cmd) + + def _configure_sut_param(self, name, params): + self._tcl.execute( + 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name, + params['name'])) + + def _configure_dmf_param(self, name, params): + self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name)) + + for _flow_index, _flow in enumerate(params['mainflows']): + _lib_id = self._get_library_id(_flow['library']) + self._tcl.execute( + 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format( + _lib_id, + _flow['name'])) + + if not params.get('instanceGroups'): + return + + _instance_group = params['instanceGroups'][_flow_index] + + # Traffic Mixer parameters handling + for _key in ['mixType', 'rate']: + if _key in _instance_group: + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format( + _flow_index, _key, _instance_group[_key])) + + # Assignments parameters handling + for _row_id, _row in enumerate(_instance_group.get('rows', [])): + self._tcl.execute( + 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} ' + '-OverridePort {} -ClientPort {} -Context {} -Role {} ' + '-PreferredTransport {} -RatingGroup {} ' + '-ServiceID {}'.format( + _flow_index, _row_id, _row['node'], + _row['overridePort'], _row['clientPort'], + _row['context'], _row['role'], _row['transport'], + _row['ratingGroup'], _row['serviceId'])) + + def _configure_reservation(self, reservation): + _ts_id = self.resolve_test_server_name(reservation['tsId']) + self._tcl.execute( + 'set reservation_ [ls::create Reservation -under $test_]') + self._tcl.execute( + 'ls::config $reservation_ -TsIndex {} -TsId {} ' + '-TsName "{}"'.format(reservation['tsIndex'], + _ts_id, + reservation['tsName'])) + for _subnet in reservation['phySubnets']: + self._tcl.execute( + 'set physubnet_ [ls::create PhySubnet -under $reservation_]') + self._tcl.execute( + 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" ' + '-NumIps {}'.format(_subnet['name'], _subnet['base'], + _subnet['mask'], _subnet['numIps'])) + + def _configure_preresolved_arp(self, pre_resolved_arp): + if not pre_resolved_arp: # Pre-resolved ARP configuration not found + return + for _entry in pre_resolved_arp: + # TsGroup handle name should correspond in _configure_ts_group() + self._tcl.execute( + 'ls::create PreResolvedArpAddress -under $tss_ ' + '-StartingAddress "{StartingAddress}" ' + '-NumNodes {NumNodes}'.format(**_entry)) + + def delete_test_session(self, test_session): + raise NotImplementedError + + def _save_test_session(self): + # Call 'Validate' to set default values for missing parameters + res = self._tcl.execute('ls::perform Validate -TestSession $test_') + if res == 'Invalid': + res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings') + raise exceptions.LandslideTclException( + "Test session validation failed. Server response: {}".format( + res)) + else: + self._tcl.execute('ls::save $test_ -overwrite') + LOG.debug("Test session saved successfully.") + + def _get_library_id(self, library): + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + return _library_id + except ValueError: + pass + + _library_id = self._tcl.execute( + "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format( + library)) + try: + int(_library_id) + except ValueError: + LOG.error("_get_library_id: library='%s' not found.", library) + raise exceptions.LandslideTclException( + "_get_library_id: library='{}' not found.".format( + library)) + + return _library_id + + def resolve_test_server_name(self, ts_name): + return self._tcl.execute("ls::query TsId {}".format(ts_name)) + + +class LsTclHandler(object): + """Landslide TCL Handler class""" + + LS_OK = "ls_ok" + JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386') + + def __init__(self): + self.tcl_cmds = {} + self._ls = LsApi(jre_path=self.JRE_PATH) + self._ls.tcl( + "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format( + self.LS_OK)) + + def execute(self, command): + res = self._ls.tcl(command) + self.tcl_cmds[command] = res + return res |