From d43c5992199b99c738a74d22ad126e86153d1008 Mon Sep 17 00:00:00 2001 From: SerenaFeng Date: Wed, 12 Jul 2017 14:27:58 +0800 Subject: move resources unit tests to tests/unit/resources As webportal is introduced, the project structure is becoming more complex, it is time to make it a a little bit more official Change-Id: Id380d37b07719f053b0bd385a326a2f2944a4b22 Signed-off-by: SerenaFeng --- .../opnfv_testapi/tests/unit/resources/__init__.py | 0 .../tests/unit/resources/scenario-c1.json | 38 +++ .../tests/unit/resources/scenario-c2.json | 73 +++++ .../tests/unit/resources/test_base.py | 164 ++++++++++ .../tests/unit/resources/test_fake_pymongo.py | 123 +++++++ .../opnfv_testapi/tests/unit/resources/test_pod.py | 89 +++++ .../tests/unit/resources/test_project.py | 136 ++++++++ .../tests/unit/resources/test_result.py | 352 ++++++++++++++++++++ .../tests/unit/resources/test_scenario.py | 360 +++++++++++++++++++++ .../tests/unit/resources/test_testcase.py | 201 ++++++++++++ .../tests/unit/resources/test_token.py | 113 +++++++ .../tests/unit/resources/test_version.py | 36 +++ testapi/opnfv_testapi/tests/unit/scenario-c1.json | 38 --- testapi/opnfv_testapi/tests/unit/scenario-c2.json | 73 ----- testapi/opnfv_testapi/tests/unit/test_base.py | 164 ---------- .../opnfv_testapi/tests/unit/test_fake_pymongo.py | 123 ------- testapi/opnfv_testapi/tests/unit/test_pod.py | 89 ----- testapi/opnfv_testapi/tests/unit/test_project.py | 136 -------- testapi/opnfv_testapi/tests/unit/test_result.py | 352 -------------------- testapi/opnfv_testapi/tests/unit/test_scenario.py | 360 --------------------- testapi/opnfv_testapi/tests/unit/test_testcase.py | 201 ------------ testapi/opnfv_testapi/tests/unit/test_token.py | 113 ------- testapi/opnfv_testapi/tests/unit/test_version.py | 36 --- 23 files changed, 1685 insertions(+), 1685 deletions(-) create mode 100644 testapi/opnfv_testapi/tests/unit/resources/__init__.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json create mode 100644 testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_base.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_pod.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_project.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_result.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_scenario.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_testcase.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_token.py create mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_version.py delete mode 100644 testapi/opnfv_testapi/tests/unit/scenario-c1.json delete mode 100644 testapi/opnfv_testapi/tests/unit/scenario-c2.json delete mode 100644 testapi/opnfv_testapi/tests/unit/test_base.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_pod.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_project.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_result.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_scenario.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_testcase.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_token.py delete mode 100644 testapi/opnfv_testapi/tests/unit/test_version.py (limited to 'testapi') diff --git a/testapi/opnfv_testapi/tests/unit/resources/__init__.py b/testapi/opnfv_testapi/tests/unit/resources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json new file mode 100644 index 0000000..1878022 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json @@ -0,0 +1,38 @@ +{ + "name": "nosdn-nofeature-ha", + "installers": + [ + { + "installer": "apex", + "versions": + [ + { + "owner": "Luke", + "version": "master", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-08 22:46:44", + "score": "12/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json new file mode 100644 index 0000000..b6a3b83 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json @@ -0,0 +1,73 @@ +{ + "name": "odl_2-nofeature-ha", + "installers": + [ + { + "installer": "fuel", + "versions": + [ + { + "owner": "Lucky", + "version": "colorado", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": [], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "silver" + } + + ] + }, + { + "project": "yardstick", + "customs": ["suite-a"], + "scores": [ + { + "date": "2017-01-08 22:46:44", + "score": "0" + } + ], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "gold" + } + ] + } + ] + }, + { + "owner": "Luke", + "version": "colorado", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-09 22:46:44", + "score": "11/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_base.py b/testapi/opnfv_testapi/tests/unit/resources/test_base.py new file mode 100644 index 0000000..6e4d454 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_base.py @@ -0,0 +1,164 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import json +from os import path + +import mock +from tornado import testing + +from opnfv_testapi.common import config +from opnfv_testapi.resources import models +from opnfv_testapi.tests.unit import fake_pymongo + +config.Config.CONFIG = path.join(path.dirname(__file__), + '../../../../etc/config.ini') + + +class TestBase(testing.AsyncHTTPTestCase): + headers = {'Content-Type': 'application/json; charset=UTF-8'} + + def setUp(self): + self._patch_server() + self.basePath = '' + self.create_res = models.CreateResponse + self.get_res = None + self.list_res = None + self.update_res = None + self.req_d = None + self.req_e = None + self.addCleanup(self._clear) + super(TestBase, self).setUp() + + def tearDown(self): + self.db_patcher.stop() + + def _patch_server(self): + from opnfv_testapi.cmd import server + server.parse_config([ + '--config-file', + path.join(path.dirname(__file__), path.pardir, 'common/normal.ini') + ]) + self.db_patcher = mock.patch('opnfv_testapi.cmd.server.get_db', + self._fake_pymongo) + self.db_patcher.start() + + @staticmethod + def _fake_pymongo(): + return fake_pymongo + + def get_app(self): + from opnfv_testapi.cmd import server + return server.make_app() + + def create_d(self, *args): + return self.create(self.req_d, *args) + + def create_e(self, *args): + return self.create(self.req_e, *args) + + def create(self, req=None, *args): + return self.create_help(self.basePath, req, *args) + + def create_help(self, uri, req, *args): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(self._update_uri(uri, *args), + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + def get(self, *args): + res = self.fetch(self._get_uri(*args), + method='GET', + headers=self.headers) + + def inner(): + new_args, num = self._get_valid_args(*args) + return self.get_res \ + if num != self._need_arg_num(self.basePath) else self.list_res + return self._get_return(res, inner()) + + def query(self, query): + res = self.fetch(self._get_query_uri(query), + method='GET', + headers=self.headers) + return self._get_return(res, self.list_res) + + def update(self, new=None, *args): + if new: + new = new.format() + res = self.fetch(self._get_uri(*args), + method='PUT', + body=json.dumps(new), + headers=self.headers) + return self._get_return(res, self.update_res) + + def delete(self, *args): + res = self.fetch(self._get_uri(*args), + method='DELETE', + headers=self.headers) + return res.code, res.body + + @staticmethod + def _get_valid_args(*args): + new_args = tuple(['%s' % arg for arg in args if arg is not None]) + return new_args, len(new_args) + + def _need_arg_num(self, uri): + return uri.count('%s') + + def _get_query_uri(self, query): + return self.basePath + '?' + query if query else self.basePath + + def _get_uri(self, *args): + return self._update_uri(self.basePath, *args) + + def _update_uri(self, uri, *args): + r_uri = uri + new_args, num = self._get_valid_args(*args) + if num != self._need_arg_num(uri): + r_uri += '/%s' + + return r_uri % tuple(['%s' % arg for arg in new_args]) + + def _get_return(self, res, cls): + code = res.code + body = res.body + return code, self._get_return_body(code, body, cls) + + @staticmethod + def _get_return_body(code, body, cls): + return cls.from_dict(json.loads(body)) if code < 300 and cls else body + + def assert_href(self, body): + self.assertIn(self.basePath, body.href) + + def assert_create_body(self, body, req=None, *args): + import inspect + if not req: + req = self.req_d + resource_name = '' + if inspect.isclass(req): + resource_name = req.name + elif isinstance(req, dict): + resource_name = req['name'] + elif isinstance(req, str): + resource_name = json.loads(req)['name'] + new_args = args + tuple([resource_name]) + self.assertIn(self._get_uri(*new_args), body.href) + + @staticmethod + def _clear(): + fake_pymongo.pods.clear() + fake_pymongo.projects.clear() + fake_pymongo.testcases.clear() + fake_pymongo.results.clear() + fake_pymongo.scenarios.clear() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py new file mode 100644 index 0000000..1ebc96f --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py @@ -0,0 +1,123 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest + +from tornado import gen +from tornado import testing +from tornado import web + +from opnfv_testapi.tests.unit import fake_pymongo + + +class MyTest(testing.AsyncHTTPTestCase): + def setUp(self): + super(MyTest, self).setUp() + self.db = fake_pymongo + self.addCleanup(self._clear) + self.io_loop.run_sync(self.fixture_setup) + + def get_app(self): + return web.Application() + + @gen.coroutine + def fixture_setup(self): + self.test1 = {'_id': '1', 'name': 'test1'} + self.test2 = {'name': 'test2'} + yield self.db.pods.insert({'_id': '1', 'name': 'test1'}) + yield self.db.pods.insert({'name': 'test2'}) + + @testing.gen_test + def test_find_one(self): + user = yield self.db.pods.find_one({'name': 'test1'}) + self.assertEqual(user, self.test1) + self.db.pods.remove() + + @testing.gen_test + def test_find(self): + cursor = self.db.pods.find() + names = [] + while (yield cursor.fetch_next): + ob = cursor.next_object() + names.append(ob.get('name')) + self.assertItemsEqual(names, ['test1', 'test2']) + + @testing.gen_test + def test_update(self): + yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'}) + user = yield self.db.pods.find_one({'_id': '1'}) + self.assertEqual(user.get('name', None), 'new_test1') + + def test_update_dot_error(self): + self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, + 'key 1. name must not contain .') + + def test_update_dot_no_error(self): + self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, + None, + check_keys=False) + + def test_update_dollar_error(self): + self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, + 'key $name must not start with $') + + def test_update_dollar_no_error(self): + self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, + None, + check_keys=False) + + @testing.gen_test + def test_remove(self): + yield self.db.pods.remove({'_id': '1'}) + user = yield self.db.pods.find_one({'_id': '1'}) + self.assertIsNone(user) + + def test_insert_dot_error(self): + self._insert_assert({'_id': '1', '2. name': 'test1'}, + 'key 2. name must not contain .') + + def test_insert_dot_no_error(self): + self._insert_assert({'_id': '1', '2. name': 'test1'}, + None, + check_keys=False) + + def test_insert_dollar_error(self): + self._insert_assert({'_id': '1', '$name': 'test1'}, + 'key $name must not start with $') + + def test_insert_dollar_no_error(self): + self._insert_assert({'_id': '1', '$name': 'test1'}, + None, + check_keys=False) + + def _clear(self): + self.db.pods.clear() + + def _update_assert(self, docs, error=None, **kwargs): + self._db_assert('update', error, {'_id': '1'}, docs, **kwargs) + + def _insert_assert(self, docs, error=None, **kwargs): + self._db_assert('insert', error, docs, **kwargs) + + @testing.gen_test + def _db_assert(self, method, error, *args, **kwargs): + name_error = None + try: + yield self._eval_pods_db(method, *args, **kwargs) + except NameError as err: + name_error = err.args[0] + finally: + self.assertEqual(name_error, error) + + def _eval_pods_db(self, method, *args, **kwargs): + table_obj = vars(self.db)['pods'] + return table_obj.__getattribute__(method)(*args, **kwargs) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_pod.py b/testapi/opnfv_testapi/tests/unit/resources/test_pod.py new file mode 100644 index 0000000..8e0ae40 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_pod.py @@ -0,0 +1,89 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.resources import pod_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestPodBase(base.TestBase): + def setUp(self): + super(TestPodBase, self).setUp() + self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual', + 'zte pod 1', 'ci-pod') + self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2') + self.get_res = pod_models.Pod + self.list_res = pod_models.Pods + self.basePath = '/api/v1/pods' + + def assert_get_body(self, pod, req=None): + if not req: + req = self.req_d + self.assertEqual(pod.name, req.name) + self.assertEqual(pod.mode, req.mode) + self.assertEqual(pod.details, req.details) + self.assertEqual(pod.role, req.role) + self.assertIsNotNone(pod.creation_date) + self.assertIsNotNone(pod._id) + + +class TestPodCreate(TestPodBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return pod_models.PodCreateRequest('') + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return pod_models.PodCreateRequest(None) + + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestPodGet(TestPodBase): + def setUp(self): + super(TestPodGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_get_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + self.assertEqual(len(body.pods), 2) + for pod in body.pods: + if self.req_d.name == pod.name: + self.assert_get_body(pod) + else: + self.assert_get_body(pod, self.req_e) + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_project.py b/testapi/opnfv_testapi/tests/unit/resources/test_project.py new file mode 100644 index 0000000..5a2ce75 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_project.py @@ -0,0 +1,136 @@ +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.resources import project_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestProjectBase(base.TestBase): + def setUp(self): + super(TestProjectBase, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping', + 'vping-ssh test') + self.req_e = project_models.ProjectCreateRequest('doctor', + 'doctor test') + self.get_res = project_models.Project + self.list_res = project_models.Projects + self.update_res = project_models.Project + self.basePath = '/api/v1/projects' + + def assert_body(self, project, req=None): + if not req: + req = self.req_d + self.assertEqual(project.name, req.name) + self.assertEqual(project.description, req.description) + self.assertIsNotNone(project._id) + self.assertIsNotNone(project.creation_date) + + +class TestProjectCreate(TestProjectBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return project_models.ProjectCreateRequest('') + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return project_models.ProjectCreateRequest(None) + + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestProjectGet(TestProjectBase): + def setUp(self): + super(TestProjectGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + for project in body.projects: + if self.req_d.name == project.name: + self.assert_body(project) + else: + self.assert_body(project, self.req_e) + + +class TestProjectUpdate(TestProjectBase): + def setUp(self): + super(TestProjectUpdate, self).setUp() + _, d_body = self.create_d() + _, get_res = self.get(self.req_d.name) + self.index_d = get_res._id + self.create_e() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.req_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + return self.req_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.req_d, self.req_d.name + + @executor.update(httplib.OK, '_assert_update') + def test_success(self): + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + def _assert_update(self, req, body): + self.assertEqual(self.index_d, body._id) + self.assert_body(body, req) + _, new_body = self.get(req.name) + self.assertEqual(self.index_d, new_body._id) + self.assert_body(new_body, req) + + +class TestProjectDelete(TestProjectBase): + def setUp(self): + super(TestProjectDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_assert_delete') + def test_success(self): + return self.req_d.name + + def _assert_delete(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_result.py b/testapi/opnfv_testapi/tests/unit/resources/test_result.py new file mode 100644 index 0000000..c8463cb --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_result.py @@ -0,0 +1,352 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +import httplib +import unittest +from datetime import datetime, timedelta + +from opnfv_testapi.common import message +from opnfv_testapi.resources import pod_models +from opnfv_testapi.resources import project_models +from opnfv_testapi.resources import result_models +from opnfv_testapi.resources import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base + + +class Details(object): + def __init__(self, timestart=None, duration=None, status=None): + self.timestart = timestart + self.duration = duration + self.status = status + self.items = [{'item1': 1}, {'item2': 2}] + + def format(self): + return { + "timestart": self.timestart, + "duration": self.duration, + "status": self.status, + 'items': [{'item1': 1}, {'item2': 2}] + } + + @staticmethod + def from_dict(a_dict): + + if a_dict is None: + return None + + t = Details() + t.timestart = a_dict.get('timestart') + t.duration = a_dict.get('duration') + t.status = a_dict.get('status') + t.items = a_dict.get('items') + return t + + +class TestResultBase(base.TestBase): + def setUp(self): + self.pod = 'zte-pod1' + self.project = 'functest' + self.case = 'vPing' + self.installer = 'fuel' + self.version = 'C' + self.build_tag = 'v3.0' + self.scenario = 'odl-l2' + self.criteria = 'passed' + self.trust_indicator = result_models.TI(0.7) + self.start_date = "2016-05-23 07:16:09.477097" + self.stop_date = "2016-05-23 07:16:19.477097" + self.update_date = "2016-05-24 07:16:19.477097" + self.update_step = -0.05 + super(TestResultBase, self).setUp() + self.details = Details(timestart='0', duration='9s', status='OK') + self.req_d = result_models.ResultCreateRequest( + pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria, + trust_indicator=self.trust_indicator) + self.get_res = result_models.TestResult + self.list_res = result_models.TestResults + self.update_res = result_models.TestResult + self.basePath = '/api/v1/results' + self.req_pod = pod_models.PodCreateRequest( + self.pod, + 'metal', + 'zte pod 1') + self.req_project = project_models.ProjectCreateRequest( + self.project, + 'vping test') + self.req_testcase = testcase_models.TestcaseCreateRequest( + self.case, + '/cases/vping', + 'vping-ssh test') + self.create_help('/api/v1/pods', self.req_pod) + self.create_help('/api/v1/projects', self.req_project) + self.create_help('/api/v1/projects/%s/cases', + self.req_testcase, + self.project) + + def assert_res(self, result, req=None): + if req is None: + req = self.req_d + self.assertEqual(result.pod_name, req.pod_name) + self.assertEqual(result.project_name, req.project_name) + self.assertEqual(result.case_name, req.case_name) + self.assertEqual(result.installer, req.installer) + self.assertEqual(result.version, req.version) + details_req = Details.from_dict(req.details) + details_res = Details.from_dict(result.details) + self.assertEqual(details_res.duration, details_req.duration) + self.assertEqual(details_res.timestart, details_req.timestart) + self.assertEqual(details_res.status, details_req.status) + self.assertEqual(details_res.items, details_req.items) + self.assertEqual(result.build_tag, req.build_tag) + self.assertEqual(result.scenario, req.scenario) + self.assertEqual(result.criteria, req.criteria) + self.assertEqual(result.start_date, req.start_date) + self.assertEqual(result.stop_date, req.stop_date) + self.assertIsNotNone(result._id) + ti = result.trust_indicator + self.assertEqual(ti.current, req.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + def _create_d(self): + _, res = self.create_d() + return res.href.split('/')[-1] + + +class TestResultCreate(TestResultBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_nobody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) + def test_podNotProvided(self): + req = self.req_d + req.pod_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) + def test_projectNotProvided(self): + req = self.req_d + req.project_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) + def test_testcaseNotProvided(self): + req = self.req_d + req.case_name = None + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noPod(self): + req = self.req_d + req.pod_name = 'notExistPod' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + req = self.req_d + req.project_name = 'notExistProject' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noTestcase(self): + req = self.req_d + req.case_name = 'notExistTestcase' + return req + + @executor.create(httplib.OK, 'assert_href') + def test_success(self): + return self.req_d + + @executor.create(httplib.OK, 'assert_href') + def test_key_with_doc(self): + req = copy.deepcopy(self.req_d) + req.details = {'1.name': 'dot_name'} + return req + + @executor.create(httplib.OK, '_assert_no_ti') + def test_no_ti(self): + req = result_models.ResultCreateRequest(pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria) + self.actual_req = req + return req + + def _assert_no_ti(self, body): + _id = body.href.split('/')[-1] + code, body = self.get(_id) + self.assert_res(body, self.actual_req) + + +class TestResultGet(TestResultBase): + def setUp(self): + super(TestResultGet, self).setUp() + self.req_d_id = self._create_d() + self.req_10d_later = self._create_changed_date(days=10) + self.req_10d_before = self._create_changed_date(days=-10) + + @executor.get(httplib.OK, 'assert_res') + def test_getOne(self): + return self.req_d_id + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryPod(self): + return self._set_query('pod') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryProject(self): + return self._set_query('project') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTestcase(self): + return self._set_query('case') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryVersion(self): + return self._set_query('version') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryInstaller(self): + return self._set_query('installer') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryBuildTag(self): + return self._set_query('build_tag') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryScenario(self): + return self._set_query('scenario') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTrustIndicator(self): + return self._set_query('trust_indicator') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryCriteria(self): + return self._set_query('criteria') + + @executor.query(httplib.BAD_REQUEST, message.must_int('period')) + def test_queryPeriodNotInt(self): + return self._set_query('period=a') + + @executor.query(httplib.OK, '_query_last_one', 1) + def test_queryPeriodSuccess(self): + return self._set_query('period=1') + + @executor.query(httplib.BAD_REQUEST, message.must_int('last')) + def test_queryLastNotInt(self): + return self._set_query('last=a') + + @executor.query(httplib.OK, '_query_last_one', 1) + def test_queryLast(self): + return self._set_query('last=1') + + @executor.query(httplib.OK, '_query_last_one', 1) + def test_combination(self): + return self._set_query('pod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + 'period=1') + + @executor.query(httplib.OK, '_query_success', 0) + def test_notFound(self): + return self._set_query('pod=notExistPod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + 'period=1') + + def _query_success(self, body, number): + self.assertEqual(number, len(body.results)) + + def _query_last_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_10d_later) + + def _create_changed_date(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.start_date = datetime.now() + timedelta(**kwargs) + req.stop_date = str(req.start_date + timedelta(minutes=10)) + req.start_date = str(req.start_date) + self.create(req) + return req + + def _set_query(self, *args): + def get_value(arg): + return self.__getattribute__(arg) \ + if arg != 'trust_indicator' else self.trust_indicator.current + uri = '' + for arg in args: + if '=' in arg: + uri += arg + '&' + else: + uri += '{}={}&'.format(arg, get_value(arg)) + return uri[0: -1] + + +class TestResultUpdate(TestResultBase): + def setUp(self): + super(TestResultUpdate, self).setUp() + self.req_d_id = self._create_d() + + @executor.update(httplib.OK, '_assert_update_ti') + def test_success(self): + new_ti = copy.deepcopy(self.trust_indicator) + new_ti.current += self.update_step + new_ti.histories.append( + result_models.TIHistory(self.update_date, self.update_step)) + new_data = copy.deepcopy(self.req_d) + new_data.trust_indicator = new_ti + update = result_models.ResultUpdateRequest(trust_indicator=new_ti) + self.update_req = new_data + return update, self.req_d_id + + def _assert_update_ti(self, request, body): + ti = body.trust_indicator + self.assertEqual(ti.current, request.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py b/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py new file mode 100644 index 0000000..bd72067 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py @@ -0,0 +1,360 @@ +import functools +import httplib +import json +import os +from copy import deepcopy +from datetime import datetime + +import opnfv_testapi.resources.scenario_models as models +from opnfv_testapi.common import message +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestScenarioBase(base.TestBase): + def setUp(self): + super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios + self.basePath = '/api/v1/scenarios' + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') + + def tearDown(self): + pass + + def assert_body(self, project, req=None): + pass + + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) + f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, httplib.OK) + if req is None: + req = self.req_d + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + + scenario == models.Scenario.from_dict(req) + + @staticmethod + def _set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def _get_and_assert(self, name, req=None): + code, body = self.get(name) + self.assert_res(code, body, req) + + +class TestScenarioCreate(TestScenarioBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, httplib.BAD_REQUEST) + + def test_emptyName(self): + req_empty = models.ScenarioCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_noneName(self): + req_none = models.ScenarioCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, httplib.OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + (code, body) = self.create_d() + self.assertEqual(code, httplib.FORBIDDEN) + self.assertIn(message.exist_base, body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self._get_and_assert(self.scenario_1, self.req_d) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self._set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self._set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self._set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self._set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + def test_queryCombination(self): + query = self._set_query('name=nosdn-nofeature-ha', + 'installer=apex', + 'version=master', + 'project=functest') + + self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, httplib.OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def _execute(set_update): + @functools.wraps(set_update) + def magic(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + self._update_and_assert(update, scenario) + return magic + + def _update(expected): + def _update(set_update): + @functools.wraps(set_update) + def wrap(self): + update, scenario = set_update(self, deepcopy(self.req_d)) + code, body = self.update(update, self.scenario) + getattr(self, expected)(code, scenario) + return wrap + return _update + + @_update('_success') + def test_renameScenario(self, scenario): + new_name = 'nosdn-nofeature-noha' + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + return update_req, scenario + + @_update('_forbidden') + def test_renameScenario_exist(self, scenario): + new_name = self.scenario_2 + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={'name': new_name}) + return update_req, scenario + + @_update('_bad_request') + def test_renameScenario_noName(self, scenario): + new_name = self.scenario_2 + scenario['name'] = new_name + update_req = models.ScenarioUpdateRequest(field='name', + op='update', + locate={}, + term={}) + return update_req, scenario + + @_execute + def test_addInstaller(self, scenario): + add = models.ScenarioInstaller(installer='daisy', versions=list()) + scenario['installers'].append(add.format()) + update = models.ScenarioUpdateRequest(field='installer', + op='add', + locate={}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteInstaller(self, scenario): + scenario['installers'] = filter(lambda f: f['installer'] != 'apex', + scenario['installers']) + + update = models.ScenarioUpdateRequest(field='installer', + op='delete', + locate={'installer': 'apex'}) + return update, scenario + + @_execute + def test_addVersion(self, scenario): + add = models.ScenarioVersion(version='danube', projects=list()) + scenario['installers'][0]['versions'].append(add.format()) + update = models.ScenarioUpdateRequest(field='version', + op='add', + locate={'installer': 'apex'}, + term=add.format()) + return update, scenario + + @_execute + def test_deleteVersion(self, scenario): + scenario['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + scenario['installers'][0]['versions']) + + update = models.ScenarioUpdateRequest(field='version', + op='delete', + locate={'installer': 'apex', + 'version': 'master'}) + return update, scenario + + @_execute + def test_changeOwner(self, scenario): + scenario['installers'][0]['versions'][0]['owner'] = 'lucy' + + update = models.ScenarioUpdateRequest(field='owner', + op='update', + locate={'installer': 'apex', + 'version': 'master'}, + term={'owner': 'lucy'}) + return update, scenario + + @_execute + def test_addProject(self, scenario): + add = models.ScenarioProject(project='qtip').format() + scenario['installers'][0]['versions'][0]['projects'].append(add) + update = models.ScenarioUpdateRequest(field='project', + op='add', + locate={'installer': 'apex', + 'version': 'master'}, + term=add) + return update, scenario + + @_execute + def test_deleteProject(self, scenario): + scenario['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + scenario['installers'][0]['versions'][0]['projects']) + + update = models.ScenarioUpdateRequest(field='project', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}) + return update, scenario + + @_execute + def test_addCustoms(self, scenario): + add = ['odl', 'parser', 'vping_ssh'] + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] + update = models.ScenarioUpdateRequest(field='customs', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add) + return update, scenario + + @_execute + def test_deleteCustoms(self, scenario): + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + update = models.ScenarioUpdateRequest(field='customs', + op='delete', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=['vping_ssh']) + return update, scenario + + @_execute + def test_addScore(self, scenario): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + update = models.ScenarioUpdateRequest(field='score', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + @_execute + def test_addTi(self, scenario): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = scenario['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + update = models.ScenarioUpdateRequest(field='trust_indicator', + op='add', + locate={ + 'installer': 'apex', + 'version': 'master', + 'project': 'functest'}, + term=add.format()) + return update, scenario + + def _update_and_assert(self, update_req, new_scenario, name=None): + code, _ = self.update(update_req, self.scenario) + self.assertEqual(code, httplib.OK) + self._get_and_assert(_none_default(name, self.scenario), + new_scenario) + + def _success(self, status, new_scenario): + self.assertEqual(status, httplib.OK) + self._get_and_assert(new_scenario.get('name'), new_scenario) + + def _forbidden(self, status, new_scenario): + self.assertEqual(status, httplib.FORBIDDEN) + + def _bad_request(self, status, new_scenario): + self.assertEqual(status, httplib.BAD_REQUEST) + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, httplib.NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, httplib.OK) + code, _ = self.get(scenario) + self.assertEqual(code, httplib.NOT_FOUND) + + +def _none_default(check, default): + return check if check else default diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py b/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py new file mode 100644 index 0000000..4f2bc2a --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py @@ -0,0 +1,201 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.resources import project_models +from opnfv_testapi.resources import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestCaseBase(base.TestBase): + def setUp(self): + super(TestCaseBase, self).setUp() + self.req_d = testcase_models.TestcaseCreateRequest('vping_1', + '/cases/vping_1', + 'vping-ssh test') + self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', + '/cases/doctor_1', + 'create doctor') + self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', + 'vping-ssh test', + 'functest') + self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', + 'create doctor', + 'functest') + self.get_res = testcase_models.Testcase + self.list_res = testcase_models.Testcases + self.update_res = testcase_models.Testcase + self.basePath = '/api/v1/projects/%s/cases' + self.create_project() + + def assert_body(self, case, req=None): + if not req: + req = self.req_d + self.assertEqual(case.name, req.name) + self.assertEqual(case.description, req.description) + self.assertEqual(case.url, req.url) + self.assertIsNotNone(case._id) + self.assertIsNotNone(case.creation_date) + + def assert_update_body(self, old, new, req=None): + if not req: + req = self.req_d + self.assertEqual(new.name, req.name) + self.assertEqual(new.description, req.description) + self.assertEqual(new.url, old.url) + self.assertIsNotNone(new._id) + self.assertIsNotNone(new.creation_date) + + def create_project(self): + req_p = project_models.ProjectCreateRequest('functest', + 'vping-ssh test') + self.create_help('/api/v1/projects', req_p) + self.project = req_p.name + + def create_d(self): + return super(TestCaseBase, self).create_d(self.project) + + def create_e(self): + return super(TestCaseBase, self).create_e(self.project) + + def get(self, case=None): + return super(TestCaseBase, self).get(self.project, case) + + def create(self, req=None, *args): + return super(TestCaseBase, self).create(req, self.project) + + def update(self, new=None, case=None): + return super(TestCaseBase, self).update(new, self.project, case) + + def delete(self, case): + return super(TestCaseBase, self).delete(self.project, case) + + +class TestCaseCreate(TestCaseBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + self.project = 'noProject' + return self.req_d + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + req_empty = testcase_models.TestcaseCreateRequest('') + return req_empty + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + req_none = testcase_models.TestcaseCreateRequest(None) + return req_none + + @executor.create(httplib.OK, '_assert_success') + def test_success(self): + return self.req_d + + def _assert_success(self, body): + self.assert_create_body(body, self.req_d, self.project) + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestCaseGet(TestCaseBase): + def setUp(self): + super(TestCaseGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_list') + def test_list(self): + return None + + def _list(self, body): + for case in body.testcases: + if self.req_d.name == case.name: + self.assert_body(case) + else: + self.assert_body(case, self.req_e) + + +class TestCaseUpdate(TestCaseBase): + def setUp(self): + super(TestCaseUpdate, self).setUp() + self.create_d() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.update_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + self.create_e() + return self.update_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.update_d, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_success(self): + return self.update_e, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_with_dollar(self): + update = copy.deepcopy(self.update_d) + update.description = {'2. change': 'dollar change'} + return update, self.req_d.name + + def _update_success(self, request, body): + self.assert_update_body(self.req_d, body, request) + _, new_body = self.get(request.name) + self.assert_update_body(self.req_d, new_body, request) + + +class TestCaseDelete(TestCaseBase): + def setUp(self): + super(TestCaseDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_delete_success') + def test_success(self): + return self.req_d.name + + def _delete_success(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_token.py b/testapi/opnfv_testapi/tests/unit/resources/test_token.py new file mode 100644 index 0000000..c9d4b72 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_token.py @@ -0,0 +1,113 @@ +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import httplib +import unittest + +from tornado import web + +from opnfv_testapi.common import message +from opnfv_testapi.resources import project_models +from opnfv_testapi.router import url_mappings +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestToken(base.TestBase): + def get_app(self): + return web.Application( + url_mappings.mappings, + db=fake_pymongo, + debug=True, + auth=True + ) + + +class TestTokenCreateProject(TestToken): + def setUp(self): + super(TestTokenCreateProject, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping') + fake_pymongo.tokens.insert({"access_token": "12345"}) + self.basePath = '/api/v1/projects' + + @executor.create(httplib.FORBIDDEN, message.invalid_token()) + def test_projectCreateTokenInvalid(self): + self.headers['X-Auth-Token'] = '1234' + return self.req_d + + @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) + def test_projectCreateTokenUnauthorized(self): + if 'X-Auth-Token' in self.headers: + self.headers.pop('X-Auth-Token') + return self.req_d + + @executor.create(httplib.OK, '_create_success') + def test_projectCreateTokenSuccess(self): + self.headers['X-Auth-Token'] = '12345' + return self.req_d + + def _create_success(self, body): + self.assertIn('CreateResponse', str(type(body))) + + +class TestTokenDeleteProject(TestToken): + def setUp(self): + super(TestTokenDeleteProject, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping') + fake_pymongo.tokens.insert({"access_token": "12345"}) + self.basePath = '/api/v1/projects' + self.headers['X-Auth-Token'] = '12345' + self.create_d() + + @executor.delete(httplib.FORBIDDEN, message.invalid_token()) + def test_projectDeleteTokenIvalid(self): + self.headers['X-Auth-Token'] = '1234' + return self.req_d.name + + @executor.delete(httplib.UNAUTHORIZED, message.unauthorized()) + def test_projectDeleteTokenUnauthorized(self): + self.headers.pop('X-Auth-Token') + return self.req_d.name + + @executor.delete(httplib.OK, '_delete_success') + def test_projectDeleteTokenSuccess(self): + return self.req_d.name + + def _delete_success(self, body): + self.assertEqual('', body) + + +class TestTokenUpdateProject(TestToken): + def setUp(self): + super(TestTokenUpdateProject, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping') + fake_pymongo.tokens.insert({"access_token": "12345"}) + self.basePath = '/api/v1/projects' + self.headers['X-Auth-Token'] = '12345' + self.create_d() + + @executor.update(httplib.FORBIDDEN, message.invalid_token()) + def test_projectUpdateTokenIvalid(self): + self.headers['X-Auth-Token'] = '1234' + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + @executor.update(httplib.UNAUTHORIZED, message.unauthorized()) + def test_projectUpdateTokenUnauthorized(self): + self.headers.pop('X-Auth-Token') + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_projectUpdateTokenSuccess(self): + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + def _update_success(self, request, body): + self.assertIn(request.name, body) + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_version.py b/testapi/opnfv_testapi/tests/unit/resources/test_version.py new file mode 100644 index 0000000..51fed11 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/resources/test_version.py @@ -0,0 +1,36 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.resources import models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.resources import test_base as base + + +class TestVersionBase(base.TestBase): + def setUp(self): + super(TestVersionBase, self).setUp() + self.list_res = models.Versions + self.basePath = '/versions' + + +class TestVersion(TestVersionBase): + @executor.get(httplib.OK, '_get_success') + def test_success(self): + return None + + def _get_success(self, body): + self.assertEqual(len(body.versions), 1) + self.assertEqual(body.versions[0].version, 'v1.0') + self.assertEqual(body.versions[0].description, 'basics') + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/scenario-c1.json deleted file mode 100644 index 1878022..0000000 --- a/testapi/opnfv_testapi/tests/unit/scenario-c1.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "name": "nosdn-nofeature-ha", - "installers": - [ - { - "installer": "apex", - "versions": - [ - { - "owner": "Luke", - "version": "master", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-08 22:46:44", - "score": "12/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/testapi/opnfv_testapi/tests/unit/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/scenario-c2.json deleted file mode 100644 index b6a3b83..0000000 --- a/testapi/opnfv_testapi/tests/unit/scenario-c2.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "odl_2-nofeature-ha", - "installers": - [ - { - "installer": "fuel", - "versions": - [ - { - "owner": "Lucky", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": [], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "silver" - } - - ] - }, - { - "project": "yardstick", - "customs": ["suite-a"], - "scores": [ - { - "date": "2017-01-08 22:46:44", - "score": "0" - } - ], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "gold" - } - ] - } - ] - }, - { - "owner": "Luke", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-09 22:46:44", - "score": "11/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/testapi/opnfv_testapi/tests/unit/test_base.py b/testapi/opnfv_testapi/tests/unit/test_base.py deleted file mode 100644 index 4d34456..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_base.py +++ /dev/null @@ -1,164 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import json -from os import path - -import mock -from tornado import testing - -from opnfv_testapi.common import config -from opnfv_testapi.resources import models -from opnfv_testapi.tests.unit import fake_pymongo - -config.Config.CONFIG = path.join(path.dirname(__file__), - '../../../etc/config.ini') - - -class TestBase(testing.AsyncHTTPTestCase): - headers = {'Content-Type': 'application/json; charset=UTF-8'} - - def setUp(self): - self._patch_server() - self.basePath = '' - self.create_res = models.CreateResponse - self.get_res = None - self.list_res = None - self.update_res = None - self.req_d = None - self.req_e = None - self.addCleanup(self._clear) - super(TestBase, self).setUp() - - def tearDown(self): - self.db_patcher.stop() - - def _patch_server(self): - from opnfv_testapi.cmd import server - server.parse_config([ - '--config-file', - path.join(path.dirname(__file__), 'common/normal.ini') - ]) - self.db_patcher = mock.patch('opnfv_testapi.cmd.server.get_db', - self._fake_pymongo) - self.db_patcher.start() - - @staticmethod - def _fake_pymongo(): - return fake_pymongo - - def get_app(self): - from opnfv_testapi.cmd import server - return server.make_app() - - def create_d(self, *args): - return self.create(self.req_d, *args) - - def create_e(self, *args): - return self.create(self.req_e, *args) - - def create(self, req=None, *args): - return self.create_help(self.basePath, req, *args) - - def create_help(self, uri, req, *args): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(self._update_uri(uri, *args), - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - def get(self, *args): - res = self.fetch(self._get_uri(*args), - method='GET', - headers=self.headers) - - def inner(): - new_args, num = self._get_valid_args(*args) - return self.get_res \ - if num != self._need_arg_num(self.basePath) else self.list_res - return self._get_return(res, inner()) - - def query(self, query): - res = self.fetch(self._get_query_uri(query), - method='GET', - headers=self.headers) - return self._get_return(res, self.list_res) - - def update(self, new=None, *args): - if new: - new = new.format() - res = self.fetch(self._get_uri(*args), - method='PUT', - body=json.dumps(new), - headers=self.headers) - return self._get_return(res, self.update_res) - - def delete(self, *args): - res = self.fetch(self._get_uri(*args), - method='DELETE', - headers=self.headers) - return res.code, res.body - - @staticmethod - def _get_valid_args(*args): - new_args = tuple(['%s' % arg for arg in args if arg is not None]) - return new_args, len(new_args) - - def _need_arg_num(self, uri): - return uri.count('%s') - - def _get_query_uri(self, query): - return self.basePath + '?' + query if query else self.basePath - - def _get_uri(self, *args): - return self._update_uri(self.basePath, *args) - - def _update_uri(self, uri, *args): - r_uri = uri - new_args, num = self._get_valid_args(*args) - if num != self._need_arg_num(uri): - r_uri += '/%s' - - return r_uri % tuple(['%s' % arg for arg in new_args]) - - def _get_return(self, res, cls): - code = res.code - body = res.body - return code, self._get_return_body(code, body, cls) - - @staticmethod - def _get_return_body(code, body, cls): - return cls.from_dict(json.loads(body)) if code < 300 and cls else body - - def assert_href(self, body): - self.assertIn(self.basePath, body.href) - - def assert_create_body(self, body, req=None, *args): - import inspect - if not req: - req = self.req_d - resource_name = '' - if inspect.isclass(req): - resource_name = req.name - elif isinstance(req, dict): - resource_name = req['name'] - elif isinstance(req, str): - resource_name = json.loads(req)['name'] - new_args = args + tuple([resource_name]) - self.assertIn(self._get_uri(*new_args), body.href) - - @staticmethod - def _clear(): - fake_pymongo.pods.clear() - fake_pymongo.projects.clear() - fake_pymongo.testcases.clear() - fake_pymongo.results.clear() - fake_pymongo.scenarios.clear() diff --git a/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py deleted file mode 100644 index 1ebc96f..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py +++ /dev/null @@ -1,123 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import unittest - -from tornado import gen -from tornado import testing -from tornado import web - -from opnfv_testapi.tests.unit import fake_pymongo - - -class MyTest(testing.AsyncHTTPTestCase): - def setUp(self): - super(MyTest, self).setUp() - self.db = fake_pymongo - self.addCleanup(self._clear) - self.io_loop.run_sync(self.fixture_setup) - - def get_app(self): - return web.Application() - - @gen.coroutine - def fixture_setup(self): - self.test1 = {'_id': '1', 'name': 'test1'} - self.test2 = {'name': 'test2'} - yield self.db.pods.insert({'_id': '1', 'name': 'test1'}) - yield self.db.pods.insert({'name': 'test2'}) - - @testing.gen_test - def test_find_one(self): - user = yield self.db.pods.find_one({'name': 'test1'}) - self.assertEqual(user, self.test1) - self.db.pods.remove() - - @testing.gen_test - def test_find(self): - cursor = self.db.pods.find() - names = [] - while (yield cursor.fetch_next): - ob = cursor.next_object() - names.append(ob.get('name')) - self.assertItemsEqual(names, ['test1', 'test2']) - - @testing.gen_test - def test_update(self): - yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertEqual(user.get('name', None), 'new_test1') - - def test_update_dot_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - 'key 1. name must not contain .') - - def test_update_dot_no_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - None, - check_keys=False) - - def test_update_dollar_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - 'key $name must not start with $') - - def test_update_dollar_no_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - None, - check_keys=False) - - @testing.gen_test - def test_remove(self): - yield self.db.pods.remove({'_id': '1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertIsNone(user) - - def test_insert_dot_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - 'key 2. name must not contain .') - - def test_insert_dot_no_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - None, - check_keys=False) - - def test_insert_dollar_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - 'key $name must not start with $') - - def test_insert_dollar_no_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - None, - check_keys=False) - - def _clear(self): - self.db.pods.clear() - - def _update_assert(self, docs, error=None, **kwargs): - self._db_assert('update', error, {'_id': '1'}, docs, **kwargs) - - def _insert_assert(self, docs, error=None, **kwargs): - self._db_assert('insert', error, docs, **kwargs) - - @testing.gen_test - def _db_assert(self, method, error, *args, **kwargs): - name_error = None - try: - yield self._eval_pods_db(method, *args, **kwargs) - except NameError as err: - name_error = err.args[0] - finally: - self.assertEqual(name_error, error) - - def _eval_pods_db(self, method, *args, **kwargs): - table_obj = vars(self.db)['pods'] - return table_obj.__getattribute__(method)(*args, **kwargs) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_pod.py b/testapi/opnfv_testapi/tests/unit/test_pod.py deleted file mode 100644 index 0ed348d..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_pod.py +++ /dev/null @@ -1,89 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import pod_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base - - -class TestPodBase(base.TestBase): - def setUp(self): - super(TestPodBase, self).setUp() - self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual', - 'zte pod 1', 'ci-pod') - self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2') - self.get_res = pod_models.Pod - self.list_res = pod_models.Pods - self.basePath = '/api/v1/pods' - - def assert_get_body(self, pod, req=None): - if not req: - req = self.req_d - self.assertEqual(pod.name, req.name) - self.assertEqual(pod.mode, req.mode) - self.assertEqual(pod.details, req.details) - self.assertEqual(pod.role, req.role) - self.assertIsNotNone(pod.creation_date) - self.assertIsNotNone(pod._id) - - -class TestPodCreate(TestPodBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return pod_models.PodCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return pod_models.PodCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestPodGet(TestPodBase): - def setUp(self): - super(TestPodGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_get_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - self.assertEqual(len(body.pods), 2) - for pod in body.pods: - if self.req_d.name == pod.name: - self.assert_get_body(pod) - else: - self.assert_get_body(pod, self.req_e) - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_project.py b/testapi/opnfv_testapi/tests/unit/test_project.py deleted file mode 100644 index 323a116..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_project.py +++ /dev/null @@ -1,136 +0,0 @@ -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base - - -class TestProjectBase(base.TestBase): - def setUp(self): - super(TestProjectBase, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping', - 'vping-ssh test') - self.req_e = project_models.ProjectCreateRequest('doctor', - 'doctor test') - self.get_res = project_models.Project - self.list_res = project_models.Projects - self.update_res = project_models.Project - self.basePath = '/api/v1/projects' - - def assert_body(self, project, req=None): - if not req: - req = self.req_d - self.assertEqual(project.name, req.name) - self.assertEqual(project.description, req.description) - self.assertIsNotNone(project._id) - self.assertIsNotNone(project.creation_date) - - -class TestProjectCreate(TestProjectBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return project_models.ProjectCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return project_models.ProjectCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestProjectGet(TestProjectBase): - def setUp(self): - super(TestProjectGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - for project in body.projects: - if self.req_d.name == project.name: - self.assert_body(project) - else: - self.assert_body(project, self.req_e) - - -class TestProjectUpdate(TestProjectBase): - def setUp(self): - super(TestProjectUpdate, self).setUp() - _, d_body = self.create_d() - _, get_res = self.get(self.req_d.name) - self.index_d = get_res._id - self.create_e() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.req_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - return self.req_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.req_d, self.req_d.name - - @executor.update(httplib.OK, '_assert_update') - def test_success(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _assert_update(self, req, body): - self.assertEqual(self.index_d, body._id) - self.assert_body(body, req) - _, new_body = self.get(req.name) - self.assertEqual(self.index_d, new_body._id) - self.assert_body(new_body, req) - - -class TestProjectDelete(TestProjectBase): - def setUp(self): - super(TestProjectDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_assert_delete') - def test_success(self): - return self.req_d.name - - def _assert_delete(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_result.py b/testapi/opnfv_testapi/tests/unit/test_result.py deleted file mode 100644 index ef2ce30..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_result.py +++ /dev/null @@ -1,352 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -from datetime import datetime, timedelta -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import pod_models -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import result_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import test_base as base -from opnfv_testapi.tests.unit import executor - - -class Details(object): - def __init__(self, timestart=None, duration=None, status=None): - self.timestart = timestart - self.duration = duration - self.status = status - self.items = [{'item1': 1}, {'item2': 2}] - - def format(self): - return { - "timestart": self.timestart, - "duration": self.duration, - "status": self.status, - 'items': [{'item1': 1}, {'item2': 2}] - } - - @staticmethod - def from_dict(a_dict): - - if a_dict is None: - return None - - t = Details() - t.timestart = a_dict.get('timestart') - t.duration = a_dict.get('duration') - t.status = a_dict.get('status') - t.items = a_dict.get('items') - return t - - -class TestResultBase(base.TestBase): - def setUp(self): - self.pod = 'zte-pod1' - self.project = 'functest' - self.case = 'vPing' - self.installer = 'fuel' - self.version = 'C' - self.build_tag = 'v3.0' - self.scenario = 'odl-l2' - self.criteria = 'passed' - self.trust_indicator = result_models.TI(0.7) - self.start_date = "2016-05-23 07:16:09.477097" - self.stop_date = "2016-05-23 07:16:19.477097" - self.update_date = "2016-05-24 07:16:19.477097" - self.update_step = -0.05 - super(TestResultBase, self).setUp() - self.details = Details(timestart='0', duration='9s', status='OK') - self.req_d = result_models.ResultCreateRequest( - pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria, - trust_indicator=self.trust_indicator) - self.get_res = result_models.TestResult - self.list_res = result_models.TestResults - self.update_res = result_models.TestResult - self.basePath = '/api/v1/results' - self.req_pod = pod_models.PodCreateRequest( - self.pod, - 'metal', - 'zte pod 1') - self.req_project = project_models.ProjectCreateRequest( - self.project, - 'vping test') - self.req_testcase = testcase_models.TestcaseCreateRequest( - self.case, - '/cases/vping', - 'vping-ssh test') - self.create_help('/api/v1/pods', self.req_pod) - self.create_help('/api/v1/projects', self.req_project) - self.create_help('/api/v1/projects/%s/cases', - self.req_testcase, - self.project) - - def assert_res(self, result, req=None): - if req is None: - req = self.req_d - self.assertEqual(result.pod_name, req.pod_name) - self.assertEqual(result.project_name, req.project_name) - self.assertEqual(result.case_name, req.case_name) - self.assertEqual(result.installer, req.installer) - self.assertEqual(result.version, req.version) - details_req = Details.from_dict(req.details) - details_res = Details.from_dict(result.details) - self.assertEqual(details_res.duration, details_req.duration) - self.assertEqual(details_res.timestart, details_req.timestart) - self.assertEqual(details_res.status, details_req.status) - self.assertEqual(details_res.items, details_req.items) - self.assertEqual(result.build_tag, req.build_tag) - self.assertEqual(result.scenario, req.scenario) - self.assertEqual(result.criteria, req.criteria) - self.assertEqual(result.start_date, req.start_date) - self.assertEqual(result.stop_date, req.stop_date) - self.assertIsNotNone(result._id) - ti = result.trust_indicator - self.assertEqual(ti.current, req.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - def _create_d(self): - _, res = self.create_d() - return res.href.split('/')[-1] - - -class TestResultCreate(TestResultBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_nobody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) - def test_podNotProvided(self): - req = self.req_d - req.pod_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) - def test_projectNotProvided(self): - req = self.req_d - req.project_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) - def test_testcaseNotProvided(self): - req = self.req_d - req.case_name = None - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noPod(self): - req = self.req_d - req.pod_name = 'notExistPod' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - req = self.req_d - req.project_name = 'notExistProject' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noTestcase(self): - req = self.req_d - req.case_name = 'notExistTestcase' - return req - - @executor.create(httplib.OK, 'assert_href') - def test_success(self): - return self.req_d - - @executor.create(httplib.OK, 'assert_href') - def test_key_with_doc(self): - req = copy.deepcopy(self.req_d) - req.details = {'1.name': 'dot_name'} - return req - - @executor.create(httplib.OK, '_assert_no_ti') - def test_no_ti(self): - req = result_models.ResultCreateRequest(pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria) - self.actual_req = req - return req - - def _assert_no_ti(self, body): - _id = body.href.split('/')[-1] - code, body = self.get(_id) - self.assert_res(body, self.actual_req) - - -class TestResultGet(TestResultBase): - def setUp(self): - super(TestResultGet, self).setUp() - self.req_d_id = self._create_d() - self.req_10d_later = self._create_changed_date(days=10) - self.req_10d_before = self._create_changed_date(days=-10) - - @executor.get(httplib.OK, 'assert_res') - def test_getOne(self): - return self.req_d_id - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryPod(self): - return self._set_query('pod') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryProject(self): - return self._set_query('project') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTestcase(self): - return self._set_query('case') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryVersion(self): - return self._set_query('version') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryInstaller(self): - return self._set_query('installer') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryBuildTag(self): - return self._set_query('build_tag') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryScenario(self): - return self._set_query('scenario') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTrustIndicator(self): - return self._set_query('trust_indicator') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryCriteria(self): - return self._set_query('criteria') - - @executor.query(httplib.BAD_REQUEST, message.must_int('period')) - def test_queryPeriodNotInt(self): - return self._set_query('period=a') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_queryPeriodSuccess(self): - return self._set_query('period=1') - - @executor.query(httplib.BAD_REQUEST, message.must_int('last')) - def test_queryLastNotInt(self): - return self._set_query('last=a') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_queryLast(self): - return self._set_query('last=1') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_combination(self): - return self._set_query('pod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - 'period=1') - - @executor.query(httplib.OK, '_query_success', 0) - def test_notFound(self): - return self._set_query('pod=notExistPod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - 'period=1') - - def _query_success(self, body, number): - self.assertEqual(number, len(body.results)) - - def _query_last_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_10d_later) - - def _create_changed_date(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.start_date = datetime.now() + timedelta(**kwargs) - req.stop_date = str(req.start_date + timedelta(minutes=10)) - req.start_date = str(req.start_date) - self.create(req) - return req - - def _set_query(self, *args): - def get_value(arg): - return self.__getattribute__(arg) \ - if arg != 'trust_indicator' else self.trust_indicator.current - uri = '' - for arg in args: - if '=' in arg: - uri += arg + '&' - else: - uri += '{}={}&'.format(arg, get_value(arg)) - return uri[0: -1] - - -class TestResultUpdate(TestResultBase): - def setUp(self): - super(TestResultUpdate, self).setUp() - self.req_d_id = self._create_d() - - @executor.update(httplib.OK, '_assert_update_ti') - def test_success(self): - new_ti = copy.deepcopy(self.trust_indicator) - new_ti.current += self.update_step - new_ti.histories.append( - result_models.TIHistory(self.update_date, self.update_step)) - new_data = copy.deepcopy(self.req_d) - new_data.trust_indicator = new_ti - update = result_models.ResultUpdateRequest(trust_indicator=new_ti) - self.update_req = new_data - return update, self.req_d_id - - def _assert_update_ti(self, request, body): - ti = body.trust_indicator - self.assertEqual(ti.current, request.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_scenario.py b/testapi/opnfv_testapi/tests/unit/test_scenario.py deleted file mode 100644 index b232bc1..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_scenario.py +++ /dev/null @@ -1,360 +0,0 @@ -from copy import deepcopy -from datetime import datetime -import functools -import httplib -import json -import os - -from opnfv_testapi.common import message -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.tests.unit import test_base as base - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - - scenario == models.Scenario.from_dict(req) - - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def _get_and_assert(self, name, req=None): - code, body = self.get(name) - self.assert_res(code, body, req) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self._get_and_assert(self.scenario_1, self.req_d) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self._set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self._set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self._set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self._set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - def test_queryCombination(self): - query = self._set_query('name=nosdn-nofeature-ha', - 'installer=apex', - 'version=master', - 'project=functest') - - self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def _execute(set_update): - @functools.wraps(set_update) - def magic(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - self._update_and_assert(update, scenario) - return magic - - def _update(expected): - def _update(set_update): - @functools.wraps(set_update) - def wrap(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - code, body = self.update(update, self.scenario) - getattr(self, expected)(code, scenario) - return wrap - return _update - - @_update('_success') - def test_renameScenario(self, scenario): - new_name = 'nosdn-nofeature-noha' - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_forbidden') - def test_renameScenario_exist(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_bad_request') - def test_renameScenario_noName(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={}) - return update_req, scenario - - @_execute - def test_addInstaller(self, scenario): - add = models.ScenarioInstaller(installer='daisy', versions=list()) - scenario['installers'].append(add.format()) - update = models.ScenarioUpdateRequest(field='installer', - op='add', - locate={}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteInstaller(self, scenario): - scenario['installers'] = filter(lambda f: f['installer'] != 'apex', - scenario['installers']) - - update = models.ScenarioUpdateRequest(field='installer', - op='delete', - locate={'installer': 'apex'}) - return update, scenario - - @_execute - def test_addVersion(self, scenario): - add = models.ScenarioVersion(version='danube', projects=list()) - scenario['installers'][0]['versions'].append(add.format()) - update = models.ScenarioUpdateRequest(field='version', - op='add', - locate={'installer': 'apex'}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteVersion(self, scenario): - scenario['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - scenario['installers'][0]['versions']) - - update = models.ScenarioUpdateRequest(field='version', - op='delete', - locate={'installer': 'apex', - 'version': 'master'}) - return update, scenario - - @_execute - def test_changeOwner(self, scenario): - scenario['installers'][0]['versions'][0]['owner'] = 'lucy' - - update = models.ScenarioUpdateRequest(field='owner', - op='update', - locate={'installer': 'apex', - 'version': 'master'}, - term={'owner': 'lucy'}) - return update, scenario - - @_execute - def test_addProject(self, scenario): - add = models.ScenarioProject(project='qtip').format() - scenario['installers'][0]['versions'][0]['projects'].append(add) - update = models.ScenarioUpdateRequest(field='project', - op='add', - locate={'installer': 'apex', - 'version': 'master'}, - term=add) - return update, scenario - - @_execute - def test_deleteProject(self, scenario): - scenario['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - scenario['installers'][0]['versions'][0]['projects']) - - update = models.ScenarioUpdateRequest(field='project', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}) - return update, scenario - - @_execute - def test_addCustoms(self, scenario): - add = ['odl', 'parser', 'vping_ssh'] - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] - update = models.ScenarioUpdateRequest(field='customs', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add) - return update, scenario - - @_execute - def test_deleteCustoms(self, scenario): - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - update = models.ScenarioUpdateRequest(field='customs', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=['vping_ssh']) - return update, scenario - - @_execute - def test_addScore(self, scenario): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - update = models.ScenarioUpdateRequest(field='score', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - @_execute - def test_addTi(self, scenario): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - update = models.ScenarioUpdateRequest(field='trust_indicator', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - def _update_and_assert(self, update_req, new_scenario, name=None): - code, _ = self.update(update_req, self.scenario) - self.assertEqual(code, httplib.OK) - self._get_and_assert(_none_default(name, self.scenario), - new_scenario) - - def _success(self, status, new_scenario): - self.assertEqual(status, httplib.OK) - self._get_and_assert(new_scenario.get('name'), new_scenario) - - def _forbidden(self, status, new_scenario): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status, new_scenario): - self.assertEqual(status, httplib.BAD_REQUEST) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -def _none_default(check, default): - return check if check else default diff --git a/testapi/opnfv_testapi/tests/unit/test_testcase.py b/testapi/opnfv_testapi/tests/unit/test_testcase.py deleted file mode 100644 index e28eaf5..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_testcase.py +++ /dev/null @@ -1,201 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import test_base as base -from opnfv_testapi.tests.unit import executor - - -class TestCaseBase(base.TestBase): - def setUp(self): - super(TestCaseBase, self).setUp() - self.req_d = testcase_models.TestcaseCreateRequest('vping_1', - '/cases/vping_1', - 'vping-ssh test') - self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', - '/cases/doctor_1', - 'create doctor') - self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', - 'vping-ssh test', - 'functest') - self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', - 'create doctor', - 'functest') - self.get_res = testcase_models.Testcase - self.list_res = testcase_models.Testcases - self.update_res = testcase_models.Testcase - self.basePath = '/api/v1/projects/%s/cases' - self.create_project() - - def assert_body(self, case, req=None): - if not req: - req = self.req_d - self.assertEqual(case.name, req.name) - self.assertEqual(case.description, req.description) - self.assertEqual(case.url, req.url) - self.assertIsNotNone(case._id) - self.assertIsNotNone(case.creation_date) - - def assert_update_body(self, old, new, req=None): - if not req: - req = self.req_d - self.assertEqual(new.name, req.name) - self.assertEqual(new.description, req.description) - self.assertEqual(new.url, old.url) - self.assertIsNotNone(new._id) - self.assertIsNotNone(new.creation_date) - - def create_project(self): - req_p = project_models.ProjectCreateRequest('functest', - 'vping-ssh test') - self.create_help('/api/v1/projects', req_p) - self.project = req_p.name - - def create_d(self): - return super(TestCaseBase, self).create_d(self.project) - - def create_e(self): - return super(TestCaseBase, self).create_e(self.project) - - def get(self, case=None): - return super(TestCaseBase, self).get(self.project, case) - - def create(self, req=None, *args): - return super(TestCaseBase, self).create(req, self.project) - - def update(self, new=None, case=None): - return super(TestCaseBase, self).update(new, self.project, case) - - def delete(self, case): - return super(TestCaseBase, self).delete(self.project, case) - - -class TestCaseCreate(TestCaseBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - self.project = 'noProject' - return self.req_d - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - req_empty = testcase_models.TestcaseCreateRequest('') - return req_empty - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - req_none = testcase_models.TestcaseCreateRequest(None) - return req_none - - @executor.create(httplib.OK, '_assert_success') - def test_success(self): - return self.req_d - - def _assert_success(self, body): - self.assert_create_body(body, self.req_d, self.project) - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestCaseGet(TestCaseBase): - def setUp(self): - super(TestCaseGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_list') - def test_list(self): - return None - - def _list(self, body): - for case in body.testcases: - if self.req_d.name == case.name: - self.assert_body(case) - else: - self.assert_body(case, self.req_e) - - -class TestCaseUpdate(TestCaseBase): - def setUp(self): - super(TestCaseUpdate, self).setUp() - self.create_d() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.update_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - self.create_e() - return self.update_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.update_d, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_success(self): - return self.update_e, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_with_dollar(self): - update = copy.deepcopy(self.update_d) - update.description = {'2. change': 'dollar change'} - return update, self.req_d.name - - def _update_success(self, request, body): - self.assert_update_body(self.req_d, body, request) - _, new_body = self.get(request.name) - self.assert_update_body(self.req_d, new_body, request) - - -class TestCaseDelete(TestCaseBase): - def setUp(self): - super(TestCaseDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_delete_success') - def test_success(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_token.py b/testapi/opnfv_testapi/tests/unit/test_token.py deleted file mode 100644 index ca247a3..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_token.py +++ /dev/null @@ -1,113 +0,0 @@ -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import httplib -import unittest - -from tornado import web - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.router import url_mappings -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit import test_base as base - - -class TestToken(base.TestBase): - def get_app(self): - return web.Application( - url_mappings.mappings, - db=fake_pymongo, - debug=True, - auth=True - ) - - -class TestTokenCreateProject(TestToken): - def setUp(self): - super(TestTokenCreateProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - - @executor.create(httplib.FORBIDDEN, message.invalid_token()) - def test_projectCreateTokenInvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d - - @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectCreateTokenUnauthorized(self): - if 'X-Auth-Token' in self.headers: - self.headers.pop('X-Auth-Token') - return self.req_d - - @executor.create(httplib.OK, '_create_success') - def test_projectCreateTokenSuccess(self): - self.headers['X-Auth-Token'] = '12345' - return self.req_d - - def _create_success(self, body): - self.assertIn('CreateResponse', str(type(body))) - - -class TestTokenDeleteProject(TestToken): - def setUp(self): - super(TestTokenDeleteProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - self.headers['X-Auth-Token'] = '12345' - self.create_d() - - @executor.delete(httplib.FORBIDDEN, message.invalid_token()) - def test_projectDeleteTokenIvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d.name - - @executor.delete(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectDeleteTokenUnauthorized(self): - self.headers.pop('X-Auth-Token') - return self.req_d.name - - @executor.delete(httplib.OK, '_delete_success') - def test_projectDeleteTokenSuccess(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual('', body) - - -class TestTokenUpdateProject(TestToken): - def setUp(self): - super(TestTokenUpdateProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - self.headers['X-Auth-Token'] = '12345' - self.create_d() - - @executor.update(httplib.FORBIDDEN, message.invalid_token()) - def test_projectUpdateTokenIvalid(self): - self.headers['X-Auth-Token'] = '1234' - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - @executor.update(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectUpdateTokenUnauthorized(self): - self.headers.pop('X-Auth-Token') - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_projectUpdateTokenSuccess(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _update_success(self, request, body): - self.assertIn(request.name, body) - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/test_version.py b/testapi/opnfv_testapi/tests/unit/test_version.py deleted file mode 100644 index fff802a..0000000 --- a/testapi/opnfv_testapi/tests/unit/test_version.py +++ /dev/null @@ -1,36 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.resources import models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import test_base as base - - -class TestVersionBase(base.TestBase): - def setUp(self): - super(TestVersionBase, self).setUp() - self.list_res = models.Versions - self.basePath = '/versions' - - -class TestVersion(TestVersionBase): - @executor.get(httplib.OK, '_get_success') - def test_success(self): - return None - - def _get_success(self, body): - self.assertEqual(len(body.versions), 1) - self.assertEqual(body.versions[0].version, 'v1.0') - self.assertEqual(body.versions[0].description, 'basics') - - -if __name__ == '__main__': - unittest.main() -- cgit 1.2.3-korg