diff options
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit')
22 files changed, 674 insertions, 447 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/noparam.ini b/utils/test/testapi/opnfv_testapi/tests/unit/common/noparam.ini index fda2a09e9..be7f2b9f8 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/noparam.ini +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/noparam.ini @@ -12,5 +12,5 @@ port = 8000 debug = True authenticate = False -[swagger] -base_url = http://localhost:8000 +[ui] +url = http://localhost:8000 diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/normal.ini b/utils/test/testapi/opnfv_testapi/tests/unit/common/normal.ini index 77cc6c6ee..c81c6c56a 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/normal.ini +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/normal.ini @@ -13,5 +13,5 @@ port = 8000 debug = True authenticate = False -[swagger] -base_url = http://localhost:8000 +[ui] +url = http://localhost:8000 diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/nosection.ini b/utils/test/testapi/opnfv_testapi/tests/unit/common/nosection.ini index 9988fc0a4..a9ed49c5c 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/nosection.ini +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/nosection.ini @@ -7,5 +7,5 @@ port = 8000 debug = True authenticate = False -[swagger] -base_url = http://localhost:8000 +[ui] +url = http://localhost:8000 diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/notboolean.ini b/utils/test/testapi/opnfv_testapi/tests/unit/common/notboolean.ini index b3f327670..3a11f9dd3 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/notboolean.ini +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/notboolean.ini @@ -13,5 +13,5 @@ port = 8000 debug = True authenticate = notboolean -[swagger] -base_url = http://localhost:8000 +[ui] +url = http://localhost:8000 diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/notint.ini b/utils/test/testapi/opnfv_testapi/tests/unit/common/notint.ini index d1b752a34..8180719b8 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/notint.ini +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/notint.ini @@ -13,5 +13,5 @@ port = notint debug = True authenticate = False -[swagger] -base_url = http://localhost:8000 +[ui] +url = http://localhost:8000 diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py b/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py index 446b9442a..8cfc513be 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py @@ -1,16 +1,15 @@ -import os +import argparse -from opnfv_testapi.common import config - -def test_config_success(): - config_file = os.path.join(os.path.dirname(__file__), - '../../../../etc/config.ini') - config.Config.CONFIG = config_file - conf = config.Config() - assert conf.mongo_url == 'mongodb://127.0.0.1:27017/' - assert conf.mongo_dbname == 'test_results_collection' - assert conf.api_port == 8000 - assert conf.api_debug is True - assert conf.api_authenticate is False - assert conf.swagger_base_url == 'http://localhost:8000' +def test_config_normal(mocker, config_normal): + mocker.patch( + 'argparse.ArgumentParser.parse_known_args', + return_value=(argparse.Namespace(config_file=config_normal), None)) + from opnfv_testapi.common import config + CONF = config.Config() + assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/' + assert CONF.mongo_dbname == 'test_results_collection' + assert CONF.api_port == 8000 + assert CONF.api_debug is True + assert CONF.api_authenticate is False + assert CONF.ui_url == 'http://localhost:8000' diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py b/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py new file mode 100644 index 000000000..feff1daaa --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py @@ -0,0 +1,8 @@ +from os import path + +import pytest + + +@pytest.fixture +def config_normal(): + return path.join(path.dirname(__file__), 'common/normal.ini') diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py index b30c3258b..b8f696caf 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py @@ -10,6 +10,20 @@ import functools import httplib +def upload(excepted_status, excepted_response): + def _upload(create_request): + @functools.wraps(create_request) + def wrap(self): + request = create_request(self) + status, body = self.upload(request) + if excepted_status == httplib.OK: + getattr(self, excepted_response)(body) + else: + self.assertIn(excepted_response, body) + return wrap + return _upload + + def create(excepted_status, excepted_response): def _create(create_request): @functools.wraps(create_request) diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py index ef74a0857..0ca83df62 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py @@ -6,9 +6,10 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from operator import itemgetter + from bson.objectid import ObjectId from concurrent.futures import ThreadPoolExecutor -from operator import itemgetter def thread_execute(method, *args, **kwargs): @@ -20,38 +21,52 @@ def thread_execute(method, *args, **kwargs): class MemCursor(object): def __init__(self, collection): self.collection = collection - self.count = len(self.collection) + self.length = len(self.collection) self.sorted = [] def _is_next_exist(self): - return self.count != 0 + return self.length != 0 @property def fetch_next(self): return thread_execute(self._is_next_exist) def next_object(self): - self.count -= 1 + self.length -= 1 return self.collection.pop() def sort(self, key_or_list): - key = key_or_list[0][0] - if key_or_list[0][1] == -1: - reverse = True - else: - reverse = False + for k, v in key_or_list.iteritems(): + if v == -1: + reverse = True + else: + reverse = False - if key_or_list is not None: self.collection = sorted(self.collection, - key=itemgetter(key), reverse=reverse) + key=itemgetter(k), reverse=reverse) return self def limit(self, limit): if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0:limit] - self.count = limit + self.collection = self.collection[0: limit] + self.length = limit return self + def skip(self, skip): + if skip < self.length and (skip > 0): + self.collection = self.collection[self.length - skip: -1] + self.length -= skip + elif skip >= self.length: + self.collection = [] + self.length = 0 + return self + + def _count(self): + return self.length + + def count(self): + return thread_execute(self._count) + class MemDb(object): @@ -105,10 +120,14 @@ class MemDb(object): @staticmethod def _compare_date(spec, value): + gte = True + lt = False for k, v in spec.iteritems(): - if k == '$gte' and value >= v: - return True - return False + if k == '$gte' and value < v: + gte = False + elif k == '$lt' and value < v: + lt = True + return gte and lt def _in(self, content, *args): if self.name == 'scenarios': @@ -171,9 +190,8 @@ class MemDb(object): elif k == 'trust_indicator.current': if content.get('trust_indicator').get('current') != v: return False - elif content.get(k, None) != v: + elif not isinstance(v, dict) and content.get(k, None) != v: return False - return True def _find(self, *args): @@ -187,6 +205,27 @@ class MemDb(object): def find(self, *args): return MemCursor(self._find(*args)) + def _aggregate(self, *args, **kwargs): + res = self.contents + print args + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$match': + res = self._find(v) + cursor = MemCursor(res) + for arg in args[0]: + for k, v in arg.iteritems(): + if k == '$sort': + cursor = cursor.sort(v) + elif k == '$skip': + cursor = cursor.skip(v) + elif k == '$limit': + cursor = cursor.limit(v) + return cursor + + def aggregate(self, *args, **kwargs): + return self._aggregate(*args, **kwargs) + def _update(self, spec, document, check_keys=True): updated = False diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/scenario-c1.json b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json index 187802215..187802215 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/scenario-c1.json +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/scenario-c2.json b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json index b6a3b83ab..980051c4f 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/scenario-c2.json +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json @@ -8,7 +8,7 @@ [ { "owner": "Lucky", - "version": "colorado", + "version": "danube", "projects": [ { @@ -29,7 +29,7 @@ "scores": [ { "date": "2017-01-08 22:46:44", - "score": "0" + "score": "0/1" } ], "trust_indicators": [ diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py index 4d3445659..77a8d18c1 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py @@ -12,13 +12,9 @@ from os import path import mock from tornado import testing -from opnfv_testapi.common import config from opnfv_testapi.resources import models from opnfv_testapi.tests.unit import fake_pymongo -config.Config.CONFIG = path.join(path.dirname(__file__), - '../../../etc/config.ini') - class TestBase(testing.AsyncHTTPTestCase): headers = {'Content-Type': 'application/json; charset=UTF-8'} @@ -37,20 +33,21 @@ class TestBase(testing.AsyncHTTPTestCase): def tearDown(self): self.db_patcher.stop() + self.config_patcher.stop() def _patch_server(self): - from opnfv_testapi.cmd import server - server.parse_config([ - '--config-file', - path.join(path.dirname(__file__), 'common/normal.ini') - ]) - self.db_patcher = mock.patch('opnfv_testapi.cmd.server.get_db', - self._fake_pymongo) + import argparse + config = path.join(path.dirname(__file__), '../common/normal.ini') + self.config_patcher = mock.patch( + 'argparse.ArgumentParser.parse_known_args', + return_value=(argparse.Namespace(config_file=config), None)) + self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', + fake_pymongo) + self.config_patcher.start() self.db_patcher.start() - @staticmethod - def _fake_pymongo(): - return fake_pymongo + def set_config_file(self): + self.config_file = 'normal.ini' def get_app(self): from opnfv_testapi.cmd import server @@ -66,9 +63,12 @@ class TestBase(testing.AsyncHTTPTestCase): return self.create_help(self.basePath, req, *args) def create_help(self, uri, req, *args): + return self.post_direct_url(self._update_uri(uri, *args), req) + + def post_direct_url(self, url, req): if req and not isinstance(req, str) and hasattr(req, 'format'): req = req.format() - res = self.fetch(self._update_uri(uri, *args), + res = self.fetch(url, method='POST', body=json.dumps(req), headers=self.headers) @@ -92,21 +92,35 @@ class TestBase(testing.AsyncHTTPTestCase): headers=self.headers) return self._get_return(res, self.list_res) - def update(self, new=None, *args): - if new: + def update_direct_url(self, url, new=None): + if new and hasattr(new, 'format'): new = new.format() - res = self.fetch(self._get_uri(*args), + res = self.fetch(url, method='PUT', body=json.dumps(new), headers=self.headers) return self._get_return(res, self.update_res) - def delete(self, *args): - res = self.fetch(self._get_uri(*args), - method='DELETE', - headers=self.headers) + def update(self, new=None, *args): + return self.update_direct_url(self._get_uri(*args), new) + + def delete_direct_url(self, url, body): + if body: + res = self.fetch(url, + method='DELETE', + body=json.dumps(body), + headers=self.headers, + allow_nonstandard_methods=True) + else: + res = self.fetch(url, + method='DELETE', + headers=self.headers) + return res.code, res.body + def delete(self, *args): + return self.delete_direct_url(self._get_uri(*args), None) + @staticmethod def _get_valid_args(*args): new_args = tuple(['%s' % arg for arg in args if arg is not None]) @@ -132,7 +146,10 @@ class TestBase(testing.AsyncHTTPTestCase): def _get_return(self, res, cls): code = res.code body = res.body - return code, self._get_return_body(code, body, cls) + if body: + return code, self._get_return_body(code, body, cls) + else: + return code, None @staticmethod def _get_return_body(code, body, cls): diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py index 1ebc96f3b..1ebc96f3b 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py index 0ed348df9..cb4f1d92c 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py @@ -12,7 +12,7 @@ import unittest from opnfv_testapi.common import message from opnfv_testapi.resources import pod_models from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base +from opnfv_testapi.tests.unit.resources import test_base as base class TestPodBase(base.TestBase): @@ -85,5 +85,6 @@ class TestPodGet(TestPodBase): else: self.assert_get_body(pod, self.req_e) + if __name__ == '__main__': unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py index 323a1168f..0622ba8d4 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py @@ -4,7 +4,7 @@ import unittest from opnfv_testapi.common import message from opnfv_testapi.resources import project_models from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base +from opnfv_testapi.tests.unit.resources import test_base as base class TestProjectBase(base.TestBase): @@ -132,5 +132,6 @@ class TestProjectDelete(TestProjectBase): code, body = self.get(self.req_d.name) self.assertEqual(code, httplib.NOT_FOUND) + if __name__ == '__main__': unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py index ef2ce307e..1e83ed308 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py @@ -7,17 +7,18 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## import copy -from datetime import datetime, timedelta import httplib import unittest +from datetime import datetime, timedelta +import json from opnfv_testapi.common import message from opnfv_testapi.resources import pod_models from opnfv_testapi.resources import project_models from opnfv_testapi.resources import result_models from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import test_base as base from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base class Details(object): @@ -60,9 +61,9 @@ class TestResultBase(base.TestBase): self.scenario = 'odl-l2' self.criteria = 'passed' self.trust_indicator = result_models.TI(0.7) - self.start_date = "2016-05-23 07:16:09.477097" - self.stop_date = "2016-05-23 07:16:19.477097" - self.update_date = "2016-05-24 07:16:19.477097" + self.start_date = str(datetime.now()) + self.stop_date = str(datetime.now() + timedelta(minutes=1)) + self.update_date = str(datetime.now() + timedelta(days=1)) self.update_step = -0.05 super(TestResultBase, self).setUp() self.details = Details(timestart='0', duration='9s', status='OK') @@ -131,6 +132,22 @@ class TestResultBase(base.TestBase): _, res = self.create_d() return res.href.split('/')[-1] + def upload(self, req): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(self.basePath + '/upload', + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + +class TestResultUpload(TestResultBase): + @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) + def test_filenotfind(self): + return None + class TestResultCreate(TestResultBase): @executor.create(httplib.BAD_REQUEST, message.no_body()) @@ -208,9 +225,9 @@ class TestResultCreate(TestResultBase): class TestResultGet(TestResultBase): def setUp(self): super(TestResultGet, self).setUp() + self.req_10d_before = self._create_changed_date(days=-10) self.req_d_id = self._create_d() self.req_10d_later = self._create_changed_date(days=10) - self.req_10d_before = self._create_changed_date(days=-10) @executor.get(httplib.OK, 'assert_res') def test_getOne(self): @@ -256,9 +273,9 @@ class TestResultGet(TestResultBase): def test_queryPeriodNotInt(self): return self._set_query('period=a') - @executor.query(httplib.OK, '_query_last_one', 1) + @executor.query(httplib.OK, '_query_period_one', 1) def test_queryPeriodSuccess(self): - return self._set_query('period=1') + return self._set_query('period=5') @executor.query(httplib.BAD_REQUEST, message.must_int('last')) def test_queryLastNotInt(self): @@ -268,7 +285,17 @@ class TestResultGet(TestResultBase): def test_queryLast(self): return self._set_query('last=1') - @executor.query(httplib.OK, '_query_last_one', 1) + @executor.query(httplib.OK, '_query_success', 4) + def test_queryPublic(self): + self._create_public_data() + return self._set_query('') + + @executor.query(httplib.OK, '_query_success', 1) + def test_queryPrivate(self): + self._create_private_data() + return self._set_query('public=false') + + @executor.query(httplib.OK, '_query_period_one', 1) def test_combination(self): return self._set_query('pod', 'project', @@ -279,7 +306,7 @@ class TestResultGet(TestResultBase): 'scenario', 'trust_indicator', 'criteria', - 'period=1') + 'period=5') @executor.query(httplib.OK, '_query_success', 0) def test_notFound(self): @@ -294,6 +321,14 @@ class TestResultGet(TestResultBase): 'criteria', 'period=1') + @executor.query(httplib.OK, '_query_success', 1) + def test_filterErrorStartdate(self): + self._create_error_start_date(None) + self._create_error_start_date('None') + self._create_error_start_date('null') + self._create_error_start_date('') + return self._set_query('period=5') + def _query_success(self, body, number): self.assertEqual(number, len(body.results)) @@ -301,6 +336,16 @@ class TestResultGet(TestResultBase): self.assertEqual(number, len(body.results)) self.assert_res(body.results[0], self.req_10d_later) + def _query_period_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_d) + + def _create_error_start_date(self, start_date): + req = copy.deepcopy(self.req_d) + req.start_date = start_date + self.create(req) + return req + def _create_changed_date(self, **kwargs): req = copy.deepcopy(self.req_d) req.start_date = datetime.now() + timedelta(**kwargs) @@ -309,16 +354,29 @@ class TestResultGet(TestResultBase): self.create(req) return req + def _create_public_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'true' + self.create(req) + return req + + def _create_private_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'false' + self.create(req) + return req + def _set_query(self, *args): def get_value(arg): return self.__getattribute__(arg) \ if arg != 'trust_indicator' else self.trust_indicator.current uri = '' for arg in args: - if '=' in arg: - uri += arg + '&' - else: - uri += '{}={}&'.format(arg, get_value(arg)) + if arg: + if '=' in arg: + uri += arg + '&' + else: + uri += '{}={}&'.format(arg, get_value(arg)) return uri[0: -1] diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py new file mode 100644 index 000000000..1367fc669 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py @@ -0,0 +1,449 @@ +import functools +import httplib +import json +import os + +from datetime import datetime + +from opnfv_testapi.common import message +import opnfv_testapi.resources.scenario_models as models +from opnfv_testapi.tests.unit.resources import test_base as base + + +def _none_default(check, default): + return check if check else default + + +class TestScenarioBase(base.TestBase): + def setUp(self): + super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios + self.basePath = '/api/v1/scenarios' + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') + + def tearDown(self): + pass + + def assert_body(self, project, req=None): + pass + + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) + f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, httplib.OK) + if req is None: + req = self.req_d + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + self.assertEqual(scenario, models.Scenario.from_dict(req)) + + @staticmethod + def set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def get_and_assert(self, name): + code, body = self.get(name) + self.assert_res(code, body, self.req_d) + + +class TestScenarioCreate(TestScenarioBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, httplib.BAD_REQUEST) + + def test_emptyName(self): + req_empty = models.ScenarioCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_noneName(self): + req_none = models.ScenarioCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, httplib.OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + (code, body) = self.create_d() + self.assertEqual(code, httplib.FORBIDDEN) + self.assertIn(message.exist_base, body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self.get_and_assert(self.scenario_1) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self.set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self.set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self.set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self.set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + # close due to random fail, open again after solve it in another patch + # def test_queryCombination(self): + # query = self._set_query('name=nosdn-nofeature-ha', + # 'installer=apex', + # 'version=master', + # 'project=functest') + # + # self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, httplib.OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, httplib.NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, httplib.OK) + code, _ = self.get(scenario) + self.assertEqual(code, httplib.NOT_FOUND) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + self.update_url = '' + self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario) + self.installer = self.req_d['installers'][0]['installer'] + self.version = self.req_d['installers'][0]['versions'][0]['version'] + self.locate_project = 'installer={}&version={}&project={}'.format( + self.installer, + self.version, + 'functest') + + def update_url_fixture(item): + def _update_url_fixture(xstep): + def wrapper(self, *args, **kwargs): + self.update_url = '{}/{}'.format(self.scenario_url, item) + locator = None + if item in ['projects', 'owner']: + locator = 'installer={}&version={}'.format( + self.installer, + self.version) + elif item in ['versions']: + locator = 'installer={}'.format( + self.installer) + elif item in ['rename']: + self.update_url = self.scenario_url + + if locator: + self.update_url = '{}?{}'.format(self.update_url, locator) + + xstep(self, *args, **kwargs) + return wrapper + return _update_url_fixture + + def update_partial(operate, expected): + def _update_partial(set_update): + @functools.wraps(set_update) + def wrapper(self): + update = set_update(self) + code, body = getattr(self, operate)(update) + getattr(self, expected)(code) + return wrapper + return _update_partial + + @update_partial('_add', '_success') + def test_addScore(self): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + self.update_url = '{}/scores?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addTrustIndicator(self): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addCustoms(self): + adds = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = list(set(functest['customs'] + adds)) + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + return adds + + @update_partial('_update', '_success') + def test_updateCustoms(self): + updates = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = updates + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return updates + + @update_partial('_delete', '_success') + def test_deleteCustoms(self): + deletes = ['vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return deletes + + @update_url_fixture('projects') + @update_partial('_add', '_success') + def test_addProjects_succ(self): + add = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'].append(add) + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_conflict') + def test_addProjects_already_exist(self): + add = models.ScenarioProject(project='functest').format() + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_bad_request') + def test_addProjects_bad_schema(self): + add = models.ScenarioProject(project='functest').format() + add['score'] = None + return [add] + + @update_url_fixture('projects') + @update_partial('_update', '_success') + def test_updateProjects_succ(self): + update = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'] = [update] + return [update] + + @update_url_fixture('projects') + @update_partial('_update', '_conflict') + def test_updateProjects_duplicated(self): + update = models.ScenarioProject(project='qtip').format() + return [update, update] + + @update_url_fixture('projects') + @update_partial('_update', '_bad_request') + def test_updateProjects_bad_schema(self): + update = models.ScenarioProject(project='functest').format() + update['score'] = None + return [update] + + @update_url_fixture('projects') + @update_partial('_delete', '_success') + def test_deleteProjects(self): + deletes = ['functest'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + self.req_d['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + projects) + return deletes + + @update_url_fixture('owner') + @update_partial('_update', '_success') + def test_changeOwner(self): + new_owner = 'new_owner' + update = models.ScenarioChangeOwnerRequest(new_owner).format() + self.req_d['installers'][0]['versions'][0]['owner'] = new_owner + return update + + @update_url_fixture('versions') + @update_partial('_add', '_success') + def test_addVersions_succ(self): + add = models.ScenarioVersion(version='Euphrates').format() + self.req_d['installers'][0]['versions'].append(add) + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_conflict') + def test_addVersions_already_exist(self): + add = models.ScenarioVersion(version='master').format() + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_bad_request') + def test_addVersions_bad_schema(self): + add = models.ScenarioVersion(version='euphrates').format() + add['notexist'] = None + return [add] + + @update_url_fixture('versions') + @update_partial('_update', '_success') + def test_updateVersions_succ(self): + update = models.ScenarioVersion(version='euphrates').format() + self.req_d['installers'][0]['versions'] = [update] + return [update] + + @update_url_fixture('versions') + @update_partial('_update', '_conflict') + def test_updateVersions_duplicated(self): + update = models.ScenarioVersion(version='euphrates').format() + return [update, update] + + @update_url_fixture('versions') + @update_partial('_update', '_bad_request') + def test_updateVersions_bad_schema(self): + update = models.ScenarioVersion(version='euphrates').format() + update['not_owner'] = 'Iam' + return [update] + + @update_url_fixture('versions') + @update_partial('_delete', '_success') + def test_deleteVersions(self): + deletes = ['master'] + versions = self.req_d['installers'][0]['versions'] + self.req_d['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + versions) + return deletes + + @update_url_fixture('installers') + @update_partial('_add', '_success') + def test_addInstallers_succ(self): + add = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'].append(add) + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_conflict') + def test_addInstallers_already_exist(self): + add = models.ScenarioInstaller(installer='apex').format() + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_bad_request') + def test_addInstallers_bad_schema(self): + add = models.ScenarioInstaller(installer='daisy').format() + add['not_exist'] = 'not_exist' + return [add] + + @update_url_fixture('installers') + @update_partial('_update', '_success') + def test_updateInstallers_succ(self): + update = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'] = [update] + return [update] + + @update_url_fixture('installers') + @update_partial('_update', '_conflict') + def test_updateInstallers_duplicated(self): + update = models.ScenarioInstaller(installer='daisy').format() + return [update, update] + + @update_url_fixture('installers') + @update_partial('_update', '_bad_request') + def test_updateInstallers_bad_schema(self): + update = models.ScenarioInstaller(installer='daisy').format() + update['not_exist'] = 'not_exist' + return [update] + + @update_url_fixture('installers') + @update_partial('_delete', '_success') + def test_deleteInstallers(self): + deletes = ['apex'] + installers = self.req_d['installers'] + self.req_d['installers'] = filter( + lambda f: f['installer'] != 'apex', + installers) + return deletes + + @update_url_fixture('rename') + @update_partial('_update', '_success') + def test_renameScenario(self): + new_name = 'new_scenario_name' + update = models.ScenarioUpdateRequest(name=new_name) + self.req_d['name'] = new_name + return update + + @update_url_fixture('rename') + @update_partial('_update', '_forbidden') + def test_renameScenario_exist(self): + new_name = self.req_d['name'] + update = models.ScenarioUpdateRequest(name=new_name) + return update + + def _add(self, update_req): + return self.post_direct_url(self.update_url, update_req) + + def _update(self, update_req): + return self.update_direct_url(self.update_url, update_req) + + def _delete(self, update_req): + return self.delete_direct_url(self.update_url, update_req) + + def _success(self, status): + self.assertEqual(status, httplib.OK) + self.get_and_assert(self.req_d['name']) + + def _forbidden(self, status): + self.assertEqual(status, httplib.FORBIDDEN) + + def _bad_request(self, status): + self.assertEqual(status, httplib.BAD_REQUEST) + + def _conflict(self, status): + self.assertEqual(status, httplib.CONFLICT) diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py index e28eaf5b8..4f2bc2ad1 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py @@ -13,8 +13,8 @@ import unittest from opnfv_testapi.common import message from opnfv_testapi.resources import project_models from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import test_base as base from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base class TestCaseBase(base.TestBase): diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py index ca247a3b7..940e256c6 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py @@ -10,14 +10,14 @@ from tornado import web from opnfv_testapi.common import message from opnfv_testapi.resources import project_models -from opnfv_testapi.router import url_mappings from opnfv_testapi.tests.unit import executor from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit import test_base as base +from opnfv_testapi.tests.unit.resources import test_base as base class TestToken(base.TestBase): def get_app(self): + from opnfv_testapi.router import url_mappings return web.Application( url_mappings.mappings, db=fake_pymongo, @@ -109,5 +109,6 @@ class TestTokenUpdateProject(TestToken): def _update_success(self, request, body): self.assertIn(request.name, body) + if __name__ == '__main__': unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py index fff802ac8..51fed11ea 100644 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py +++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py @@ -11,7 +11,7 @@ import unittest from opnfv_testapi.resources import models from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base +from opnfv_testapi.tests.unit.resources import test_base as base class TestVersionBase(base.TestBase): diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py deleted file mode 100644 index b232bc168..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ /dev/null @@ -1,360 +0,0 @@ -from copy import deepcopy -from datetime import datetime -import functools -import httplib -import json -import os - -from opnfv_testapi.common import message -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.tests.unit import test_base as base - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - - scenario == models.Scenario.from_dict(req) - - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def _get_and_assert(self, name, req=None): - code, body = self.get(name) - self.assert_res(code, body, req) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self._get_and_assert(self.scenario_1, self.req_d) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self._set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self._set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self._set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self._set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - def test_queryCombination(self): - query = self._set_query('name=nosdn-nofeature-ha', - 'installer=apex', - 'version=master', - 'project=functest') - - self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def _execute(set_update): - @functools.wraps(set_update) - def magic(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - self._update_and_assert(update, scenario) - return magic - - def _update(expected): - def _update(set_update): - @functools.wraps(set_update) - def wrap(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - code, body = self.update(update, self.scenario) - getattr(self, expected)(code, scenario) - return wrap - return _update - - @_update('_success') - def test_renameScenario(self, scenario): - new_name = 'nosdn-nofeature-noha' - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_forbidden') - def test_renameScenario_exist(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_bad_request') - def test_renameScenario_noName(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={}) - return update_req, scenario - - @_execute - def test_addInstaller(self, scenario): - add = models.ScenarioInstaller(installer='daisy', versions=list()) - scenario['installers'].append(add.format()) - update = models.ScenarioUpdateRequest(field='installer', - op='add', - locate={}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteInstaller(self, scenario): - scenario['installers'] = filter(lambda f: f['installer'] != 'apex', - scenario['installers']) - - update = models.ScenarioUpdateRequest(field='installer', - op='delete', - locate={'installer': 'apex'}) - return update, scenario - - @_execute - def test_addVersion(self, scenario): - add = models.ScenarioVersion(version='danube', projects=list()) - scenario['installers'][0]['versions'].append(add.format()) - update = models.ScenarioUpdateRequest(field='version', - op='add', - locate={'installer': 'apex'}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteVersion(self, scenario): - scenario['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - scenario['installers'][0]['versions']) - - update = models.ScenarioUpdateRequest(field='version', - op='delete', - locate={'installer': 'apex', - 'version': 'master'}) - return update, scenario - - @_execute - def test_changeOwner(self, scenario): - scenario['installers'][0]['versions'][0]['owner'] = 'lucy' - - update = models.ScenarioUpdateRequest(field='owner', - op='update', - locate={'installer': 'apex', - 'version': 'master'}, - term={'owner': 'lucy'}) - return update, scenario - - @_execute - def test_addProject(self, scenario): - add = models.ScenarioProject(project='qtip').format() - scenario['installers'][0]['versions'][0]['projects'].append(add) - update = models.ScenarioUpdateRequest(field='project', - op='add', - locate={'installer': 'apex', - 'version': 'master'}, - term=add) - return update, scenario - - @_execute - def test_deleteProject(self, scenario): - scenario['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - scenario['installers'][0]['versions'][0]['projects']) - - update = models.ScenarioUpdateRequest(field='project', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}) - return update, scenario - - @_execute - def test_addCustoms(self, scenario): - add = ['odl', 'parser', 'vping_ssh'] - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] - update = models.ScenarioUpdateRequest(field='customs', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add) - return update, scenario - - @_execute - def test_deleteCustoms(self, scenario): - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - update = models.ScenarioUpdateRequest(field='customs', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=['vping_ssh']) - return update, scenario - - @_execute - def test_addScore(self, scenario): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - update = models.ScenarioUpdateRequest(field='score', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - @_execute - def test_addTi(self, scenario): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - update = models.ScenarioUpdateRequest(field='trust_indicator', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - def _update_and_assert(self, update_req, new_scenario, name=None): - code, _ = self.update(update_req, self.scenario) - self.assertEqual(code, httplib.OK) - self._get_and_assert(_none_default(name, self.scenario), - new_scenario) - - def _success(self, status, new_scenario): - self.assertEqual(status, httplib.OK) - self._get_and_assert(new_scenario.get('name'), new_scenario) - - def _forbidden(self, status, new_scenario): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status, new_scenario): - self.assertEqual(status, httplib.BAD_REQUEST) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -def _none_default(check, default): - return check if check else default |