From dea3a485f5f196d7ee18ccc0c64a35ab5560385f Mon Sep 17 00:00:00 2001 From: Emma Foley Date: Thu, 30 Aug 2018 13:19:36 +0100 Subject: Added Landslide Resource Helper implementation Class "LandslideResourceHelper" provides API for operations needed - to configure Landslide test session - manage test session execution (start/stop/abort) - collect measurements during test run This helper class API is responsible to for configure Landslide test runs: - create test user - create test servers (for emulation of specific vEPC blocks) - create SUTs (actual tested VNFs performing specific vEPC roles) - create test session (contains actual test cases) - create DMFs (pre-defined traffic flows in traffic profile) - operate traffic run execution (start, stop, abort) - monitor test run status - collect KPIs on TG side Some of these features use Landslide REST API. Other ones use Landslide TCL API. JIRA: YARDSTICK-1356 Change-Id: I8fc8a7d85301121da465d054b8d38ae09a541c36 Signed-off-by: Orest Voznyy Signed-off-by: Emma Foley Signed-off-by: Orest Voznyy --- yardstick/common/exceptions.py | 6 + .../vnf_generic/vnf/tg_landslide.py | 516 ++++++++++++++- .../vnf_generic/vnf/test_tg_landslide.py | 734 ++++++++++++++++++++- 3 files changed, 1218 insertions(+), 38 deletions(-) (limited to 'yardstick') diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py index 4a4006ae4..539e0fec3 100644 --- a/yardstick/common/exceptions.py +++ b/yardstick/common/exceptions.py @@ -422,6 +422,12 @@ class ValueCheckError(YardstickException): message = 'Constraint "%(value1)s %(operator)s %(value2)s" does not hold' +class RestApiError(RuntimeError): + def __init__(self, message): + self._message = message + super(RestApiError, self).__init__(message) + + class LandslideTclException(RuntimeError): def __init__(self, message): self._message = message diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py index 3bb849c52..0ccffeeb1 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py @@ -13,6 +13,10 @@ # limitations under the License. import logging +import requests +import six +import time +from collections import Mapping from yardstick.common import exceptions from yardstick.common import utils as common_utils @@ -39,14 +43,515 @@ class LandslideResourceHelper(sample_vnf.ClientResourceHelper): self.vnfd_helper = setup_helper.vnfd_helper self.scenario_helper = setup_helper.scenario_helper + # TAS Manager config initialization + self._url = None + self._user_id = None + self.session = None + self.license_data = {} + # TCL session initialization self._tcl = LandslideTclClient(LsTclHandler(), self) + self.session = requests.Session() + self.running_tests_uri = 'runningTests' + self.test_session_uri = 'testSessions' + self.test_serv_uri = 'testServers' + self.suts_uri = 'suts' + self.users_uri = 'users' + self.user_lib_uri = None + self.run_id = None + + def abort_running_tests(self, timeout=60, delay=5): + """ Abort running test sessions, if any """ + _start_time = time.time() + while time.time() < _start_time + timeout: + run_tests_states = {x['id']: x['testStateOrStep'] + for x in self.get_running_tests()} + if not set(run_tests_states.values()).difference( + {'COMPLETE', 'COMPLETE_ERROR'}): + break + else: + [self.stop_running_tests(running_test_id=_id, force=True) + for _id, _state in run_tests_states.items() + if 'COMPLETE' not in _state] + time.sleep(delay) + else: + raise RuntimeError( + 'Some test runs not stopped during {} seconds'.format(timeout)) + + def _build_url(self, resource, action=None): + """ Build URL string + + :param resource: REST API resource name + :type resource: str + :param action: actions name and value + :type action: dict('name': , 'value': ) + :returns str: REST API resource name with optional action info + """ + # Action is optional and accepted only in presence of resource param + if action and not resource: + raise ValueError("Resource name not provided") + # Concatenate actions + _action = ''.join(['?{}={}'.format(k, v) for k, v in + action.items()]) if action else '' + + return ''.join([self._url, resource, _action]) + + def get_response_params(self, method, resource, params=None): + """ Retrieve params from JSON response of specific resource URL + + :param method: one of supported REST API methods + :type method: str + :param resource: URI, requested resource name + :type resource: str + :param params: attributes to be found in JSON response + :type params: list(str) + """ + _res = [] + params = params if params else [] + response = self.exec_rest_request(method, resource) + # Get substring between last slash sign and question mark (if any) + url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0] + _response_json = response.json() + # Expect dict(), if URL last part and top dict key don't match + # Else, if they match, expect list() + k, v = list(_response_json.items())[0] + if k != url_last_part: + v = [v] # v: list(dict(str: str)) + # Extract params, or whole list of dicts (without top level key) + for x in v: + _res.append({param: x[param] for param in params} if params else x) + return _res + + def _create_user(self, auth, level=1): + """ Create new user + + :param auth: data to create user account on REST server + :type auth: dict + :param level: Landslide user permissions level + :type level: int + :returns int: user id + """ + # Set expiration date in two years since account creation date + _exp_date = time.strftime( + '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2)) + _username = auth['user'] + _fields = {"contactInformation": "", "expiresOn": _exp_date, + "fullName": "Test User", + "isActive": "true", "level": level, + "password": auth['password'], + "username": _username} + _response = self.exec_rest_request('post', self.users_uri, + json_data=_fields, raise_exc=False) + _resp_json = _response.json() + if _response.status_code == self.REST_STATUS_CODES['CREATED']: + # New user created + _id = _resp_json['id'] + LOG.info("New user created: username='%s', id='%s'", _username, + _id) + elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']: + # User already exists + LOG.info("Account '%s' already exists.", _username) + # Get user id + _id = self._modify_user(_username, {"isActive": "true"})['id'] + else: + raise exceptions.RestApiError( + 'Error during new user "{}" creation'.format(_username)) + return _id + + def _modify_user(self, username, fields): + """ Modify information about existing user + + :param username: user name of account to be modified + :type username: str + :param fields: data to modify user account on REST server + :type fields: dict + :returns dict: user info + """ + _response = self.exec_rest_request('post', self.users_uri, + action={'username': username}, + json_data=fields, raise_exc=False) + if _response.status_code == self.REST_STATUS_CODES['OK']: + _response = _response.json() + else: + raise exceptions.RestApiError( + 'Error during user "{}" data update: {}'.format( + username, + _response.status_code)) + LOG.info("User account '%s' modified: '%s'", username, _response) + return _response + + def _delete_user(self, username): + """ Delete user account + + :param username: username field + :type username: str + :returns bool: True if succeeded + """ + self.exec_rest_request('delete', self.users_uri, + action={'username': username}) + + def _get_users(self, username=None): + """ Get user records from REST server + + :param username: username field + :type username: None|str + :returns list(dict): empty list, or user record, or list of all users + """ + _response = self.get_response_params('get', self.users_uri) + _res = [u for u in _response if + u['username'] == username] if username else _response + return _res + + def exec_rest_request(self, method, resource, action=None, json_data=None, + logs=True, raise_exc=True): + """ Execute REST API request, return response object + + :param method: one of supported requests ('post', 'get', 'delete') + :type method: str + :param resource: URL of resource + :type resource: str + :param action: data used to provide URI located after question mark + :type action: dict + :param json_data: mandatory only for 'post' method + :type json_data: dict + :param logs: debug logs display flag + :type raise_exc: bool + :param raise_exc: if True, raise exception on REST API call error + :returns requests.Response(): REST API call response object + """ + json_data = json_data if json_data else {} + action = action if action else {} + _method = method.upper() + method = method.lower() + if method not in ('post', 'get', 'delete'): + raise ValueError("Method '{}' not supported".format(_method)) + + if method == 'post' and not action: + if not (json_data and isinstance(json_data, Mapping)): + raise ValueError( + 'JSON data missing in {} request'.format(_method)) + + r = getattr(self.session, method)(self._build_url(resource, action), + json=json_data) + if raise_exc and not r.ok: + msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format( + method, self._build_url(resource, action), r.reason) + raise exceptions.RestApiError(msg) + + if logs: + LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method, + r.request.url) + LOG.debug("Response: %s", r.json()) + return r + + def connect(self): + """Connect to RESTful server using test user account""" + tas_info = self.vnfd_helper['mgmt-interface'] + # Supported REST Server ports: HTTP - 8080, HTTPS - 8181 + _port = '8080' if tas_info['proto'] == 'http' else '8181' + tas_info.update({'port': _port}) + self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info) + self.session.headers.update({'Accept': 'application/json', + 'Content-type': 'application/json'}) + # Login with super user to create test user + self.session.auth = ( + tas_info['super-user'], tas_info['super-user-password']) + LOG.info("Connect using superuser: server='%s'", self._url) + auth = {x: tas_info[x] for x in ('user', 'password')} + self._user_id = self._create_user(auth) + # Login with test user + self.session.auth = auth['user'], auth['password'] + # Test user validity + self.exec_rest_request('get', '') + + self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri) + LOG.info("Login with test user: server='%s'", self._url) + # Read existing license + self.license_data['lic_id'] = tas_info['license'] + + # Tcl client init + self._tcl.connect(tas_info['ip'], *self.session.auth) + + return self.session + + def disconnect(self): + self.session = None + self._tcl.disconnect() + def terminate(self): - raise NotImplementedError() + self._terminated.value = 1 + + def create_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.create_dmf(_dmf) + else: + self._tcl.create_dmf(dmf) + + def delete_dmf(self, dmf): + if isinstance(dmf, list): + for _dmf in dmf: + self._tcl.delete_dmf(_dmf) + else: + self._tcl.delete_dmf(dmf) + + def create_suts(self, suts): + # Keep only supported keys in suts object + for _sut in suts: + sut_entry = {k: v for k, v in _sut.items() + if k not in {'phy', 'nextHop', 'role'}} + _response = self.exec_rest_request( + 'post', self.suts_uri, json_data=sut_entry, + logs=False, raise_exc=False) + if _response.status_code != self.REST_STATUS_CODES['CREATED']: + LOG.info(_response.reason) # Failed to create + _name = sut_entry.pop('name') + # Modify existing SUT + self.configure_sut(sut_name=_name, json_data=sut_entry) + else: + LOG.info("SUT created: %s", sut_entry) + + def get_suts(self, suts_id=None): + if suts_id: + _suts = self.exec_rest_request( + 'get', '{}/{}'.format(self.suts_uri, suts_id)).json() + else: + _suts = self.get_response_params('get', self.suts_uri) + + return _suts + + def configure_sut(self, sut_name, json_data): + """ Modify information of specific SUTs + + :param sut_name: name of existing SUT + :type sut_name: str + :param json_data: SUT settings + :type json_data: dict() + """ + LOG.info("Modifying SUT information...") + _response = self.exec_rest_request('post', + self.suts_uri, + action={'name': sut_name}, + json_data=json_data, + raise_exc=False) + if _response.status_code not in {self.REST_STATUS_CODES[x] for x in + {'OK', 'NO CHANGE'}}: + raise exceptions.RestApiError(_response.reason) + + LOG.info("Modified SUT: %s", sut_name) + + def delete_suts(self, suts_ids=None): + if not suts_ids: + _curr_suts = self.get_response_params('get', self.suts_uri) + suts_ids = [x['id'] for x in _curr_suts] + LOG.info("Deleting SUTs with following IDs: %s", suts_ids) + for _id in suts_ids: + self.exec_rest_request('delete', + '{}/{}'.format(self.suts_uri, _id)) + LOG.info("\tDone for SUT id: %s", _id) + + def _check_test_servers_state(self, test_servers_ids=None, delay=10, + timeout=300): + LOG.info("Waiting for related test servers state change to READY...") + # Wait on state change + _start_time = time.time() + while time.time() - _start_time < timeout: + ts_ids_not_ready = {x['id'] for x in + self.get_test_servers(test_servers_ids) + if x['state'] != 'READY'} + if ts_ids_not_ready == set(): + break + time.sleep(delay) + else: + raise RuntimeError( + 'Test servers not in READY state after {} seconds.'.format( + timeout)) + + def create_test_servers(self, test_servers): + """ Create test servers + + :param test_servers: input data for test servers creation + mandatory fields: managementIp + optional fields: name + :type test_servers: list(dict) + """ + _ts_ids = [] + for _ts in test_servers: + _msg = 'Created test server "%(name)s"' + _ts_ids.append(self._tcl.create_test_server(_ts)) + if _ts.get('thread_model'): + _msg += ' in mode: "%(thread_model)s"' + LOG.info(_msg, _ts) + + self._check_test_servers_state(_ts_ids) + + def get_test_servers(self, test_server_ids=None): + if not test_server_ids: # Get all test servers info + _test_servers = self.exec_rest_request( + 'get', self.test_serv_uri).json()[self.test_serv_uri] + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + _test_servers = [] + for _id in test_server_ids: + _test_servers.append(self.exec_rest_request( + 'get', '{}/{}'.format(self.test_serv_uri, _id)).json()) + LOG.info("Current test servers configuration: %s", _test_servers) + return _test_servers + + def configure_test_servers(self, action, json_data=None, + test_server_ids=None): + if not test_server_ids: + test_server_ids = [x['id'] for x in self.get_test_servers()] + elif isinstance(test_server_ids, int): + test_server_ids = [test_server_ids] + for _id in test_server_ids: + self.exec_rest_request('post', + '{}/{}'.format(self.test_serv_uri, _id), + action=action, json_data=json_data) + LOG.info("Test server (id: %s) configuration done: %s", _id, + action) + return test_server_ids + + def delete_test_servers(self, test_servers_ids=None): + # Delete test servers + for _ts in self.get_test_servers(test_servers_ids): + self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri, + _ts['id'])) + LOG.info("Deleted test server: %s", _ts['name']) + + def create_test_session(self, test_session): + # Use tcl client to create session + test_session['library'] = self._user_id + LOG.debug("Creating session='%s'", test_session['name']) + self._tcl.create_test_session(test_session) + + def get_test_session(self, test_session_name=None): + if test_session_name: + uri = 'libraries/{}/{}/{}'.format(self._user_id, + self.test_session_uri, + test_session_name) + else: + uri = self.user_lib_uri.format(self._user_id) + _test_sessions = self.exec_rest_request('get', uri).json() + return _test_sessions + + def configure_test_session(self, template_name, test_session): + # Override specified test session parameters + LOG.info('Update test session parameters: %s', test_session['name']) + test_session.update({'library': self._user_id}) + return self.exec_rest_request( + method='post', + action={'action': 'overrideAndSaveAs'}, + json_data=test_session, + resource='{}/{}'.format(self.user_lib_uri.format(self._user_id), + template_name)) + + def delete_test_session(self, test_session): + return self.exec_rest_request('delete', '{}/{}'.format( + self.user_lib_uri.format(self._user_id), test_session)) + + def create_running_tests(self, test_session_name): + r = self.exec_rest_request('post', + self.running_tests_uri, + json_data={'library': self._user_id, + 'name': test_session_name}) + if r.status_code != self.REST_STATUS_CODES['CREATED']: + raise exceptions.RestApiError('Failed to start test session.') + self.run_id = r.json()['id'] + + def get_running_tests(self, running_test_id=None): + """Get JSON structure of specified running test entity + + :param running_test_id: ID of created running test entity + :type running_test_id: int + :returns list: running tests entity + """ + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + _res = self.exec_rest_request('get', _res_name, logs=False).json() + # If no run_id specified, skip top level key in response dict. + # Else return JSON as list + return _res.get('runningTests', [_res]) + + def delete_running_tests(self, running_test_id=None): + if not running_test_id: + running_test_id = '' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.get_response_params('delete', _res_name) + LOG.info("Deleted running test with id: %s", running_test_id) + + def _running_tests_action(self, running_test_id, action, json_data=None): + if not json_data: + json_data = {} + # Supported actions: + # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc' + _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id) + self.exec_rest_request('post', _res_name, {'action': action}, + json_data) + LOG.debug("Executed action: '%s' on running test id: %s", action, + running_test_id) + + def stop_running_tests(self, running_test_id, json_data=None, force=False): + _action = 'abort' if force else 'stop' + self._running_tests_action(running_test_id, _action, + json_data=json_data) + LOG.info('Performed action: "%s" to test run with id: %s', _action, + running_test_id) + + def check_running_test_state(self, run_id): + r = self.exec_rest_request('get', + '{}/{}'.format(self.running_tests_uri, + run_id)) + return r.json().get("testStateOrStep") + + def get_running_tests_results(self, run_id): + _res = self.exec_rest_request( + 'get', + '{}/{}/{}'.format(self.running_tests_uri, + run_id, + 'measurements')).json() + return _res + + def _write_results(self, results): + # Avoid None value at test session start + _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0 + + _res_tabs = results.get('tabs') + # Avoid parsing 'tab' dict key initially (missing or empty) + if not _res_tabs: + return + + # Flatten nested dict holding Landslide KPIs of current test run + flat_kpis_dict = {} + for _tab, _kpis in six.iteritems(_res_tabs): + for _kpi, _value in six.iteritems(_kpis): + # Combine table name and KPI name using delimiter "::" + _key = '::'.join([_tab, _kpi]) + try: + # Cast value from str to float + # Remove comma and/or measure units, e.g. "us" + flat_kpis_dict[_key] = float( + _value.split(' ')[0].replace(',', '')) + except ValueError: # E.g. if KPI represents datetime + pass + LOG.info("Polling test results of test run id: %s. Elapsed time: %s " + "seconds", self.run_id, _elapsed_time) + return flat_kpis_dict def collect_kpi(self): - raise NotImplementedError() + if 'COMPLETE' in self.check_running_test_state(self.run_id): + self._result.update({'done': True}) + return self._result + _res = self.get_running_tests_results(self.run_id) + _kpis = self._write_results(_res) + if _kpis: + _kpis.update({'run_id': int(self.run_id)}) + _kpis.update({'iteration': _res['iteration']}) + self._result.update(_kpis) + return self._result class LandslideTclClient(object): @@ -470,10 +975,11 @@ class LandslideTclClient(object): if res == 'Invalid': res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings') raise exceptions.LandslideTclException( - "_save_test_session: {}".format(res)) + "Test session validation failed. Server response: {}".format( + res)) else: - res = self._tcl.execute('ls::save $test_ -overwrite') - LOG.debug("_save_test_session: result (%s)", res) + self._tcl.execute('ls::save $test_ -overwrite') + LOG.debug("Test session saved successfully.") def _get_library_id(self, library): _library_id = self._tcl.execute( diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py index 178ec5dab..9f0e69b60 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_landslide.py @@ -13,7 +13,10 @@ # limitations under the License. # +import copy import mock +import requests +import time import unittest from yardstick.common import exceptions @@ -167,7 +170,7 @@ SESSION_PROFILE = { 'iterations': 1, 'description': 'UE default bearer creation test case', 'name': 'default_bearer_capacity', - 'reportOptions': {'Format': '1'}, + 'reportOptions': {'format': 'CSV'}, "reservePorts": "true", "reservations": [ {"tsId": 4, @@ -181,7 +184,7 @@ SESSION_PROFILE = { ]}, {"tsId": 2, "tsIndex": 1, - "tsName": "TestServer_2", + "tsName": TEST_SERVERS[1]['name'], "phySubnets": [ {"base": "10.42.32.1", "mask": "/24", "name": "eth5", "numIps": 100}, @@ -320,8 +323,85 @@ SESSION_PROFILE = { class TestLandslideResourceHelper(unittest.TestCase): - TEST_TERMINATED = 1 + PROTO_PORT = 8080 + EXAMPLE_URL = ''.join([TAS_INFO['proto'], '://', TAS_INFO['ip'], ':', + str(PROTO_PORT), '/api/']) + SUCCESS_CREATED_CODE = 201 + SUCCESS_OK_CODE = 200 + INVALID_REST_CODE = '400' + NOT_MODIFIED_CODE = 500810 + ERROR_CODE = 500800 SUCCESS_RECORD_ID = 11 + EXPIRE_DATE = '2020/01/01 12:00 FLE Standard Time' + TEST_USER = 'test' + TEST_TERMINATED = 1 + AUTH_DATA = {'user': TAS_INFO['user'], 'password': TAS_INFO['password']} + TEST_SESSION_NAME = 'default_bearer_capacity' + + USERS_DATA = { + "users": [{ + "url": ''.join([EXAMPLE_URL, 'users/', str(SUCCESS_RECORD_ID)]), + "id": SUCCESS_RECORD_ID, + "level": 1, + "username": TEST_USER + }] + } + + CREATE_USER_DATA = {'username': TAS_INFO['user'], + 'expiresOn': EXPIRE_DATE, + 'level': 1, + 'contactInformation': '', + 'fullName': 'Test User', + 'password': TAS_INFO['password'], + 'isActive': 'true'} + + SUTS_DATA = { + "suts": [ + { + "url": ''.join([EXAMPLE_URL, 'suts/', str(SUCCESS_RECORD_ID)]), + "id": SUCCESS_RECORD_ID, + "name": "10.41.32.1" + }]} + + TEST_SERVERS_DATA = { + "testServers": [ + { + "url": ''.join([EXAMPLE_URL, "testServers/1"]), + "id": 1, + "name": TEST_SERVERS[0]['name'], + "state": "READY", + "version": "16.4.0.10" + }, + { + "url": ''.join([EXAMPLE_URL, "testServers/2"]), + "id": 2, + "name": TEST_SERVERS[1]['name'], + "state": "READY", + "version": "16.4.0.10" + } + + ] + } + + RUN_ID = 3 + + RUNNING_TESTS_DATA = { + "runningTests": [{ + "url": ''.join([EXAMPLE_URL, "runningTests/{}".format(RUN_ID)]), + "measurementsUrl": ''.join( + [EXAMPLE_URL, + "runningTests/{}/measurements".format(RUN_ID)]), + "criteriaUrl": ''.join( + [EXAMPLE_URL, + "runningTests/{}/criteria".format(RUN_ID)]), + "noteToUser": "", + "id": RUN_ID, + "library": SUCCESS_RECORD_ID, + "name": "default_bearer_capacity", + "user": TEST_USER, + "criteriaStatus": "NA", + "testStateOrStep": "COMPLETE" + }]} TEST_RESULTS_DATA = { "interval": 0, @@ -331,12 +411,11 @@ class TestLandslideResourceHelper(unittest.TestCase): "tabs": { "Test Summary": { "Start Time": "Tue Mar 20 07:11:55 CDT 2018", - "Attempted Dedicated Bearer Session Connects": "0", - "Attempted Dedicated Bearer Session Disconnects": "0", - "Actual Dedicated Bearer Session Connects": "0", - "Actual Dedicated Bearer Session Disconnects": "0", - "Dedicated Bearer Sessions Pending": "0", - "Dedicated Bearer Sessions Established": "0" + "Actual Dedicated Bearer Session Connects": "100", + "Actual Dedicated Bearer Session Disconnects": "100", + "Actual Disconnect Rate(Sessions / Second)(P - I)": "164.804", + "Average Session Disconnect Time(P - I)": "5.024 s", + "Total Data Sent + Received Packets / Sec(P - I)": "1,452.294" }}} def setUp(self): @@ -357,12 +436,590 @@ class TestLandslideResourceHelper(unittest.TestCase): self.assertIsInstance(self.res_helper, tg_landslide.LandslideResourceHelper) self.assertEqual({}, self.res_helper._result) + self.assertIsNone(self.res_helper.run_id) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests_no_running_tests(self, mock_get_tests, + mock_stop_tests, *args): + tests_data = [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'COMPLETE'}] + mock_get_tests.return_value = tests_data + self.assertIsNone(self.res_helper.abort_running_tests()) + mock_stop_tests.assert_not_called() + + @mock.patch.object(time, 'sleep') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests(self, mock_get_tests, mock_stop_tests, *args): + test_states_seq = iter(['RUNNING', 'COMPLETE']) + + def configure_mock(*args): + return [{'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': next(test_states_seq)}] + + mock_get_tests.side_effect = configure_mock + self.assertIsNone(self.res_helper.abort_running_tests()) + mock_stop_tests.assert_called_once_with( + running_test_id=self.SUCCESS_RECORD_ID, + force=True) + self.assertEqual(2, mock_get_tests.call_count) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'stop_running_tests') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests') + def test_abort_running_tests_error(self, mock_get_tests, mock_stop_tests, + *args): + tests_data = {'id': self.SUCCESS_RECORD_ID, + 'testStateOrStep': 'RUNNING'} + mock_get_tests.return_value = [tests_data] + with self.assertRaises(RuntimeError): + self.res_helper.abort_running_tests(timeout=1, delay=1) + mock_stop_tests.assert_called_with( + running_test_id=self.SUCCESS_RECORD_ID, + force=True) + + def test__build_url(self, *args): + resource = 'users' + action = {'action': 'userCreate'} + expected_url = ''.join([EXAMPLE_URL, 'users?action=userCreate']) + self.assertEqual(expected_url, + self.res_helper._build_url(resource, action)) + + def test__build_url_error(self, *args): + resource = '' + action = {'action': 'userCreate'} + + with self.assertRaises(ValueError): + self.res_helper._build_url(resource, action) + + def test_get_response_params(self, *args): + method = 'get' + resource = 'users' + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.USERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + resp = self.res_helper.get_response_params(method, resource) + self.assertTrue(resp) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, '_get_users') + @mock.patch.object(time, 'time') + def test__create_user(self, mock_time, mock_get_users, *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertEqual(self.SUCCESS_RECORD_ID, + self.res_helper._create_user(self.AUTH_DATA)) + mock_get_users.assert_not_called() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, '_modify_user') + @mock.patch.object(time, 'time') + def test__create_user_username_exists(self, mock_time, mock_modify_user, + *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + mock_modify_user.return_value = {'id': self.SUCCESS_RECORD_ID, + 'result': 'No changes requested'} + post_resp_data = { + 'status_code': self.ERROR_CODE, + 'json.return_value': {'id': self.SUCCESS_OK_CODE, + 'apiCode': self.NOT_MODIFIED_CODE}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + res = self.res_helper._create_user(self.AUTH_DATA) + mock_modify_user.assert_called_once_with(TAS_INFO['user'], + {'isActive': 'true'}) + self.assertEqual(self.SUCCESS_RECORD_ID, res) + + @mock.patch.object(time, 'time') + def test__create_user_error(self, mock_time, *args): + mock_time.strftime.return_value = self.EXPIRE_DATE + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'apiCode': self.ERROR_CODE}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper._create_user(self.AUTH_DATA) + + def test__modify_user(self, *args): + post_data = {'username': 'test_user'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + res = self.res_helper._modify_user(username=self.TEST_USER, + fields=post_data) + self.assertEqual(self.SUCCESS_RECORD_ID, res['id']) + + def test__modify_user_rest_resp_fail(self, *args): + post_data = {'non-existing-key': ''} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.ok = False + self.res_helper.session = mock_session + self.assertRaises(exceptions.RestApiError, + self.res_helper._modify_user, + username=self.TEST_USER, fields=post_data) + mock_session.post.assert_called_once() + + def test__delete_user(self, *args): + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper._delete_user( + username=self.TEST_USER)) + + def test__get_users(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.USERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertEqual(self.USERS_DATA['users'], + self.res_helper._get_users()) + + def test_exec_rest_request(self, *args): + resource = 'testServers' + action = {'action': 'modify'} + expected_url = ''.join([EXAMPLE_URL, 'testServers?action=modify']) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.res_helper.exec_rest_request('post', resource, action) + self.res_helper.session.post.assert_called_once_with(expected_url, + json={}) + + def test_exec_rest_request_unsupported_method_error(self, *args): + resource = 'testServers' + action = {'action': 'modify'} + with self.assertRaises(ValueError): + self.res_helper.exec_rest_request('patch', resource, action) + + def test_exec_rest_request_missed_action_arg(self, *args): + resource = 'testServers' + with self.assertRaises(ValueError): + self.res_helper.exec_rest_request('post', resource) + + def test_exec_rest_request_raise_exc(self): + resource = 'users' + action = {'action': 'modify'} + post_resp_data = {'status_code': self.ERROR_CODE, + 'json.return_value': { + 'status_code': self.ERROR_CODE}} + mock_session = mock.Mock(spec=requests.Session) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.assertRaises(exceptions.RestApiError, + self.res_helper.exec_rest_request, + 'post', resource, action, raise_exc=True) + + @mock.patch.object(time, 'time') + def test_connect(self, mock_time, *args): + vnfd = VNFD['vnfd:vnfd-catalog']['vnfd'][0] + mock_time.strftime.return_value = self.EXPIRE_DATE + self.res_helper.vnfd_helper = vnfd + + self.res_helper._tcl = mock.Mock() + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session = mock.Mock(spec=requests.Session, headers={}) + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.connect(), requests.Session) + self.res_helper._tcl.connect.assert_called_once_with( + TAS_INFO['ip'], + TAS_INFO['user'], + TAS_INFO['password']) + + def test_disconnect(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.disconnect()) + self.assertIsNone(self.res_helper.session) + self.res_helper._tcl.disconnect.assert_called_once() + + def test_terminate(self, *args): + self.assertIsNone(self.res_helper.terminate()) + self.assertEqual(self.TEST_TERMINATED, + self.res_helper._terminated.value) + + def test_create_dmf(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.create_dmf(DMF_CFG)) + self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG) + + def test_create_dmf_as_list(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.create_dmf([DMF_CFG])) + self.res_helper._tcl.create_dmf.assert_called_once_with(DMF_CFG) + + def test_delete_dmf(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.delete_dmf(DMF_CFG)) + self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG) + + def test_delete_dmf_as_list(self, *args): + self.res_helper._tcl = mock.Mock() + self.assertIsNone(self.res_helper.delete_dmf([DMF_CFG])) + self.res_helper._tcl.delete_dmf.assert_called_once_with(DMF_CFG) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut') + def test_create_suts(self, mock_configure_sut, *args): + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.create_suts(TS1_SUTS)) + mock_configure_sut.assert_not_called() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, 'configure_sut') + def test_create_suts_sut_exists(self, mock_configure_sut, *args): + sut_name = 'test_sut' + suts = [ + {'name': sut_name, + 'role': 'SgwControlAddr', + 'managementIp': '12.0.1.1', + 'ip': '10.42.32.100' + } + ] + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.create_suts(suts)) + mock_configure_sut.assert_called_once_with( + sut_name=sut_name, + json_data={k: v for k, v in suts[0].items() + if k not in {'phy', 'nextHop', 'role', 'name'}}) + + def test_get_suts(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.get_suts(), list) + + def test_get_suts_single_id(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA['suts'][0]} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsInstance(self.res_helper.get_suts(suts_id=2), dict) + + def test_configure_sut(self, *args): + post_data = {'managementIp': '2.2.2.2'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': {'id': self.SUCCESS_RECORD_ID}} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.configure_sut('test_name', + post_data)) + mock_session.post.assert_called_once() + + def test_configure_sut_error(self, *args): + post_data = {'managementIp': '2.2.2.2'} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper.configure_sut('test_name', post_data) + + def test_delete_suts(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.SUTS_DATA} + delete_resp_data = {'status_code': self.SUCCESS_OK_CODE} + mock_session.get.return_value.configure_mock(**get_resp_data) + mock_session.delete.return_value.configure_mock(**delete_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_suts()) + mock_session.delete.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_test_servers') + def test__check_test_servers_state(self, mock_get_test_servers, *args): + mock_get_test_servers.return_value = \ + self.TEST_SERVERS_DATA['testServers'] + self.res_helper._check_test_servers_state() + mock_get_test_servers.assert_called_once() + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_test_servers') + def test__check_test_servers_state_server_not_ready( + self, mock_get_test_servers, *args): + test_servers_not_ready = [ + { + "url": ''.join([EXAMPLE_URL, "testServers/1"]), + "id": 1, + "name": "TestServer_1", + "state": "NOT_READY", + "version": "16.4.0.10" + } + ] + + mock_get_test_servers.return_value = test_servers_not_ready + with self.assertRaises(RuntimeError): + self.res_helper._check_test_servers_state(timeout=1, delay=0) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + '_check_test_servers_state') + def test_create_test_servers(self, mock_check_ts_state, *args): + test_servers_ids = [ + ts['id'] for ts in self.TEST_SERVERS_DATA['testServers']] + + self.res_helper.license_data['lic_id'] = TAS_INFO['license'] + self.res_helper._tcl.create_test_server = mock.Mock() + self.res_helper._tcl.create_test_server.side_effect = test_servers_ids + self.assertIsNone(self.res_helper.create_test_servers(TEST_SERVERS)) + mock_check_ts_state.assert_called_once_with(test_servers_ids) + + @mock.patch.object(tg_landslide.LandslideTclClient, + 'resolve_test_server_name') + @mock.patch.object(tg_landslide.LsTclHandler, 'execute') + def test_create_test_servers_error(self, mock_execute, + mock_resolve_ts_name, *args): + self.res_helper.license_data['lic_id'] = TAS_INFO['license'] + # Return message for case test server wasn't created + mock_execute.return_value = 'TS not found' + # Return message for case test server name wasn't resolved + mock_resolve_ts_name.return_value = 'TS not found' + with self.assertRaises(RuntimeError): + self.res_helper.create_test_servers(TEST_SERVERS) + + def test_get_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_servers() + self.assertEqual(self.TEST_SERVERS_DATA['testServers'], res) + + def test_get_test_servers_by_id(self, *args): + mock_session = mock.Mock(spec=requests.Session) + + _ts = self.TEST_SERVERS_DATA['testServers'][0] + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': _ts} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_servers(test_server_ids=[_ts['id']]) + self.assertEqual([_ts], res) + + def test_configure_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.configure_test_servers( + action={'action': 'recycle'}) + self.assertEqual( + {x['id'] for x in self.TEST_SERVERS_DATA['testServers']}, + set(res)) + self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), + mock_session.post.call_count) + + def test_delete_test_servers(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_SERVERS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_test_servers()) + self.assertEqual(len(self.TEST_SERVERS_DATA['testServers']), + mock_session.delete.call_count) - def test_terminate(self): - self.assertRaises(NotImplementedError, self.res_helper.terminate) + def test_create_test_session(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper._tcl = mock.Mock() + test_session = {'name': 'test'} + self.assertIsNone(self.res_helper.create_test_session(test_session)) + self.res_helper._tcl.create_test_session.assert_called_once_with( + {'name': 'test', 'library': self.SUCCESS_RECORD_ID}) + + @mock.patch.object(tg_landslide.LandslideTclClient, + 'resolve_test_server_name', + return_value='Not Found') + def test_create_test_session_ts_name_not_found(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + test_session = { + 'duration': 60, + 'description': 'UE default bearer creation test case', + 'name': 'default_bearer_capacity', + 'tsGroups': [{'testCases': [{'type': 'SGW_Node', + 'name': ''}], + 'tsId': 'TestServer_3'}] + } + with self.assertRaises(RuntimeError): + self.res_helper.create_test_session(test_session) + + def test_get_test_session(self, *args): + test_session = {"name": self.TEST_SESSION_NAME} + self.res_helper._user_id = self.SUCCESS_RECORD_ID + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': test_session} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_test_session(self.TEST_SESSION_NAME) + self.assertEqual(test_session, res) + + def test_configure_test_session(self, *args): + test_session = {'name': self.TEST_SESSION_NAME} + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format( + self.res_helper.test_session_uri) + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper.configure_test_session(self.TEST_SESSION_NAME, + test_session) + self.assertIsNotNone(res) + mock_session.post.assert_called_once() + + def test_delete_test_session(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + self.res_helper.user_lib_uri = 'libraries/{{}}/{}'.format( + self.res_helper.test_session_uri) + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper.delete_test_session(self.TEST_SESSION_NAME) + self.assertIsNotNone(res) + mock_session.delete.assert_called_once() + + def test_create_running_tests(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + test_session = {'id': self.SUCCESS_RECORD_ID} + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.SUCCESS_CREATED_CODE, + 'json.return_value': test_session} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + self.res_helper.create_running_tests(self.TEST_SESSION_NAME) + self.assertEqual(self.SUCCESS_RECORD_ID, self.res_helper.run_id) + + def test_create_running_tests_error(self, *args): + self.res_helper._user_id = self.SUCCESS_RECORD_ID + mock_session = mock.Mock(spec=requests.Session) + post_resp_data = {'status_code': self.NOT_MODIFIED_CODE} + mock_session.post.return_value.configure_mock(**post_resp_data) + self.res_helper.session = mock_session + with self.assertRaises(exceptions.RestApiError): + self.res_helper.create_running_tests(self.TEST_SESSION_NAME) + + def test_get_running_tests(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_running_tests() + self.assertEqual(self.RUNNING_TESTS_DATA['runningTests'], res) + + def test_delete_running_tests(self, *args): + mock_session = mock.Mock(spec=requests.Session) + delete_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA} + mock_session.delete.return_value.configure_mock(**delete_resp_data) + self.res_helper.session = mock_session + self.assertIsNone(self.res_helper.delete_running_tests()) + + def test__running_tests_action(self, *args): + action = 'abort' + mock_session = mock.Mock(spec=requests.Session) + self.res_helper.session = mock_session + res = self.res_helper._running_tests_action(self.SUCCESS_RECORD_ID, + action) + self.assertIsNone(res) - def test_collect_kpi_test_running(self): - self.assertRaises(NotImplementedError, self.res_helper.collect_kpi) + @mock.patch.object(tg_landslide.LandslideResourceHelper, + '_running_tests_action') + def test_stop_running_tests(self, mock_tests_action, *args): + res = self.res_helper.stop_running_tests(self.SUCCESS_RECORD_ID) + self.assertIsNone(res) + mock_tests_action.assert_called_once() + + def test_check_running_test_state(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = { + 'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.RUNNING_TESTS_DATA["runningTests"][0]} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.check_running_test_state(self.SUCCESS_RECORD_ID) + self.assertEqual( + self.RUNNING_TESTS_DATA["runningTests"][0]['testStateOrStep'], + res) + + def test_get_running_tests_results(self, *args): + mock_session = mock.Mock(spec=requests.Session) + get_resp_data = {'status_code': self.SUCCESS_OK_CODE, + 'json.return_value': self.TEST_RESULTS_DATA} + mock_session.get.return_value.configure_mock(**get_resp_data) + self.res_helper.session = mock_session + res = self.res_helper.get_running_tests_results( + self.SUCCESS_RECORD_ID) + self.assertEqual(self.TEST_RESULTS_DATA, res) + + def test__write_results(self, *args): + self.maxDiff = None + res = self.res_helper._write_results(self.TEST_RESULTS_DATA) + exp_res = { + "Test Summary::Actual Dedicated Bearer Session Connects": 100.0, + "Test Summary::Actual Dedicated Bearer Session Disconnects": 100.0, + "Test Summary::Actual Disconnect Rate(Sessions / Second)(P - I)": 164.804, + "Test Summary::Average Session Disconnect Time(P - I)": 5.024, + "Test Summary::Total Data Sent + Received Packets / Sec(P - I)": 1452.294 + } + self.assertEqual(exp_res, res) + + def test__write_results_no_tabs(self, *args): + _res_data = copy.deepcopy(self.TEST_RESULTS_DATA) + del _res_data['tabs'] + # Return None if tabs not found in test results dict + self.assertIsNone(self.res_helper._write_results(_res_data)) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests_results') + def test_collect_kpi_test_running(self, mock_tests_results, + mock_tests_state, *args): + self.res_helper.run_id = self.SUCCESS_RECORD_ID + mock_tests_state.return_value = 'RUNNING' + mock_tests_results.return_value = self.TEST_RESULTS_DATA + res = self.res_helper.collect_kpi() + self.assertNotIn('done', res) + mock_tests_state.assert_called_once_with(self.res_helper.run_id) + mock_tests_results.assert_called_once_with(self.res_helper.run_id) + + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'check_running_test_state') + @mock.patch.object(tg_landslide.LandslideResourceHelper, + 'get_running_tests_results') + def test_collect_kpi_test_completed(self, mock_tests_results, + mock_tests_state, *args): + self.res_helper.run_id = self.SUCCESS_RECORD_ID + mock_tests_state.return_value = 'COMPLETE' + res = self.res_helper.collect_kpi() + self.assertIsNotNone(res) + mock_tests_state.assert_called_once_with(self.res_helper.run_id) + mock_tests_results.assert_not_called() + self.assertDictContainsSubset({'done': True}, res) class TestLandslideTclClient(unittest.TestCase): @@ -416,7 +1073,7 @@ class TestLandslideTclClient(unittest.TestCase): def test_disconnect(self, *args): self.ls_tcl_client.disconnect() - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_once() self.assertIsNone(self.ls_tcl_client.tcl_server_ip) self.assertIsNone(self.ls_tcl_client._user) self.assertIsNone(self.ls_tcl_client._library_id) @@ -508,9 +1165,10 @@ class TestLandslideTclClient(unittest.TestCase): @mock.patch.object(tg_landslide.LandslideTclClient, 'resolve_test_server_name', return_value='2') def test_create_test_session(self, *args): + _session_profile = copy.deepcopy(SESSION_PROFILE) self.ls_tcl_client._save_test_session = mock.Mock() self.ls_tcl_client._configure_ts_group = mock.Mock() - self.ls_tcl_client.create_test_session(SESSION_PROFILE) + self.ls_tcl_client.create_test_session(_session_profile) self.assertEqual(20, self.mock_tcl_handler.execute.call_count) def test_create_dmf(self): @@ -546,37 +1204,46 @@ class TestLandslideTclClient(unittest.TestCase): self.mock_tcl_handler.execute.call_count) def test__configure_report_options(self): - _options = {'format': 'CSV', 'PerInterval': False} + _options = {'format': 'CSV', 'PerInterval': 'false'} self.ls_tcl_client._configure_report_options(_options) self.assertEqual(2, self.mock_tcl_handler.execute.call_count) def test___configure_ts_group(self, *args): + _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]) self.ls_tcl_client._configure_tc_type = mock.Mock() self.ls_tcl_client._configure_preresolved_arp = mock.Mock() self.ls_tcl_client.resolve_test_server_name = mock.Mock( return_value='2') - self.ls_tcl_client._configure_ts_group( - SESSION_PROFILE['tsGroups'][0], 0) - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.ls_tcl_client._configure_ts_group(_ts_group, 0) + self.mock_tcl_handler.execute.assert_called_once() def test___configure_ts_group_resolve_ts_fail(self, *args): + _ts_group = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]) self.ls_tcl_client._configure_tc_type = mock.Mock() self.ls_tcl_client._configure_preresolved_arp = mock.Mock() self.ls_tcl_client.resolve_test_server_name = mock.Mock( return_value='TS Not Found') self.assertRaises(RuntimeError, self.ls_tcl_client._configure_ts_group, - SESSION_PROFILE['tsGroups'][0], 0) - self.assertEqual(0, self.mock_tcl_handler.execute.call_count) + _ts_group, 0) + self.mock_tcl_handler.execute.assert_not_called() def test__configure_tc_type(self): - _tc = SESSION_PROFILE['tsGroups'][0]['testCases'][0] + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE self.ls_tcl_client._configure_parameters = mock.Mock() self.ls_tcl_client._configure_tc_type(_tc, 0) self.assertEqual(7, self.mock_tcl_handler.execute.call_count) + def test__configure_tc_type_optional_param_omitted(self): + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) + del _tc['linked'] + self.mock_tcl_handler.execute.return_value = TCL_SUCCESS_RESPONSE + self.ls_tcl_client._configure_parameters = mock.Mock() + self.ls_tcl_client._configure_tc_type(_tc, 0) + self.assertEqual(6, self.mock_tcl_handler.execute.call_count) + def test__configure_tc_type_wrong_type(self): - _tc = SESSION_PROFILE['tsGroups'][0]['testCases'][0] + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) _tc['type'] = 'not_supported' self.ls_tcl_client._configure_parameters = mock.Mock() self.assertRaises(RuntimeError, @@ -584,16 +1251,16 @@ class TestLandslideTclClient(unittest.TestCase): _tc, 0) def test__configure_tc_type_not_found_basic_lib(self): - _tc = SESSION_PROFILE['tsGroups'][0]['testCases'][0] + _tc = copy.deepcopy(SESSION_PROFILE['tsGroups'][0]['testCases'][0]) self.ls_tcl_client._configure_parameters = mock.Mock() self.mock_tcl_handler.execute.return_value = 'Invalid' self.assertRaises(RuntimeError, self.ls_tcl_client._configure_tc_type, _tc, 0) - self.assertEqual(0, self.mock_tcl_handler.execute.call_count) def test__configure_parameters(self): - _params = SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters'] + _params = copy.deepcopy( + SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters']) self.ls_tcl_client._configure_parameters(_params) self.assertEqual(16, self.mock_tcl_handler.execute.call_count) @@ -604,15 +1271,16 @@ class TestLandslideTclClient(unittest.TestCase): self.assertEqual(2, self.mock_tcl_handler.execute.call_count) def test__configure_test_node_param(self): - _params = SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters'] + _params = copy.deepcopy( + SESSION_PROFILE['tsGroups'][0]['testCases'][0]['parameters']) self.ls_tcl_client._configure_test_node_param('SgwUserAddr', _params['SgwUserAddr']) - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_once() def test__configure_sut_param(self): _params = {'name': 'name'} self.ls_tcl_client._configure_sut_param('name', _params) - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_once() def test__configure_dmf_param(self): _params = {"mainflows": [{"library": '111', @@ -645,7 +1313,7 @@ class TestLandslideTclClient(unittest.TestCase): self.assertIsNone(res) def test__configure_reservation(self): - _reservation = SESSION_PROFILE['reservations'][0] + _reservation = copy.deepcopy(SESSION_PROFILE['reservations'][0]) self.ls_tcl_client.resolve_test_server_name = mock.Mock( return_value='2') res = self.ls_tcl_client._configure_reservation(_reservation) @@ -656,7 +1324,7 @@ class TestLandslideTclClient(unittest.TestCase): _arp = [{'StartingAddress': '10.81.1.10', 'NumNodes': 1}] res = self.ls_tcl_client._configure_preresolved_arp(_arp) - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_once() self.assertIsNone(res) def test__configure_preresolved_arp_none(self): @@ -681,9 +1349,9 @@ class TestLandslideTclClient(unittest.TestCase): self.assertEqual(2, self.mock_tcl_handler.execute.call_count) def test__get_library_id_system_lib(self): - self.mock_tcl_handler.execute.side_effect = ['111'] + self.mock_tcl_handler.execute.return_value = '111' res = self.ls_tcl_client._get_library_id('name') - self.assertEqual(1, self.mock_tcl_handler.execute.call_count) + self.mock_tcl_handler.execute.assert_called_once() self.assertEqual('111', res) def test__get_library_id_user_lib(self): -- cgit 1.2.3-korg