summaryrefslogtreecommitdiffstats
path: root/yardstick/network_services
diff options
context:
space:
mode:
authorEmma Foley <emma.l.foley@intel.com>2018-08-30 13:19:36 +0100
committerOrest Voznyy <orestx.voznyy@intel.com>2018-09-08 01:03:02 +0300
commitdea3a485f5f196d7ee18ccc0c64a35ab5560385f (patch)
tree3a7ecd9452ecda7410561165fa33688877ef9e02 /yardstick/network_services
parentbdcbbd8c7f35d6b0bebf996a8d64379cf29ebc5d (diff)
Added Landslide Resource Helper implementation
Class "LandslideResourceHelper" provides API for operations needed - to configure Landslide test session - manage test session execution (start/stop/abort) - collect measurements during test run This helper class API is responsible to for configure Landslide test runs: - create test user - create test servers (for emulation of specific vEPC blocks) - create SUTs (actual tested VNFs performing specific vEPC roles) - create test session (contains actual test cases) - create DMFs (pre-defined traffic flows in traffic profile) - operate traffic run execution (start, stop, abort) - monitor test run status - collect KPIs on TG side Some of these features use Landslide REST API. Other ones use Landslide TCL API. JIRA: YARDSTICK-1356 Change-Id: I8fc8a7d85301121da465d054b8d38ae09a541c36 Signed-off-by: Orest Voznyy <orestx.voznyy@intel.com> Signed-off-by: Emma Foley <emma.l.foley@intel.com> Signed-off-by: Orest Voznyy <orestx.voznyy@intel.com>
Diffstat (limited to 'yardstick/network_services')
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_landslide.py516
1 files changed, 511 insertions, 5 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
index 3bb849c52..0ccffeeb1 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_landslide.py
@@ -13,6 +13,10 @@
# limitations under the License.
import logging
+import requests
+import six
+import time
+from collections import Mapping
from yardstick.common import exceptions
from yardstick.common import utils as common_utils
@@ -39,14 +43,515 @@ class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
self.vnfd_helper = setup_helper.vnfd_helper
self.scenario_helper = setup_helper.scenario_helper
+ # TAS Manager config initialization
+ self._url = None
+ self._user_id = None
+ self.session = None
+ self.license_data = {}
+
# TCL session initialization
self._tcl = LandslideTclClient(LsTclHandler(), self)
+ self.session = requests.Session()
+ self.running_tests_uri = 'runningTests'
+ self.test_session_uri = 'testSessions'
+ self.test_serv_uri = 'testServers'
+ self.suts_uri = 'suts'
+ self.users_uri = 'users'
+ self.user_lib_uri = None
+ self.run_id = None
+
+ def abort_running_tests(self, timeout=60, delay=5):
+ """ Abort running test sessions, if any """
+ _start_time = time.time()
+ while time.time() < _start_time + timeout:
+ run_tests_states = {x['id']: x['testStateOrStep']
+ for x in self.get_running_tests()}
+ if not set(run_tests_states.values()).difference(
+ {'COMPLETE', 'COMPLETE_ERROR'}):
+ break
+ else:
+ [self.stop_running_tests(running_test_id=_id, force=True)
+ for _id, _state in run_tests_states.items()
+ if 'COMPLETE' not in _state]
+ time.sleep(delay)
+ else:
+ raise RuntimeError(
+ 'Some test runs not stopped during {} seconds'.format(timeout))
+
+ def _build_url(self, resource, action=None):
+ """ Build URL string
+
+ :param resource: REST API resource name
+ :type resource: str
+ :param action: actions name and value
+ :type action: dict('name': <str>, 'value': <str>)
+ :returns str: REST API resource name with optional action info
+ """
+ # Action is optional and accepted only in presence of resource param
+ if action and not resource:
+ raise ValueError("Resource name not provided")
+ # Concatenate actions
+ _action = ''.join(['?{}={}'.format(k, v) for k, v in
+ action.items()]) if action else ''
+
+ return ''.join([self._url, resource, _action])
+
+ def get_response_params(self, method, resource, params=None):
+ """ Retrieve params from JSON response of specific resource URL
+
+ :param method: one of supported REST API methods
+ :type method: str
+ :param resource: URI, requested resource name
+ :type resource: str
+ :param params: attributes to be found in JSON response
+ :type params: list(str)
+ """
+ _res = []
+ params = params if params else []
+ response = self.exec_rest_request(method, resource)
+ # Get substring between last slash sign and question mark (if any)
+ url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0]
+ _response_json = response.json()
+ # Expect dict(), if URL last part and top dict key don't match
+ # Else, if they match, expect list()
+ k, v = list(_response_json.items())[0]
+ if k != url_last_part:
+ v = [v] # v: list(dict(str: str))
+ # Extract params, or whole list of dicts (without top level key)
+ for x in v:
+ _res.append({param: x[param] for param in params} if params else x)
+ return _res
+
+ def _create_user(self, auth, level=1):
+ """ Create new user
+
+ :param auth: data to create user account on REST server
+ :type auth: dict
+ :param level: Landslide user permissions level
+ :type level: int
+ :returns int: user id
+ """
+ # Set expiration date in two years since account creation date
+ _exp_date = time.strftime(
+ '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2))
+ _username = auth['user']
+ _fields = {"contactInformation": "", "expiresOn": _exp_date,
+ "fullName": "Test User",
+ "isActive": "true", "level": level,
+ "password": auth['password'],
+ "username": _username}
+ _response = self.exec_rest_request('post', self.users_uri,
+ json_data=_fields, raise_exc=False)
+ _resp_json = _response.json()
+ if _response.status_code == self.REST_STATUS_CODES['CREATED']:
+ # New user created
+ _id = _resp_json['id']
+ LOG.info("New user created: username='%s', id='%s'", _username,
+ _id)
+ elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']:
+ # User already exists
+ LOG.info("Account '%s' already exists.", _username)
+ # Get user id
+ _id = self._modify_user(_username, {"isActive": "true"})['id']
+ else:
+ raise exceptions.RestApiError(
+ 'Error during new user "{}" creation'.format(_username))
+ return _id
+
+ def _modify_user(self, username, fields):
+ """ Modify information about existing user
+
+ :param username: user name of account to be modified
+ :type username: str
+ :param fields: data to modify user account on REST server
+ :type fields: dict
+ :returns dict: user info
+ """
+ _response = self.exec_rest_request('post', self.users_uri,
+ action={'username': username},
+ json_data=fields, raise_exc=False)
+ if _response.status_code == self.REST_STATUS_CODES['OK']:
+ _response = _response.json()
+ else:
+ raise exceptions.RestApiError(
+ 'Error during user "{}" data update: {}'.format(
+ username,
+ _response.status_code))
+ LOG.info("User account '%s' modified: '%s'", username, _response)
+ return _response
+
+ def _delete_user(self, username):
+ """ Delete user account
+
+ :param username: username field
+ :type username: str
+ :returns bool: True if succeeded
+ """
+ self.exec_rest_request('delete', self.users_uri,
+ action={'username': username})
+
+ def _get_users(self, username=None):
+ """ Get user records from REST server
+
+ :param username: username field
+ :type username: None|str
+ :returns list(dict): empty list, or user record, or list of all users
+ """
+ _response = self.get_response_params('get', self.users_uri)
+ _res = [u for u in _response if
+ u['username'] == username] if username else _response
+ return _res
+
+ def exec_rest_request(self, method, resource, action=None, json_data=None,
+ logs=True, raise_exc=True):
+ """ Execute REST API request, return response object
+
+ :param method: one of supported requests ('post', 'get', 'delete')
+ :type method: str
+ :param resource: URL of resource
+ :type resource: str
+ :param action: data used to provide URI located after question mark
+ :type action: dict
+ :param json_data: mandatory only for 'post' method
+ :type json_data: dict
+ :param logs: debug logs display flag
+ :type raise_exc: bool
+ :param raise_exc: if True, raise exception on REST API call error
+ :returns requests.Response(): REST API call response object
+ """
+ json_data = json_data if json_data else {}
+ action = action if action else {}
+ _method = method.upper()
+ method = method.lower()
+ if method not in ('post', 'get', 'delete'):
+ raise ValueError("Method '{}' not supported".format(_method))
+
+ if method == 'post' and not action:
+ if not (json_data and isinstance(json_data, Mapping)):
+ raise ValueError(
+ 'JSON data missing in {} request'.format(_method))
+
+ r = getattr(self.session, method)(self._build_url(resource, action),
+ json=json_data)
+ if raise_exc and not r.ok:
+ msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format(
+ method, self._build_url(resource, action), r.reason)
+ raise exceptions.RestApiError(msg)
+
+ if logs:
+ LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method,
+ r.request.url)
+ LOG.debug("Response: %s", r.json())
+ return r
+
+ def connect(self):
+ """Connect to RESTful server using test user account"""
+ tas_info = self.vnfd_helper['mgmt-interface']
+ # Supported REST Server ports: HTTP - 8080, HTTPS - 8181
+ _port = '8080' if tas_info['proto'] == 'http' else '8181'
+ tas_info.update({'port': _port})
+ self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info)
+ self.session.headers.update({'Accept': 'application/json',
+ 'Content-type': 'application/json'})
+ # Login with super user to create test user
+ self.session.auth = (
+ tas_info['super-user'], tas_info['super-user-password'])
+ LOG.info("Connect using superuser: server='%s'", self._url)
+ auth = {x: tas_info[x] for x in ('user', 'password')}
+ self._user_id = self._create_user(auth)
+ # Login with test user
+ self.session.auth = auth['user'], auth['password']
+ # Test user validity
+ self.exec_rest_request('get', '')
+
+ self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri)
+ LOG.info("Login with test user: server='%s'", self._url)
+ # Read existing license
+ self.license_data['lic_id'] = tas_info['license']
+
+ # Tcl client init
+ self._tcl.connect(tas_info['ip'], *self.session.auth)
+
+ return self.session
+
+ def disconnect(self):
+ self.session = None
+ self._tcl.disconnect()
+
def terminate(self):
- raise NotImplementedError()
+ self._terminated.value = 1
+
+ def create_dmf(self, dmf):
+ if isinstance(dmf, list):
+ for _dmf in dmf:
+ self._tcl.create_dmf(_dmf)
+ else:
+ self._tcl.create_dmf(dmf)
+
+ def delete_dmf(self, dmf):
+ if isinstance(dmf, list):
+ for _dmf in dmf:
+ self._tcl.delete_dmf(_dmf)
+ else:
+ self._tcl.delete_dmf(dmf)
+
+ def create_suts(self, suts):
+ # Keep only supported keys in suts object
+ for _sut in suts:
+ sut_entry = {k: v for k, v in _sut.items()
+ if k not in {'phy', 'nextHop', 'role'}}
+ _response = self.exec_rest_request(
+ 'post', self.suts_uri, json_data=sut_entry,
+ logs=False, raise_exc=False)
+ if _response.status_code != self.REST_STATUS_CODES['CREATED']:
+ LOG.info(_response.reason) # Failed to create
+ _name = sut_entry.pop('name')
+ # Modify existing SUT
+ self.configure_sut(sut_name=_name, json_data=sut_entry)
+ else:
+ LOG.info("SUT created: %s", sut_entry)
+
+ def get_suts(self, suts_id=None):
+ if suts_id:
+ _suts = self.exec_rest_request(
+ 'get', '{}/{}'.format(self.suts_uri, suts_id)).json()
+ else:
+ _suts = self.get_response_params('get', self.suts_uri)
+
+ return _suts
+
+ def configure_sut(self, sut_name, json_data):
+ """ Modify information of specific SUTs
+
+ :param sut_name: name of existing SUT
+ :type sut_name: str
+ :param json_data: SUT settings
+ :type json_data: dict()
+ """
+ LOG.info("Modifying SUT information...")
+ _response = self.exec_rest_request('post',
+ self.suts_uri,
+ action={'name': sut_name},
+ json_data=json_data,
+ raise_exc=False)
+ if _response.status_code not in {self.REST_STATUS_CODES[x] for x in
+ {'OK', 'NO CHANGE'}}:
+ raise exceptions.RestApiError(_response.reason)
+
+ LOG.info("Modified SUT: %s", sut_name)
+
+ def delete_suts(self, suts_ids=None):
+ if not suts_ids:
+ _curr_suts = self.get_response_params('get', self.suts_uri)
+ suts_ids = [x['id'] for x in _curr_suts]
+ LOG.info("Deleting SUTs with following IDs: %s", suts_ids)
+ for _id in suts_ids:
+ self.exec_rest_request('delete',
+ '{}/{}'.format(self.suts_uri, _id))
+ LOG.info("\tDone for SUT id: %s", _id)
+
+ def _check_test_servers_state(self, test_servers_ids=None, delay=10,
+ timeout=300):
+ LOG.info("Waiting for related test servers state change to READY...")
+ # Wait on state change
+ _start_time = time.time()
+ while time.time() - _start_time < timeout:
+ ts_ids_not_ready = {x['id'] for x in
+ self.get_test_servers(test_servers_ids)
+ if x['state'] != 'READY'}
+ if ts_ids_not_ready == set():
+ break
+ time.sleep(delay)
+ else:
+ raise RuntimeError(
+ 'Test servers not in READY state after {} seconds.'.format(
+ timeout))
+
+ def create_test_servers(self, test_servers):
+ """ Create test servers
+
+ :param test_servers: input data for test servers creation
+ mandatory fields: managementIp
+ optional fields: name
+ :type test_servers: list(dict)
+ """
+ _ts_ids = []
+ for _ts in test_servers:
+ _msg = 'Created test server "%(name)s"'
+ _ts_ids.append(self._tcl.create_test_server(_ts))
+ if _ts.get('thread_model'):
+ _msg += ' in mode: "%(thread_model)s"'
+ LOG.info(_msg, _ts)
+
+ self._check_test_servers_state(_ts_ids)
+
+ def get_test_servers(self, test_server_ids=None):
+ if not test_server_ids: # Get all test servers info
+ _test_servers = self.exec_rest_request(
+ 'get', self.test_serv_uri).json()[self.test_serv_uri]
+ LOG.info("Current test servers configuration: %s", _test_servers)
+ return _test_servers
+
+ _test_servers = []
+ for _id in test_server_ids:
+ _test_servers.append(self.exec_rest_request(
+ 'get', '{}/{}'.format(self.test_serv_uri, _id)).json())
+ LOG.info("Current test servers configuration: %s", _test_servers)
+ return _test_servers
+
+ def configure_test_servers(self, action, json_data=None,
+ test_server_ids=None):
+ if not test_server_ids:
+ test_server_ids = [x['id'] for x in self.get_test_servers()]
+ elif isinstance(test_server_ids, int):
+ test_server_ids = [test_server_ids]
+ for _id in test_server_ids:
+ self.exec_rest_request('post',
+ '{}/{}'.format(self.test_serv_uri, _id),
+ action=action, json_data=json_data)
+ LOG.info("Test server (id: %s) configuration done: %s", _id,
+ action)
+ return test_server_ids
+
+ def delete_test_servers(self, test_servers_ids=None):
+ # Delete test servers
+ for _ts in self.get_test_servers(test_servers_ids):
+ self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri,
+ _ts['id']))
+ LOG.info("Deleted test server: %s", _ts['name'])
+
+ def create_test_session(self, test_session):
+ # Use tcl client to create session
+ test_session['library'] = self._user_id
+ LOG.debug("Creating session='%s'", test_session['name'])
+ self._tcl.create_test_session(test_session)
+
+ def get_test_session(self, test_session_name=None):
+ if test_session_name:
+ uri = 'libraries/{}/{}/{}'.format(self._user_id,
+ self.test_session_uri,
+ test_session_name)
+ else:
+ uri = self.user_lib_uri.format(self._user_id)
+ _test_sessions = self.exec_rest_request('get', uri).json()
+ return _test_sessions
+
+ def configure_test_session(self, template_name, test_session):
+ # Override specified test session parameters
+ LOG.info('Update test session parameters: %s', test_session['name'])
+ test_session.update({'library': self._user_id})
+ return self.exec_rest_request(
+ method='post',
+ action={'action': 'overrideAndSaveAs'},
+ json_data=test_session,
+ resource='{}/{}'.format(self.user_lib_uri.format(self._user_id),
+ template_name))
+
+ def delete_test_session(self, test_session):
+ return self.exec_rest_request('delete', '{}/{}'.format(
+ self.user_lib_uri.format(self._user_id), test_session))
+
+ def create_running_tests(self, test_session_name):
+ r = self.exec_rest_request('post',
+ self.running_tests_uri,
+ json_data={'library': self._user_id,
+ 'name': test_session_name})
+ if r.status_code != self.REST_STATUS_CODES['CREATED']:
+ raise exceptions.RestApiError('Failed to start test session.')
+ self.run_id = r.json()['id']
+
+ def get_running_tests(self, running_test_id=None):
+ """Get JSON structure of specified running test entity
+
+ :param running_test_id: ID of created running test entity
+ :type running_test_id: int
+ :returns list: running tests entity
+ """
+ if not running_test_id:
+ running_test_id = ''
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ _res = self.exec_rest_request('get', _res_name, logs=False).json()
+ # If no run_id specified, skip top level key in response dict.
+ # Else return JSON as list
+ return _res.get('runningTests', [_res])
+
+ def delete_running_tests(self, running_test_id=None):
+ if not running_test_id:
+ running_test_id = ''
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ self.get_response_params('delete', _res_name)
+ LOG.info("Deleted running test with id: %s", running_test_id)
+
+ def _running_tests_action(self, running_test_id, action, json_data=None):
+ if not json_data:
+ json_data = {}
+ # Supported actions:
+ # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc'
+ _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
+ self.exec_rest_request('post', _res_name, {'action': action},
+ json_data)
+ LOG.debug("Executed action: '%s' on running test id: %s", action,
+ running_test_id)
+
+ def stop_running_tests(self, running_test_id, json_data=None, force=False):
+ _action = 'abort' if force else 'stop'
+ self._running_tests_action(running_test_id, _action,
+ json_data=json_data)
+ LOG.info('Performed action: "%s" to test run with id: %s', _action,
+ running_test_id)
+
+ def check_running_test_state(self, run_id):
+ r = self.exec_rest_request('get',
+ '{}/{}'.format(self.running_tests_uri,
+ run_id))
+ return r.json().get("testStateOrStep")
+
+ def get_running_tests_results(self, run_id):
+ _res = self.exec_rest_request(
+ 'get',
+ '{}/{}/{}'.format(self.running_tests_uri,
+ run_id,
+ 'measurements')).json()
+ return _res
+
+ def _write_results(self, results):
+ # Avoid None value at test session start
+ _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0
+
+ _res_tabs = results.get('tabs')
+ # Avoid parsing 'tab' dict key initially (missing or empty)
+ if not _res_tabs:
+ return
+
+ # Flatten nested dict holding Landslide KPIs of current test run
+ flat_kpis_dict = {}
+ for _tab, _kpis in six.iteritems(_res_tabs):
+ for _kpi, _value in six.iteritems(_kpis):
+ # Combine table name and KPI name using delimiter "::"
+ _key = '::'.join([_tab, _kpi])
+ try:
+ # Cast value from str to float
+ # Remove comma and/or measure units, e.g. "us"
+ flat_kpis_dict[_key] = float(
+ _value.split(' ')[0].replace(',', ''))
+ except ValueError: # E.g. if KPI represents datetime
+ pass
+ LOG.info("Polling test results of test run id: %s. Elapsed time: %s "
+ "seconds", self.run_id, _elapsed_time)
+ return flat_kpis_dict
def collect_kpi(self):
- raise NotImplementedError()
+ if 'COMPLETE' in self.check_running_test_state(self.run_id):
+ self._result.update({'done': True})
+ return self._result
+ _res = self.get_running_tests_results(self.run_id)
+ _kpis = self._write_results(_res)
+ if _kpis:
+ _kpis.update({'run_id': int(self.run_id)})
+ _kpis.update({'iteration': _res['iteration']})
+ self._result.update(_kpis)
+ return self._result
class LandslideTclClient(object):
@@ -470,10 +975,11 @@ class LandslideTclClient(object):
if res == 'Invalid':
res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings')
raise exceptions.LandslideTclException(
- "_save_test_session: {}".format(res))
+ "Test session validation failed. Server response: {}".format(
+ res))
else:
- res = self._tcl.execute('ls::save $test_ -overwrite')
- LOG.debug("_save_test_session: result (%s)", res)
+ self._tcl.execute('ls::save $test_ -overwrite')
+ LOG.debug("Test session saved successfully.")
def _get_library_id(self, library):
_library_id = self._tcl.execute(