summaryrefslogtreecommitdiffstats
path: root/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
diff options
context:
space:
mode:
Diffstat (limited to 'testapi/opnfv_testapi/tests/unit/resources/test_scenario.py')
-rw-r--r--testapi/opnfv_testapi/tests/unit/resources/test_scenario.py167
1 files changed, 79 insertions, 88 deletions
diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py b/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
index 360b4fa..9190af5 100644
--- a/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
+++ b/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
@@ -2,7 +2,7 @@ import functools
import httplib
import json
import os
-from copy import deepcopy
+
from datetime import datetime
from opnfv_testapi.common import message
@@ -50,15 +50,15 @@ class TestScenarioBase(base.TestBase):
self.assertEqual(scenario, models.Scenario.from_dict(req))
@staticmethod
- def _set_query(*args):
+ def set_query(*args):
uri = ''
for arg in args:
uri += arg + '&'
return uri[0: -1]
- def _get_and_assert(self, name, req=None):
+ def get_and_assert(self, name):
code, body = self.get(name)
- self.assert_res(code, body, req)
+ self.assert_res(code, body, self.req_d)
class TestScenarioCreate(TestScenarioBase):
@@ -97,25 +97,25 @@ class TestScenarioGet(TestScenarioBase):
self.scenario_2 = self.create_return_name(self.req_2)
def test_getByName(self):
- self._get_and_assert(self.scenario_1, self.req_d)
+ self.get_and_assert(self.scenario_1)
def test_getAll(self):
self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
def test_queryName(self):
- query = self._set_query('name=nosdn-nofeature-ha')
+ query = self.set_query('name=nosdn-nofeature-ha')
self._query_and_assert(query, reqs=[self.req_d])
def test_queryInstaller(self):
- query = self._set_query('installer=apex')
+ query = self.set_query('installer=apex')
self._query_and_assert(query, reqs=[self.req_d])
def test_queryVersion(self):
- query = self._set_query('version=master')
+ query = self.set_query('version=master')
self._query_and_assert(query, reqs=[self.req_d])
def test_queryProject(self):
- query = self._set_query('project=functest')
+ query = self.set_query('project=functest')
self._query_and_assert(query, reqs=[self.req_d, self.req_2])
# close due to random fail, open again after solve it in another patch
@@ -190,203 +190,194 @@ class TestScenarioUpdate(TestScenarioBase):
def _update_partial(set_update):
@functools.wraps(set_update)
def wrapper(self):
- update, scenario = set_update(self, deepcopy(self.req_d))
- code, body = getattr(self, operate)(update, self.scenario)
- getattr(self, expected)(code, scenario)
+ update = set_update(self)
+ code, body = getattr(self, operate)(update)
+ getattr(self, expected)(code)
return wrapper
return _update_partial
@update_partial('_add', '_success')
- def test_addScore(self, scenario):
+ def test_addScore(self):
add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
- projects = scenario['installers'][0]['versions'][0]['projects']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
functest = filter(lambda f: f['project'] == 'functest', projects)[0]
functest['scores'].append(add.format())
self.update_url = '{}/scores?{}'.format(self.scenario_url,
self.locate_project)
- return add, scenario
+ return add
@update_partial('_add', '_success')
- def test_addTrustIndicator(self, scenario):
+ def test_addTrustIndicator(self):
add = models.ScenarioTI(date=str(datetime.now()), status='gold')
- projects = scenario['installers'][0]['versions'][0]['projects']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
functest = filter(lambda f: f['project'] == 'functest', projects)[0]
functest['trust_indicators'].append(add.format())
self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url,
self.locate_project)
- return add, scenario
+ return add
@update_partial('_add', '_success')
- def test_addCustoms(self, scenario):
- add = ['odl', 'parser', 'vping_ssh']
- projects = scenario['installers'][0]['versions'][0]['projects']
+ def test_addCustoms(self):
+ adds = ['odl', 'parser', 'vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = list(set(functest['customs'] + add))
+ functest['customs'] = list(set(functest['customs'] + adds))
self.update_url = '{}/customs?{}'.format(self.scenario_url,
self.locate_project)
- return add, scenario
+ return adds
@update_partial('_update', '_success')
- def test_updateCustoms(self, scenario):
- news = ['odl', 'parser', 'vping_ssh']
- projects = scenario['installers'][0]['versions'][0]['projects']
+ def test_updateCustoms(self):
+ updates = ['odl', 'parser', 'vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = news
+ functest['customs'] = updates
self.update_url = '{}/customs?{}'.format(self.scenario_url,
self.locate_project)
- return news, scenario
+ return updates
@update_partial('_delete', '_success')
- def test_deleteCustoms(self, scenario):
- obsoletes = ['vping_ssh']
- projects = scenario['installers'][0]['versions'][0]['projects']
+ def test_deleteCustoms(self):
+ deletes = ['vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
functest = filter(lambda f: f['project'] == 'functest', projects)[0]
functest['customs'] = ['healthcheck']
self.update_url = '{}/customs?{}'.format(self.scenario_url,
self.locate_project)
- return obsoletes, scenario
+ return deletes
@update_url_fixture('projects')
@update_partial('_add', '_success')
- def test_addProjects_succ(self, scenario):
+ def test_addProjects_succ(self):
add = models.ScenarioProject(project='qtip').format()
- scenario['installers'][0]['versions'][0]['projects'].append(add)
- return [add], scenario
+ self.req_d['installers'][0]['versions'][0]['projects'].append(add)
+ return [add]
@update_url_fixture('projects')
@update_partial('_add', '_conflict')
- def test_addProjects_already_exist(self, scenario):
+ def test_addProjects_already_exist(self):
add = models.ScenarioProject(project='functest').format()
- scenario['installers'][0]['versions'][0]['projects'].append(add)
- return [add], scenario
+ return [add]
@update_url_fixture('projects')
@update_partial('_add', '_bad_request')
- def test_addProjects_bad_schema(self, scenario):
+ def test_addProjects_bad_schema(self):
add = models.ScenarioProject(project='functest').format()
add['score'] = None
- scenario['installers'][0]['versions'][0]['projects'].append(add)
- return [add], scenario
+ return [add]
@update_url_fixture('projects')
@update_partial('_update', '_success')
- def test_updateProjects_succ(self, scenario):
+ def test_updateProjects_succ(self):
update = models.ScenarioProject(project='qtip').format()
- scenario['installers'][0]['versions'][0]['projects'] = [update]
- return [update], scenario
+ self.req_d['installers'][0]['versions'][0]['projects'] = [update]
+ return [update]
@update_url_fixture('projects')
@update_partial('_update', '_conflict')
- def test_updateProjects_duplicated(self, scenario):
+ def test_updateProjects_duplicated(self):
update1 = models.ScenarioProject(project='qtip').format()
update2 = models.ScenarioProject(project='qtip').format()
- scenario['installers'][0]['versions'][0]['projects'] = [update1,
- update2]
- return [update1, update2], scenario
+ return [update1, update2]
@update_url_fixture('projects')
@update_partial('_update', '_bad_request')
- def test_updateProjects_bad_schema(self, scenario):
+ def test_updateProjects_bad_schema(self):
update = models.ScenarioProject(project='functest').format()
update['score'] = None
- scenario['installers'][0]['versions'][0]['projects'] = [update]
- return [update], scenario
+ return [update]
@update_url_fixture('projects')
@update_partial('_delete', '_success')
- def test_deleteProjects(self, scenario):
+ def test_deleteProjects(self):
deletes = ['functest']
- projects = scenario['installers'][0]['versions'][0]['projects']
- scenario['installers'][0]['versions'][0]['projects'] = filter(
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ self.req_d['installers'][0]['versions'][0]['projects'] = filter(
lambda f: f['project'] != 'functest',
projects)
- return deletes, scenario
+ return deletes
@update_url_fixture('owner')
@update_partial('_update', '_success')
- def test_changeOwner(self, scenario):
+ def test_changeOwner(self):
new_owner = 'new_owner'
update = models.ScenarioChangeOwnerRequest(new_owner).format()
- scenario['installers'][0]['versions'][0]['owner'] = new_owner
- return update, scenario
+ self.req_d['installers'][0]['versions'][0]['owner'] = new_owner
+ return update
@update_url_fixture('versions')
@update_partial('_add', '_success')
- def test_addVersions_succ(self, scenario):
+ def test_addVersions_succ(self):
add = models.ScenarioVersion(version='Euphrates').format()
- scenario['installers'][0]['versions'].append(add)
- return [add], scenario
+ self.req_d['installers'][0]['versions'].append(add)
+ return [add]
@update_url_fixture('versions')
@update_partial('_add', '_conflict')
- def test_addVersions_already_exist(self, scenario):
+ def test_addVersions_already_exist(self):
add = models.ScenarioVersion(version='master').format()
- scenario['installers'][0]['versions'].append(add)
- return [add], scenario
+ return [add]
@update_url_fixture('versions')
@update_partial('_add', '_bad_request')
- def test_addVersions_bad_schema(self, scenario):
+ def test_addVersions_bad_schema(self):
add = models.ScenarioVersion(version='euphrates').format()
add['notexist'] = None
- scenario['installers'][0]['versions'].append(add)
- return [add], scenario
+ return [add]
@update_url_fixture('versions')
@update_partial('_update', '_success')
- def test_updateVersions_succ(self, scenario):
+ def test_updateVersions_succ(self):
update = models.ScenarioVersion(version='euphrates').format()
- scenario['installers'][0]['versions'] = [update]
- return [update], scenario
+ self.req_d['installers'][0]['versions'] = [update]
+ return [update]
@update_url_fixture('versions')
@update_partial('_update', '_conflict')
- def test_updateVersions_duplicated(self, scenario):
+ def test_updateVersions_duplicated(self):
update1 = models.ScenarioVersion(version='euphrates').format()
update2 = models.ScenarioVersion(version='euphrates').format()
- scenario['installers'][0]['versions'] = [update1, update2]
- return [update1, update2], scenario
+ return [update1, update2]
@update_url_fixture('versions')
@update_partial('_update', '_bad_request')
- def test_updateVersions_bad_schema(self, scenario):
+ def test_updateVersions_bad_schema(self):
update = models.ScenarioVersion(version='euphrates').format()
update['not_owner'] = 'Iam'
- scenario['installers'][0]['versions'] = [update]
- return [update], scenario
+ return [update]
@update_url_fixture('versions')
@update_partial('_delete', '_success')
- def test_deleteVersions(self, scenario):
+ def test_deleteVersions(self):
deletes = ['master']
- versions = scenario['installers'][0]['versions']
- scenario['installers'][0]['versions'] = filter(
+ versions = self.req_d['installers'][0]['versions']
+ self.req_d['installers'][0]['versions'] = filter(
lambda f: f['version'] != 'master',
versions)
- return deletes, scenario
+ return deletes
- def _add(self, update_req, new_scenario):
+ def _add(self, update_req):
return self.post_direct_url(self.update_url, update_req)
- def _update(self, update_req, new_scenario):
+ def _update(self, update_req):
return self.update_direct_url(self.update_url, update_req)
- def _delete(self, update_req, new_scenario):
+ def _delete(self, update_req):
return self.delete_direct_url(self.update_url, update_req)
- def _success(self, status, new_scenario):
+ def _success(self, status):
self.assertEqual(status, httplib.OK)
- self._get_and_assert(new_scenario.get('name'), new_scenario)
+ self.get_and_assert(self.req_d['name'])
- def _forbidden(self, status, new_scenario):
+ def _forbidden(self, status):
self.assertEqual(status, httplib.FORBIDDEN)
- def _bad_request(self, status, new_scenario):
+ def _bad_request(self, status):
self.assertEqual(status, httplib.BAD_REQUEST)
- def _conflict(self, status, new_scenario):
+ def _conflict(self, status):
self.assertEqual(status, httplib.CONFLICT)