diff options
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py')
-rw-r--r-- | utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py | 360 |
1 files changed, 0 insertions, 360 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py deleted file mode 100644 index b232bc168..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ /dev/null @@ -1,360 +0,0 @@ -from copy import deepcopy -from datetime import datetime -import functools -import httplib -import json -import os - -from opnfv_testapi.common import message -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.tests.unit import test_base as base - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - - scenario == models.Scenario.from_dict(req) - - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def _get_and_assert(self, name, req=None): - code, body = self.get(name) - self.assert_res(code, body, req) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self._get_and_assert(self.scenario_1, self.req_d) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self._set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self._set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self._set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self._set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - def test_queryCombination(self): - query = self._set_query('name=nosdn-nofeature-ha', - 'installer=apex', - 'version=master', - 'project=functest') - - self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def _execute(set_update): - @functools.wraps(set_update) - def magic(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - self._update_and_assert(update, scenario) - return magic - - def _update(expected): - def _update(set_update): - @functools.wraps(set_update) - def wrap(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - code, body = self.update(update, self.scenario) - getattr(self, expected)(code, scenario) - return wrap - return _update - - @_update('_success') - def test_renameScenario(self, scenario): - new_name = 'nosdn-nofeature-noha' - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_forbidden') - def test_renameScenario_exist(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_bad_request') - def test_renameScenario_noName(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={}) - return update_req, scenario - - @_execute - def test_addInstaller(self, scenario): - add = models.ScenarioInstaller(installer='daisy', versions=list()) - scenario['installers'].append(add.format()) - update = models.ScenarioUpdateRequest(field='installer', - op='add', - locate={}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteInstaller(self, scenario): - scenario['installers'] = filter(lambda f: f['installer'] != 'apex', - scenario['installers']) - - update = models.ScenarioUpdateRequest(field='installer', - op='delete', - locate={'installer': 'apex'}) - return update, scenario - - @_execute - def test_addVersion(self, scenario): - add = models.ScenarioVersion(version='danube', projects=list()) - scenario['installers'][0]['versions'].append(add.format()) - update = models.ScenarioUpdateRequest(field='version', - op='add', - locate={'installer': 'apex'}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteVersion(self, scenario): - scenario['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - scenario['installers'][0]['versions']) - - update = models.ScenarioUpdateRequest(field='version', - op='delete', - locate={'installer': 'apex', - 'version': 'master'}) - return update, scenario - - @_execute - def test_changeOwner(self, scenario): - scenario['installers'][0]['versions'][0]['owner'] = 'lucy' - - update = models.ScenarioUpdateRequest(field='owner', - op='update', - locate={'installer': 'apex', - 'version': 'master'}, - term={'owner': 'lucy'}) - return update, scenario - - @_execute - def test_addProject(self, scenario): - add = models.ScenarioProject(project='qtip').format() - scenario['installers'][0]['versions'][0]['projects'].append(add) - update = models.ScenarioUpdateRequest(field='project', - op='add', - locate={'installer': 'apex', - 'version': 'master'}, - term=add) - return update, scenario - - @_execute - def test_deleteProject(self, scenario): - scenario['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - scenario['installers'][0]['versions'][0]['projects']) - - update = models.ScenarioUpdateRequest(field='project', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}) - return update, scenario - - @_execute - def test_addCustoms(self, scenario): - add = ['odl', 'parser', 'vping_ssh'] - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] - update = models.ScenarioUpdateRequest(field='customs', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add) - return update, scenario - - @_execute - def test_deleteCustoms(self, scenario): - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - update = models.ScenarioUpdateRequest(field='customs', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=['vping_ssh']) - return update, scenario - - @_execute - def test_addScore(self, scenario): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - update = models.ScenarioUpdateRequest(field='score', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - @_execute - def test_addTi(self, scenario): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - update = models.ScenarioUpdateRequest(field='trust_indicator', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - def _update_and_assert(self, update_req, new_scenario, name=None): - code, _ = self.update(update_req, self.scenario) - self.assertEqual(code, httplib.OK) - self._get_and_assert(_none_default(name, self.scenario), - new_scenario) - - def _success(self, status, new_scenario): - self.assertEqual(status, httplib.OK) - self._get_and_assert(new_scenario.get('name'), new_scenario) - - def _forbidden(self, status, new_scenario): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status, new_scenario): - self.assertEqual(status, httplib.BAD_REQUEST) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -def _none_default(check, default): - return check if check else default |