diff options
Diffstat (limited to 'testapi/opnfv_testapi/tests/unit/handlers')
11 files changed, 1721 insertions, 0 deletions
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/__init__.py b/testapi/opnfv_testapi/tests/unit/handlers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/__init__.py diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json new file mode 100644 index 0000000..1878022 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json @@ -0,0 +1,38 @@ +{ + "name": "nosdn-nofeature-ha", + "installers": + [ + { + "installer": "apex", + "versions": + [ + { + "owner": "Luke", + "version": "master", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-08 22:46:44", + "score": "12/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json new file mode 100644 index 0000000..980051c --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json @@ -0,0 +1,73 @@ +{ + "name": "odl_2-nofeature-ha", + "installers": + [ + { + "installer": "fuel", + "versions": + [ + { + "owner": "Lucky", + "version": "danube", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": [], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "silver" + } + + ] + }, + { + "project": "yardstick", + "customs": ["suite-a"], + "scores": [ + { + "date": "2017-01-08 22:46:44", + "score": "0/1" + } + ], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "gold" + } + ] + } + ] + }, + { + "owner": "Luke", + "version": "colorado", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-09 22:46:44", + "score": "11/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_base.py b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py new file mode 100644 index 0000000..b7fabb9 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py @@ -0,0 +1,204 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from datetime import datetime +import json +from os import path + +from bson.objectid import ObjectId +import mock +from tornado import testing + +from opnfv_testapi.models import base_models +from opnfv_testapi.models import pod_models +from opnfv_testapi.tests.unit import fake_pymongo + + +class TestBase(testing.AsyncHTTPTestCase): + headers = {'Content-Type': 'application/json; charset=UTF-8'} + + def setUp(self): + self._patch_server() + self.basePath = '' + self.create_res = base_models.CreateResponse + self.get_res = None + self.list_res = None + self.update_res = None + self.pod_d = pod_models.Pod(name='zte-pod1', + mode='virtual', + details='zte pod 1', + role='community-ci', + _id=str(ObjectId()), + owner='ValidUser', + create_date=str(datetime.now())) + self.pod_e = pod_models.Pod(name='zte-pod2', + mode='metal', + details='zte pod 2', + role='production-ci', + _id=str(ObjectId()), + owner='ValidUser', + create_date=str(datetime.now())) + self.req_d = None + self.req_e = None + self.addCleanup(self._clear) + super(TestBase, self).setUp() + fake_pymongo.users.insert({"user": "ValidUser", + 'email': 'validuser@lf.com', + 'fullname': 'Valid User', + 'groups': [ + 'opnfv-testapi-users', + 'opnfv-gerrit-functest-submitters', + 'opnfv-gerrit-qtip-contributors'] + }) + + def tearDown(self): + self.db_patcher.stop() + self.config_patcher.stop() + + def _patch_server(self): + import argparse + config = path.join(path.dirname(__file__), + '../../../../etc/config.ini') + self.config_patcher = mock.patch( + 'argparse.ArgumentParser.parse_known_args', + return_value=(argparse.Namespace(config_file=config), None)) + self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', + fake_pymongo) + self.config_patcher.start() + self.db_patcher.start() + + def get_app(self): + from opnfv_testapi.cmd import server + return server.make_app() + + def create_d(self, *args): + return self.create(self.req_d, *args) + + def create_e(self, *args): + return self.create(self.req_e, *args) + + def create(self, req=None, *args): + return self.create_help(self.basePath, req, *args) + + def create_help(self, uri, req, *args): + return self.post_direct_url(self._update_uri(uri, *args), req) + + def post_direct_url(self, url, req): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(url, + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + def get(self, *args): + res = self.fetch(self._get_uri(*args), + method='GET', + headers=self.headers) + + def inner(): + new_args, num = self._get_valid_args(*args) + return self.get_res \ + if num != self._need_arg_num(self.basePath) else self.list_res + return self._get_return(res, inner()) + + def query(self, query): + res = self.fetch(self._get_query_uri(query), + method='GET', + headers=self.headers) + return self._get_return(res, self.list_res) + + def update_direct_url(self, url, new=None): + if new and hasattr(new, 'format'): + new = new.format() + res = self.fetch(url, + method='PUT', + body=json.dumps(new), + headers=self.headers) + return self._get_return(res, self.update_res) + + def update(self, new=None, *args): + return self.update_direct_url(self._get_uri(*args), new) + + def delete_direct_url(self, url, body): + if body: + res = self.fetch(url, + method='DELETE', + body=json.dumps(body), + headers=self.headers, + allow_nonstandard_methods=True) + else: + res = self.fetch(url, + method='DELETE', + headers=self.headers) + + return res.code, res.body + + def delete(self, *args): + return self.delete_direct_url(self._get_uri(*args), None) + + @staticmethod + def _get_valid_args(*args): + new_args = tuple(['%s' % arg for arg in args if arg is not None]) + return new_args, len(new_args) + + def _need_arg_num(self, uri): + return uri.count('%s') + + def _get_query_uri(self, query): + return self.basePath + '?' + query if query else self.basePath + + def _get_uri(self, *args): + return self._update_uri(self.basePath, *args) + + def _update_uri(self, uri, *args): + r_uri = uri + new_args, num = self._get_valid_args(*args) + if num != self._need_arg_num(uri): + r_uri += '/%s' + + return r_uri % tuple(['%s' % arg for arg in new_args]) + + def _get_return(self, res, cls): + code = res.code + body = res.body + if body: + return code, self._get_return_body(code, body, cls) + else: + return code, None + + @staticmethod + def _get_return_body(code, body, cls): + return cls.from_dict(json.loads(body)) if code < 300 and cls else body + + def assert_href(self, body): + self.assertIn(self.basePath, body.href) + + def assert_create_body(self, body, req=None, *args): + import inspect + if not req: + req = self.req_d + resource_name = '' + if inspect.isclass(req): + resource_name = req.name + elif isinstance(req, dict): + resource_name = req['name'] + elif isinstance(req, str): + resource_name = json.loads(req)['name'] + new_args = args + tuple([resource_name]) + self.assertIn(self._get_uri(*new_args), body.href) + + @staticmethod + def _clear(): + fake_pymongo.pods.clear() + fake_pymongo.projects.clear() + fake_pymongo.testcases.clear() + fake_pymongo.results.clear() + fake_pymongo.scenarios.clear() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py new file mode 100644 index 0000000..95ed8ba --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py @@ -0,0 +1,118 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import pod_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestPodBase(base.TestBase): + def setUp(self): + super(TestPodBase, self).setUp() + self.get_res = pod_models.Pod + self.list_res = pod_models.Pods + self.basePath = '/api/v1/pods' + self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name, + mode=self.pod_d.mode, + details=self.pod_d.details, + role=self.pod_d.role) + self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name, + mode=self.pod_e.mode, + details=self.pod_e.details, + role=self.pod_e.role) + + def assert_get_body(self, pod, req=None): + if not req: + req = self.req_d + self.assertEqual(pod.owner, 'ValidUser') + self.assertEqual(pod.name, req.name) + self.assertEqual(pod.mode, req.mode) + self.assertEqual(pod.details, req.details) + self.assertEqual(pod.role, req.role) + self.assertIsNotNone(pod.creation_date) + self.assertIsNotNone(pod._id) + + +class TestPodCreate(TestPodBase): + @executor.create(httplib.BAD_REQUEST, message.not_login()) + def test_notlogin(self): + return self.req_d + + @executor.mock_invalid_lfid() + @executor.create(httplib.BAD_REQUEST, message.not_lfid()) + def test_invalidLfid(self): + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return pod_models.PodCreateRequest('') + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return pod_models.PodCreateRequest(None) + + @executor.mock_valid_lfid() + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + fake_pymongo.pods.insert(self.pod_d.format()) + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExistCaseInsensitive(self): + fake_pymongo.pods.insert(self.pod_d.format()) + self.req_d.name = self.req_d.name.upper() + return self.req_d + + +class TestPodGet(TestPodBase): + def setUp(self): + super(TestPodGet, self).setUp() + fake_pymongo.pods.insert(self.pod_d.format()) + fake_pymongo.pods.insert(self.pod_e.format()) + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_get_body') + def test_getOne(self): + return self.pod_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + self.assertEqual(len(body.pods), 2) + for pod in body.pods: + if self.pod_d.name == pod.name: + self.assert_get_body(pod) + else: + self.assert_get_body(pod, self.pod_e) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_project.py b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py new file mode 100644 index 0000000..939cc0d --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py @@ -0,0 +1,137 @@ +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestProjectBase(base.TestBase): + def setUp(self): + super(TestProjectBase, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping', + 'vping-ssh test') + self.req_e = project_models.ProjectCreateRequest('doctor', + 'doctor test') + self.get_res = project_models.Project + self.list_res = project_models.Projects + self.update_res = project_models.Project + self.basePath = '/api/v1/projects' + + def assert_body(self, project, req=None): + if not req: + req = self.req_d + self.assertEqual(project.name, req.name) + self.assertEqual(project.description, req.description) + self.assertIsNotNone(project._id) + self.assertIsNotNone(project.creation_date) + + +class TestProjectCreate(TestProjectBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return project_models.ProjectCreateRequest('') + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return project_models.ProjectCreateRequest(None) + + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestProjectGet(TestProjectBase): + def setUp(self): + super(TestProjectGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + for project in body.projects: + if self.req_d.name == project.name: + self.assert_body(project) + else: + self.assert_body(project, self.req_e) + + +class TestProjectUpdate(TestProjectBase): + def setUp(self): + super(TestProjectUpdate, self).setUp() + _, d_body = self.create_d() + _, get_res = self.get(self.req_d.name) + self.index_d = get_res._id + self.create_e() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.req_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + return self.req_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.req_d, self.req_d.name + + @executor.update(httplib.OK, '_assert_update') + def test_success(self): + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + def _assert_update(self, req, body): + self.assertEqual(self.index_d, body._id) + self.assert_body(body, req) + _, new_body = self.get(req.name) + self.assertEqual(self.index_d, new_body._id) + self.assert_body(new_body, req) + + +class TestProjectDelete(TestProjectBase): + def setUp(self): + super(TestProjectDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_assert_delete') + def test_success(self): + return self.req_d.name + + def _assert_delete(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_result.py b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py new file mode 100644 index 0000000..b9f9ede --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py @@ -0,0 +1,413 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +from datetime import datetime +from datetime import timedelta +import httplib +import json +import urllib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.models import result_models +from opnfv_testapi.models import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class Details(object): + def __init__(self, timestart=None, duration=None, status=None): + self.timestart = timestart + self.duration = duration + self.status = status + self.items = [{'item1': 1}, {'item2': 2}] + + def format(self): + return { + "timestart": self.timestart, + "duration": self.duration, + "status": self.status, + 'items': [{'item1': 1}, {'item2': 2}] + } + + @staticmethod + def from_dict(a_dict): + + if a_dict is None: + return None + + t = Details() + t.timestart = a_dict.get('timestart') + t.duration = a_dict.get('duration') + t.status = a_dict.get('status') + t.items = a_dict.get('items') + return t + + +class TestResultBase(base.TestBase): + def setUp(self): + super(TestResultBase, self).setUp() + self.pod = self.pod_d.name + self.project = 'functest' + self.case = 'vPing' + self.installer = 'fuel' + self.version = 'C' + self.build_tag = 'v3.0' + self.scenario = 'odl-l2' + self.criteria = 'PASS' + self.trust_indicator = result_models.TI(0.7) + self.start_date = str(datetime.now()) + self.stop_date = str(datetime.now() + timedelta(minutes=1)) + self.update_date = str(datetime.now() + timedelta(days=1)) + self.update_step = -0.05 + self.details = Details(timestart='0', duration='9s', status='OK') + self.req_d = result_models.ResultCreateRequest( + pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria, + trust_indicator=self.trust_indicator) + self.get_res = result_models.TestResult + self.list_res = result_models.TestResults + self.update_res = result_models.TestResult + self.basePath = '/api/v1/results' + self.req_project = project_models.ProjectCreateRequest( + self.project, + 'vping test') + self.req_testcase = testcase_models.TestcaseCreateRequest( + self.case, + '/cases/vping', + 'vping-ssh test') + fake_pymongo.pods.insert(self.pod_d.format()) + self.create_help('/api/v1/projects', self.req_project) + self.create_help('/api/v1/projects/%s/cases', + self.req_testcase, + self.project) + + def assert_res(self, result, req=None): + if req is None: + req = self.req_d + self.assertEqual(result.pod_name, req.pod_name) + self.assertEqual(result.project_name, req.project_name) + self.assertEqual(result.case_name, req.case_name) + self.assertEqual(result.installer, req.installer) + self.assertEqual(result.version, req.version) + details_req = Details.from_dict(req.details) + details_res = Details.from_dict(result.details) + self.assertEqual(details_res.duration, details_req.duration) + self.assertEqual(details_res.timestart, details_req.timestart) + self.assertEqual(details_res.status, details_req.status) + self.assertEqual(details_res.items, details_req.items) + self.assertEqual(result.build_tag, req.build_tag) + self.assertEqual(result.scenario, req.scenario) + self.assertEqual(result.criteria, req.criteria) + self.assertEqual(result.start_date, req.start_date) + self.assertEqual(result.stop_date, req.stop_date) + self.assertIsNotNone(result._id) + ti = result.trust_indicator + self.assertEqual(ti.current, req.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + def _create_d(self): + _, res = self.create_d() + return res.href.split('/')[-1] + + def upload(self, req): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(self.basePath + '/upload', + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + +class TestResultUpload(TestResultBase): + @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) + def test_filenotfind(self): + return None + + +class TestResultCreate(TestResultBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_nobody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) + def test_podNotProvided(self): + req = self.req_d + req.pod_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) + def test_projectNotProvided(self): + req = self.req_d + req.project_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) + def test_testcaseNotProvided(self): + req = self.req_d + req.case_name = None + return req + + @executor.create(httplib.BAD_REQUEST, + message.invalid_value('criteria', ['PASS', 'FAIL'])) + def test_invalid_criteria(self): + req = self.req_d + req.criteria = 'invalid' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noPod(self): + req = self.req_d + req.pod_name = 'notExistPod' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + req = self.req_d + req.project_name = 'notExistProject' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noTestcase(self): + req = self.req_d + req.case_name = 'notExistTestcase' + return req + + @executor.create(httplib.OK, 'assert_href') + def test_success(self): + return self.req_d + + @executor.create(httplib.OK, 'assert_href') + def test_key_with_doc(self): + req = copy.deepcopy(self.req_d) + req.details = {'1.name': 'dot_name'} + return req + + @executor.create(httplib.OK, '_assert_no_ti') + def test_no_ti(self): + req = result_models.ResultCreateRequest(pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria) + self.actual_req = req + return req + + def _assert_no_ti(self, body): + _id = body.href.split('/')[-1] + code, body = self.get(_id) + self.assert_res(body, self.actual_req) + + +class TestResultGet(TestResultBase): + def setUp(self): + super(TestResultGet, self).setUp() + self.req_10d_before = self._create_changed_date(days=-10) + self.req_d_id = self._create_d() + self.req_10d_later = self._create_changed_date(days=10) + + @executor.get(httplib.OK, 'assert_res') + def test_getOne(self): + return self.req_d_id + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryPod(self): + return self._set_query('pod') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryProject(self): + return self._set_query('project') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTestcase(self): + return self._set_query('case') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryVersion(self): + return self._set_query('version') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryInstaller(self): + return self._set_query('installer') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryBuildTag(self): + return self._set_query('build_tag') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryScenario(self): + return self._set_query('scenario') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTrustIndicator(self): + return self._set_query('trust_indicator') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryCriteria(self): + return self._set_query('criteria') + + @executor.query(httplib.BAD_REQUEST, message.must_int('period')) + def test_queryPeriodNotInt(self): + return self._set_query(period='a') + + @executor.query(httplib.OK, '_query_period_one', 1) + def test_queryPeriodSuccess(self): + return self._set_query(period=5) + + @executor.query(httplib.BAD_REQUEST, message.must_int('last')) + def test_queryLastNotInt(self): + return self._set_query(last='a') + + @executor.query(httplib.OK, '_query_last_one', 1) + def test_queryLast(self): + return self._set_query(last=1) + + @executor.query(httplib.OK, '_query_success', 4) + def test_queryPublic(self): + self._create_public_data() + return self._set_query() + + @executor.query(httplib.OK, '_query_success', 1) + def test_queryPrivate(self): + self._create_private_data() + return self._set_query(public='false') + + @executor.query(httplib.OK, '_query_period_one', 1) + def test_combination(self): + return self._set_query('pod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + period=5) + + @executor.query(httplib.OK, '_query_success', 0) + def test_notFound(self): + return self._set_query('project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + pod='notExistPod', + period=1) + + @executor.query(httplib.OK, '_query_success', 1) + def test_filterErrorStartdate(self): + self._create_error_start_date(None) + self._create_error_start_date('None') + self._create_error_start_date('null') + self._create_error_start_date('') + return self._set_query(period=5) + + def _query_success(self, body, number): + self.assertEqual(number, len(body.results)) + + def _query_last_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_10d_later) + + def _query_period_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_d) + + def _create_error_start_date(self, start_date): + req = copy.deepcopy(self.req_d) + req.start_date = start_date + self.create(req) + return req + + def _create_changed_date(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.start_date = datetime.now() + timedelta(**kwargs) + req.stop_date = str(req.start_date + timedelta(minutes=10)) + req.start_date = str(req.start_date) + self.create(req) + return req + + def _create_public_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'true' + self.create(req) + return req + + def _create_private_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'false' + self.create(req) + return req + + def _set_query(self, *args, **kwargs): + def get_value(arg): + return self.__getattribute__(arg) \ + if arg != 'trust_indicator' else self.trust_indicator.current + query = [] + for arg in args: + query.append((arg, get_value(arg))) + for k, v in kwargs.iteritems(): + query.append((k, v)) + return urllib.urlencode(query) + + +class TestResultUpdate(TestResultBase): + def setUp(self): + super(TestResultUpdate, self).setUp() + self.req_d_id = self._create_d() + + @executor.update(httplib.OK, '_assert_update_ti') + def test_success(self): + new_ti = copy.deepcopy(self.trust_indicator) + new_ti.current += self.update_step + new_ti.histories.append( + result_models.TIHistory(self.update_date, self.update_step)) + new_data = copy.deepcopy(self.req_d) + new_data.trust_indicator = new_ti + update = result_models.ResultUpdateRequest(trust_indicator=new_ti) + self.update_req = new_data + return update, self.req_d_id + + def _assert_update_ti(self, request, body): + ti = body.trust_indicator + self.assertEqual(ti.current, request.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py new file mode 100644 index 0000000..de7777a --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py @@ -0,0 +1,449 @@ +import functools +import httplib +import json +import os + +from datetime import datetime + +from opnfv_testapi.common import message +import opnfv_testapi.models.scenario_models as models +from opnfv_testapi.tests.unit.handlers import test_base as base + + +def _none_default(check, default): + return check if check else default + + +class TestScenarioBase(base.TestBase): + def setUp(self): + super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios + self.basePath = '/api/v1/scenarios' + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') + + def tearDown(self): + pass + + def assert_body(self, project, req=None): + pass + + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) + f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, httplib.OK) + if req is None: + req = self.req_d + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + self.assertEqual(scenario, models.Scenario.from_dict(req)) + + @staticmethod + def set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def get_and_assert(self, name): + code, body = self.get(name) + self.assert_res(code, body, self.req_d) + + +class TestScenarioCreate(TestScenarioBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, httplib.BAD_REQUEST) + + def test_emptyName(self): + req_empty = models.ScenarioCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_noneName(self): + req_none = models.ScenarioCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, httplib.OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + (code, body) = self.create_d() + self.assertEqual(code, httplib.FORBIDDEN) + self.assertIn(message.exist_base, body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self.get_and_assert(self.scenario_1) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self.set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self.set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self.set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self.set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + # close due to random fail, open again after solve it in another patch + # def test_queryCombination(self): + # query = self._set_query('name=nosdn-nofeature-ha', + # 'installer=apex', + # 'version=master', + # 'project=functest') + # + # self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, httplib.OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, httplib.NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, httplib.OK) + code, _ = self.get(scenario) + self.assertEqual(code, httplib.NOT_FOUND) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + self.update_url = '' + self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario) + self.installer = self.req_d['installers'][0]['installer'] + self.version = self.req_d['installers'][0]['versions'][0]['version'] + self.locate_project = 'installer={}&version={}&project={}'.format( + self.installer, + self.version, + 'functest') + + def update_url_fixture(item): + def _update_url_fixture(xstep): + def wrapper(self, *args, **kwargs): + self.update_url = '{}/{}'.format(self.scenario_url, item) + locator = None + if item in ['projects', 'owner']: + locator = 'installer={}&version={}'.format( + self.installer, + self.version) + elif item in ['versions']: + locator = 'installer={}'.format( + self.installer) + elif item in ['rename']: + self.update_url = self.scenario_url + + if locator: + self.update_url = '{}?{}'.format(self.update_url, locator) + + xstep(self, *args, **kwargs) + return wrapper + return _update_url_fixture + + def update_partial(operate, expected): + def _update_partial(set_update): + @functools.wraps(set_update) + def wrapper(self): + update = set_update(self) + code, body = getattr(self, operate)(update) + getattr(self, expected)(code) + return wrapper + return _update_partial + + @update_partial('_add', '_success') + def test_addScore(self): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + self.update_url = '{}/scores?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addTrustIndicator(self): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addCustoms(self): + adds = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = list(set(functest['customs'] + adds)) + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + return adds + + @update_partial('_update', '_success') + def test_updateCustoms(self): + updates = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = updates + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return updates + + @update_partial('_delete', '_success') + def test_deleteCustoms(self): + deletes = ['vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return deletes + + @update_url_fixture('projects') + @update_partial('_add', '_success') + def test_addProjects_succ(self): + add = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'].append(add) + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_conflict') + def test_addProjects_already_exist(self): + add = models.ScenarioProject(project='functest').format() + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_bad_request') + def test_addProjects_bad_schema(self): + add = models.ScenarioProject(project='functest').format() + add['score'] = None + return [add] + + @update_url_fixture('projects') + @update_partial('_update', '_success') + def test_updateProjects_succ(self): + update = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'] = [update] + return [update] + + @update_url_fixture('projects') + @update_partial('_update', '_conflict') + def test_updateProjects_duplicated(self): + update = models.ScenarioProject(project='qtip').format() + return [update, update] + + @update_url_fixture('projects') + @update_partial('_update', '_bad_request') + def test_updateProjects_bad_schema(self): + update = models.ScenarioProject(project='functest').format() + update['score'] = None + return [update] + + @update_url_fixture('projects') + @update_partial('_delete', '_success') + def test_deleteProjects(self): + deletes = ['functest'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + self.req_d['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + projects) + return deletes + + @update_url_fixture('owner') + @update_partial('_update', '_success') + def test_changeOwner(self): + new_owner = 'new_owner' + update = models.ScenarioChangeOwnerRequest(new_owner).format() + self.req_d['installers'][0]['versions'][0]['owner'] = new_owner + return update + + @update_url_fixture('versions') + @update_partial('_add', '_success') + def test_addVersions_succ(self): + add = models.ScenarioVersion(version='Euphrates').format() + self.req_d['installers'][0]['versions'].append(add) + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_conflict') + def test_addVersions_already_exist(self): + add = models.ScenarioVersion(version='master').format() + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_bad_request') + def test_addVersions_bad_schema(self): + add = models.ScenarioVersion(version='euphrates').format() + add['notexist'] = None + return [add] + + @update_url_fixture('versions') + @update_partial('_update', '_success') + def test_updateVersions_succ(self): + update = models.ScenarioVersion(version='euphrates').format() + self.req_d['installers'][0]['versions'] = [update] + return [update] + + @update_url_fixture('versions') + @update_partial('_update', '_conflict') + def test_updateVersions_duplicated(self): + update = models.ScenarioVersion(version='euphrates').format() + return [update, update] + + @update_url_fixture('versions') + @update_partial('_update', '_bad_request') + def test_updateVersions_bad_schema(self): + update = models.ScenarioVersion(version='euphrates').format() + update['not_owner'] = 'Iam' + return [update] + + @update_url_fixture('versions') + @update_partial('_delete', '_success') + def test_deleteVersions(self): + deletes = ['master'] + versions = self.req_d['installers'][0]['versions'] + self.req_d['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + versions) + return deletes + + @update_url_fixture('installers') + @update_partial('_add', '_success') + def test_addInstallers_succ(self): + add = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'].append(add) + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_conflict') + def test_addInstallers_already_exist(self): + add = models.ScenarioInstaller(installer='apex').format() + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_bad_request') + def test_addInstallers_bad_schema(self): + add = models.ScenarioInstaller(installer='daisy').format() + add['not_exist'] = 'not_exist' + return [add] + + @update_url_fixture('installers') + @update_partial('_update', '_success') + def test_updateInstallers_succ(self): + update = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'] = [update] + return [update] + + @update_url_fixture('installers') + @update_partial('_update', '_conflict') + def test_updateInstallers_duplicated(self): + update = models.ScenarioInstaller(installer='daisy').format() + return [update, update] + + @update_url_fixture('installers') + @update_partial('_update', '_bad_request') + def test_updateInstallers_bad_schema(self): + update = models.ScenarioInstaller(installer='daisy').format() + update['not_exist'] = 'not_exist' + return [update] + + @update_url_fixture('installers') + @update_partial('_delete', '_success') + def test_deleteInstallers(self): + deletes = ['apex'] + installers = self.req_d['installers'] + self.req_d['installers'] = filter( + lambda f: f['installer'] != 'apex', + installers) + return deletes + + @update_url_fixture('rename') + @update_partial('_update', '_success') + def test_renameScenario(self): + new_name = 'new_scenario_name' + update = models.ScenarioUpdateRequest(name=new_name) + self.req_d['name'] = new_name + return update + + @update_url_fixture('rename') + @update_partial('_update', '_forbidden') + def test_renameScenario_exist(self): + new_name = self.req_d['name'] + update = models.ScenarioUpdateRequest(name=new_name) + return update + + def _add(self, update_req): + return self.post_direct_url(self.update_url, update_req) + + def _update(self, update_req): + return self.update_direct_url(self.update_url, update_req) + + def _delete(self, update_req): + return self.delete_direct_url(self.update_url, update_req) + + def _success(self, status): + self.assertEqual(status, httplib.OK) + self.get_and_assert(self.req_d['name']) + + def _forbidden(self, status): + self.assertEqual(status, httplib.FORBIDDEN) + + def _bad_request(self, status): + self.assertEqual(status, httplib.BAD_REQUEST) + + def _conflict(self, status): + self.assertEqual(status, httplib.CONFLICT) diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py new file mode 100644 index 0000000..e4c668e --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py @@ -0,0 +1,201 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.models import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestCaseBase(base.TestBase): + def setUp(self): + super(TestCaseBase, self).setUp() + self.req_d = testcase_models.TestcaseCreateRequest('vping_1', + '/cases/vping_1', + 'vping-ssh test') + self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', + '/cases/doctor_1', + 'create doctor') + self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', + 'vping-ssh test', + 'functest') + self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', + 'create doctor', + 'functest') + self.get_res = testcase_models.Testcase + self.list_res = testcase_models.Testcases + self.update_res = testcase_models.Testcase + self.basePath = '/api/v1/projects/%s/cases' + self.create_project() + + def assert_body(self, case, req=None): + if not req: + req = self.req_d + self.assertEqual(case.name, req.name) + self.assertEqual(case.description, req.description) + self.assertEqual(case.url, req.url) + self.assertIsNotNone(case._id) + self.assertIsNotNone(case.creation_date) + + def assert_update_body(self, old, new, req=None): + if not req: + req = self.req_d + self.assertEqual(new.name, req.name) + self.assertEqual(new.description, req.description) + self.assertEqual(new.url, old.url) + self.assertIsNotNone(new._id) + self.assertIsNotNone(new.creation_date) + + def create_project(self): + req_p = project_models.ProjectCreateRequest('functest', + 'vping-ssh test') + self.create_help('/api/v1/projects', req_p) + self.project = req_p.name + + def create_d(self): + return super(TestCaseBase, self).create_d(self.project) + + def create_e(self): + return super(TestCaseBase, self).create_e(self.project) + + def get(self, case=None): + return super(TestCaseBase, self).get(self.project, case) + + def create(self, req=None, *args): + return super(TestCaseBase, self).create(req, self.project) + + def update(self, new=None, case=None): + return super(TestCaseBase, self).update(new, self.project, case) + + def delete(self, case): + return super(TestCaseBase, self).delete(self.project, case) + + +class TestCaseCreate(TestCaseBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + self.project = 'noProject' + return self.req_d + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + req_empty = testcase_models.TestcaseCreateRequest('') + return req_empty + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + req_none = testcase_models.TestcaseCreateRequest(None) + return req_none + + @executor.create(httplib.OK, '_assert_success') + def test_success(self): + return self.req_d + + def _assert_success(self, body): + self.assert_create_body(body, self.req_d, self.project) + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestCaseGet(TestCaseBase): + def setUp(self): + super(TestCaseGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_list') + def test_list(self): + return None + + def _list(self, body): + for case in body.testcases: + if self.req_d.name == case.name: + self.assert_body(case) + else: + self.assert_body(case, self.req_e) + + +class TestCaseUpdate(TestCaseBase): + def setUp(self): + super(TestCaseUpdate, self).setUp() + self.create_d() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.update_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + self.create_e() + return self.update_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.update_d, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_success(self): + return self.update_e, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_with_dollar(self): + update = copy.deepcopy(self.update_d) + update.description = {'2. change': 'dollar change'} + return update, self.req_d.name + + def _update_success(self, request, body): + self.assert_update_body(self.req_d, body, request) + _, new_body = self.get(request.name) + self.assert_update_body(self.req_d, new_body, request) + + +class TestCaseDelete(TestCaseBase): + def setUp(self): + super(TestCaseDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_delete_success') + def test_success(self): + return self.req_d.name + + def _delete_success(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_token.py b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py new file mode 100644 index 0000000..e8b746d --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py @@ -0,0 +1,52 @@ +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import httplib +import unittest + +from tornado import web + +from opnfv_testapi.common import message +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_result + + +class TestTokenCreateResult(test_result.TestResultBase): + def get_app(self): + from opnfv_testapi.router import url_mappings + return web.Application( + url_mappings.mappings, + db=fake_pymongo, + debug=True, + auth=True + ) + + def setUp(self): + super(TestTokenCreateResult, self).setUp() + fake_pymongo.tokens.insert({"access_token": "12345"}) + + @executor.create(httplib.FORBIDDEN, message.invalid_token()) + def test_resultCreateTokenInvalid(self): + self.headers['X-Auth-Token'] = '1234' + return self.req_d + + @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) + def test_resultCreateTokenUnauthorized(self): + if 'X-Auth-Token' in self.headers: + self.headers.pop('X-Auth-Token') + return self.req_d + + @executor.create(httplib.OK, '_create_success') + def test_resultCreateTokenSuccess(self): + self.headers['X-Auth-Token'] = '12345' + return self.req_d + + def _create_success(self, body): + self.assertIn('CreateResponse', str(type(body))) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_version.py b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py new file mode 100644 index 0000000..6ef810f --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py @@ -0,0 +1,36 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.models import base_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestVersionBase(base.TestBase): + def setUp(self): + super(TestVersionBase, self).setUp() + self.list_res = base_models.Versions + self.basePath = '/versions' + + +class TestVersion(TestVersionBase): + @executor.get(httplib.OK, '_get_success') + def test_success(self): + return None + + def _get_success(self, body): + self.assertEqual(len(body.versions), 1) + self.assertEqual(body.versions[0].version, 'v1.0') + self.assertEqual(body.versions[0].description, 'basics') + + +if __name__ == '__main__': + unittest.main() |