diff options
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit')
9 files changed, 1255 insertions, 0 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py new file mode 100644 index 000000000..3fc79f1d5 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py @@ -0,0 +1,9 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +__author__ = 'serena' diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py new file mode 100644 index 000000000..3dd87e603 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py @@ -0,0 +1,191 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from bson.objectid import ObjectId +from concurrent.futures import ThreadPoolExecutor +from operator import itemgetter + + +def thread_execute(method, *args, **kwargs): + with ThreadPoolExecutor(max_workers=2) as executor: + result = executor.submit(method, *args, **kwargs) + return result + + +class MemCursor(object): + def __init__(self, collection): + self.collection = collection + self.count = len(self.collection) + self.sorted = [] + + def _is_next_exist(self): + return self.count != 0 + + @property + def fetch_next(self): + return thread_execute(self._is_next_exist) + + def next_object(self): + self.count -= 1 + return self.collection.pop() + + def sort(self, key_or_list): + key = key_or_list[0][0] + if key_or_list[0][1] == -1: + reverse = True + else: + reverse = False + + if key_or_list is not None: + self.collection = sorted(self.collection, + key=itemgetter(key), reverse=reverse) + return self + + def limit(self, limit): + if limit != 0 and limit < len(self.collection): + self.collection = self.collection[0:limit] + self.count = limit + return self + + +class MemDb(object): + + def __init__(self): + self.contents = [] + pass + + def _find_one(self, spec_or_id=None, *args): + if spec_or_id is not None and not isinstance(spec_or_id, dict): + spec_or_id = {"_id": spec_or_id} + if '_id' in spec_or_id: + spec_or_id['_id'] = str(spec_or_id['_id']) + cursor = self._find(spec_or_id, *args) + for result in cursor: + return result + return None + + def find_one(self, spec_or_id=None, *args): + return thread_execute(self._find_one, spec_or_id, *args) + + def _insert(self, doc_or_docs, check_keys=True): + + docs = doc_or_docs + return_one = False + if isinstance(docs, dict): + return_one = True + docs = [docs] + + if check_keys: + for doc in docs: + self._check_keys(doc) + + ids = [] + for doc in docs: + if '_id' not in doc: + doc['_id'] = str(ObjectId()) + if not self._find_one(doc['_id']): + ids.append(doc['_id']) + self.contents.append(doc_or_docs) + + if len(ids) == 0: + return None + if return_one: + return ids[0] + else: + return ids + + def insert(self, doc_or_docs, check_keys=True): + return thread_execute(self._insert, doc_or_docs, check_keys) + + @staticmethod + def _compare_date(spec, value): + for k, v in spec.iteritems(): + if k == '$gte' and value >= v: + return True + return False + + @staticmethod + def _in(content, *args): + for arg in args: + for k, v in arg.iteritems(): + if k == 'start_date': + if not MemDb._compare_date(v, content.get(k)): + return False + elif k == 'trust_indicator.current': + if content.get('trust_indicator').get('current') != v: + return False + elif content.get(k, None) != v: + return False + + return True + + def _find(self, *args): + res = [] + for content in self.contents: + if self._in(content, *args): + res.append(content) + + return res + + def find(self, *args): + return MemCursor(self._find(*args)) + + def _update(self, spec, document, check_keys=True): + updated = False + + if check_keys: + self._check_keys(document) + + for index in range(len(self.contents)): + content = self.contents[index] + if self._in(content, spec): + for k, v in document.iteritems(): + updated = True + content[k] = v + self.contents[index] = content + return updated + + def update(self, spec, document, check_keys=True): + return thread_execute(self._update, spec, document, check_keys) + + def _remove(self, spec_or_id=None): + if spec_or_id is None: + self.contents = [] + if not isinstance(spec_or_id, dict): + spec_or_id = {'_id': spec_or_id} + for index in range(len(self.contents)): + content = self.contents[index] + if self._in(content, spec_or_id): + del self.contents[index] + return True + return False + + def remove(self, spec_or_id=None): + return thread_execute(self._remove, spec_or_id) + + def clear(self): + self._remove() + + def _check_keys(self, doc): + for key in doc.keys(): + if '.' in key: + raise NameError('key {} must not contain .'.format(key)) + if key.startswith('$'): + raise NameError('key {} must not start with $'.format(key)) + if isinstance(doc.get(key), dict): + self._check_keys(doc.get(key)) + + +def __getattr__(name): + return globals()[name] + + +pods = MemDb() +projects = MemDb() +testcases = MemDb() +results = MemDb() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py new file mode 100644 index 000000000..ff1a1932c --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py @@ -0,0 +1,136 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import json + +from tornado.web import Application +from tornado.testing import AsyncHTTPTestCase + +from opnfv_testapi.router import url_mappings +from opnfv_testapi.resources.models import CreateResponse +import fake_pymongo + + +class TestBase(AsyncHTTPTestCase): + headers = {'Content-Type': 'application/json; charset=UTF-8'} + + def setUp(self): + self.basePath = '' + self.create_res = CreateResponse + self.get_res = None + self.list_res = None + self.update_res = None + self.req_d = None + self.req_e = None + self.addCleanup(self._clear) + super(TestBase, self).setUp() + + def get_app(self): + return Application( + url_mappings.mappings, + db=fake_pymongo, + debug=True, + ) + + def create_d(self, *args): + return self.create(self.req_d, *args) + + def create_e(self, *args): + return self.create(self.req_e, *args) + + def create(self, req=None, *args): + return self.create_help(self.basePath, req, *args) + + def create_help(self, uri, req, *args): + if req: + req = req.format() + res = self.fetch(self._update_uri(uri, *args), + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + def get(self, *args): + res = self.fetch(self._get_uri(*args), + method='GET', + headers=self.headers) + + def inner(): + new_args, num = self._get_valid_args(*args) + return self.get_res \ + if num != self._need_arg_num(self.basePath) else self.list_res + return self._get_return(res, inner()) + + def query(self, query): + res = self.fetch(self._get_query_uri(query), + method='GET', + headers=self.headers) + return self._get_return(res, self.list_res) + + def update(self, new=None, *args): + if new: + new = new.format() + res = self.fetch(self._get_uri(*args), + method='PUT', + body=json.dumps(new), + headers=self.headers) + return self._get_return(res, self.update_res) + + def delete(self, *args): + res = self.fetch(self._get_uri(*args), + method='DELETE', + headers=self.headers) + return res.code, res.body + + @staticmethod + def _get_valid_args(*args): + new_args = tuple(['%s' % arg for arg in args if arg is not None]) + return new_args, len(new_args) + + def _need_arg_num(self, uri): + return uri.count('%s') + + def _get_query_uri(self, query): + return self.basePath + '?' + query + + def _get_uri(self, *args): + return self._update_uri(self.basePath, *args) + + def _update_uri(self, uri, *args): + r_uri = uri + new_args, num = self._get_valid_args(*args) + if num != self._need_arg_num(uri): + r_uri += '/%s' + + return r_uri % tuple(['%s' % arg for arg in new_args]) + + def _get_return(self, res, cls): + code = res.code + body = res.body + return code, self._get_return_body(code, body, cls) + + @staticmethod + def _get_return_body(code, body, cls): + return cls.from_dict(json.loads(body)) if code < 300 and cls else body + + def assert_href(self, body): + self.assertIn(self.basePath, body.href) + + def assert_create_body(self, body, req=None, *args): + if not req: + req = self.req_d + new_args = args + tuple([req.name]) + self.assertIn(self._get_uri(*new_args), body.href) + + @staticmethod + def _clear(): + fake_pymongo.pods.clear() + fake_pymongo.projects.clear() + fake_pymongo.testcases.clear() + fake_pymongo.results.clear() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py new file mode 100644 index 000000000..5f50ba867 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py @@ -0,0 +1,123 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest + +from tornado import gen +from tornado.testing import AsyncHTTPTestCase, gen_test +from tornado.web import Application + +import fake_pymongo + + +class MyTest(AsyncHTTPTestCase): + def setUp(self): + super(MyTest, self).setUp() + self.db = fake_pymongo + self.addCleanup(self._clear) + self.io_loop.run_sync(self.fixture_setup) + + def get_app(self): + return Application() + + @gen.coroutine + def fixture_setup(self): + self.test1 = {'_id': '1', 'name': 'test1'} + self.test2 = {'name': 'test2'} + yield self.db.pods.insert({'_id': '1', 'name': 'test1'}) + yield self.db.pods.insert({'name': 'test2'}) + + @gen_test + def test_find_one(self): + user = yield self.db.pods.find_one({'name': 'test1'}) + self.assertEqual(user, self.test1) + self.db.pods.remove() + + @gen_test + def test_find(self): + cursor = self.db.pods.find() + names = [] + while (yield cursor.fetch_next): + ob = cursor.next_object() + names.append(ob.get('name')) + self.assertItemsEqual(names, ['test1', 'test2']) + + @gen_test + def test_update(self): + yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'}) + user = yield self.db.pods.find_one({'_id': '1'}) + self.assertEqual(user.get('name', None), 'new_test1') + + def test_update_dot_error(self): + self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, + 'key 1. name must not contain .') + + def test_update_dot_no_error(self): + self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, + None, + check_keys=False) + + def test_update_dollar_error(self): + self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, + 'key $name must not start with $') + + def test_update_dollar_no_error(self): + self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, + None, + check_keys=False) + + @gen_test + def test_remove(self): + yield self.db.pods.remove({'_id': '1'}) + user = yield self.db.pods.find_one({'_id': '1'}) + self.assertIsNone(user) + + def test_insert_dot_error(self): + self._insert_assert({'_id': '1', '2. name': 'test1'}, + 'key 2. name must not contain .') + + def test_insert_dot_no_error(self): + self._insert_assert({'_id': '1', '2. name': 'test1'}, + None, + check_keys=False) + + def test_insert_dollar_error(self): + self._insert_assert({'_id': '1', '$name': 'test1'}, + 'key $name must not start with $') + + def test_insert_dollar_no_error(self): + self._insert_assert({'_id': '1', '$name': 'test1'}, + None, + check_keys=False) + + def _clear(self): + self.db.pods.clear() + + def _update_assert(self, docs, error=None, **kwargs): + self._db_assert('update', error, {'_id': '1'}, docs, **kwargs) + + def _insert_assert(self, docs, error=None, **kwargs): + self._db_assert('insert', error, docs, **kwargs) + + @gen_test + def _db_assert(self, method, error, *args, **kwargs): + name_error = None + try: + yield self._eval_pods_db(method, *args, **kwargs) + except NameError as err: + name_error = err.args[0] + finally: + self.assertEqual(name_error, error) + + def _eval_pods_db(self, method, *args, **kwargs): + table_obj = vars(self.db)['pods'] + return table_obj.__getattribute__(method)(*args, **kwargs) + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py new file mode 100644 index 000000000..a1184d554 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py @@ -0,0 +1,89 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest + +from test_base import TestBase +from opnfv_testapi.resources.pod_models import PodCreateRequest, Pod, Pods +from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \ + HTTP_FORBIDDEN, HTTP_NOT_FOUND + + +class TestPodBase(TestBase): + def setUp(self): + super(TestPodBase, self).setUp() + self.req_d = PodCreateRequest('zte-1', 'virtual', + 'zte pod 1', 'ci-pod') + self.req_e = PodCreateRequest('zte-2', 'metal', 'zte pod 2') + self.get_res = Pod + self.list_res = Pods + self.basePath = '/api/v1/pods' + + def assert_get_body(self, pod, req=None): + if not req: + req = self.req_d + self.assertEqual(pod.name, req.name) + self.assertEqual(pod.mode, req.mode) + self.assertEqual(pod.details, req.details) + self.assertEqual(pod.role, req.role) + self.assertIsNotNone(pod.creation_date) + self.assertIsNotNone(pod._id) + + +class TestPodCreate(TestPodBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, HTTP_BAD_REQUEST) + + def test_emptyName(self): + req_empty = PodCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_noneName(self): + req_none = PodCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_success(self): + code, body = self.create_d() + self.assertEqual(code, HTTP_OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + code, body = self.create_d() + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn('already exists', body) + + +class TestPodGet(TestPodBase): + def test_notExist(self): + code, body = self.get('notExist') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_getOne(self): + self.create_d() + code, body = self.get(self.req_d.name) + self.assert_get_body(body) + + def test_list(self): + self.create_d() + self.create_e() + code, body = self.get() + self.assertEqual(len(body.pods), 2) + for pod in body.pods: + if self.req_d.name == pod.name: + self.assert_get_body(pod) + else: + self.assert_get_body(pod, self.req_e) + +if __name__ == '__main__': + unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py new file mode 100644 index 000000000..327ddf7b2 --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py @@ -0,0 +1,141 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest + +from test_base import TestBase +from opnfv_testapi.resources.project_models import ProjectCreateRequest, \ + Project, Projects, ProjectUpdateRequest +from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \ + HTTP_FORBIDDEN, HTTP_NOT_FOUND + + +class TestProjectBase(TestBase): + def setUp(self): + super(TestProjectBase, self).setUp() + self.req_d = ProjectCreateRequest('vping', 'vping-ssh test') + self.req_e = ProjectCreateRequest('doctor', 'doctor test') + self.get_res = Project + self.list_res = Projects + self.update_res = Project + self.basePath = '/api/v1/projects' + + def assert_body(self, project, req=None): + if not req: + req = self.req_d + self.assertEqual(project.name, req.name) + self.assertEqual(project.description, req.description) + self.assertIsNotNone(project._id) + self.assertIsNotNone(project.creation_date) + + +class TestProjectCreate(TestProjectBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, HTTP_BAD_REQUEST) + + def test_emptyName(self): + req_empty = ProjectCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_noneName(self): + req_none = ProjectCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, HTTP_OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + (code, body) = self.create_d() + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn('already exists', body) + + +class TestProjectGet(TestProjectBase): + def test_notExist(self): + code, body = self.get('notExist') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_getOne(self): + self.create_d() + code, body = self.get(self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assert_body(body) + + def test_list(self): + self.create_d() + self.create_e() + code, body = self.get() + for project in body.projects: + if self.req_d.name == project.name: + self.assert_body(project) + else: + self.assert_body(project, self.req_e) + + +class TestProjectUpdate(TestProjectBase): + def test_withoutBody(self): + code, _ = self.update(None, 'noBody') + self.assertEqual(code, HTTP_BAD_REQUEST) + + def test_notFound(self): + code, _ = self.update(self.req_e, 'notFound') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_newNameExist(self): + self.create_d() + self.create_e() + code, body = self.update(self.req_e, self.req_d.name) + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn("already exists", body) + + def test_noUpdate(self): + self.create_d() + code, body = self.update(self.req_d, self.req_d.name) + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn("Nothing to update", body) + + def test_success(self): + self.create_d() + code, body = self.get(self.req_d.name) + _id = body._id + + req = ProjectUpdateRequest('newName', 'new description') + code, body = self.update(req, self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assertEqual(_id, body._id) + self.assert_body(body, req) + + _, new_body = self.get(req.name) + self.assertEqual(_id, new_body._id) + self.assert_body(new_body, req) + + +class TestProjectDelete(TestProjectBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_success(self): + self.create_d() + code, body = self.delete(self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assertEqual(body, '') + + code, body = self.get(self.req_d.name) + self.assertEqual(code, HTTP_NOT_FOUND) + +if __name__ == '__main__': + unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py new file mode 100644 index 000000000..8479b35cd --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py @@ -0,0 +1,339 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +import unittest +from datetime import datetime, timedelta + +from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \ + HTTP_NOT_FOUND +from opnfv_testapi.resources.pod_models import PodCreateRequest +from opnfv_testapi.resources.project_models import ProjectCreateRequest +from opnfv_testapi.resources.result_models import ResultCreateRequest, \ + TestResult, TestResults, ResultUpdateRequest, TI, TIHistory +from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest +from test_base import TestBase + + +class Details(object): + def __init__(self, timestart=None, duration=None, status=None): + self.timestart = timestart + self.duration = duration + self.status = status + + def format(self): + return { + "timestart": self.timestart, + "duration": self.duration, + "status": self.status + } + + @staticmethod + def from_dict(a_dict): + + if a_dict is None: + return None + + t = Details() + t.timestart = a_dict.get('timestart') + t.duration = a_dict.get('duration') + t.status = a_dict.get('status') + return t + + +class TestResultBase(TestBase): + def setUp(self): + self.pod = 'zte-pod1' + self.project = 'functest' + self.case = 'vPing' + self.installer = 'fuel' + self.version = 'C' + self.build_tag = 'v3.0' + self.scenario = 'odl-l2' + self.criteria = 'passed' + self.trust_indicator = TI(0.7) + self.start_date = "2016-05-23 07:16:09.477097" + self.stop_date = "2016-05-23 07:16:19.477097" + self.update_date = "2016-05-24 07:16:19.477097" + self.update_step = -0.05 + super(TestResultBase, self).setUp() + self.details = Details(timestart='0', duration='9s', status='OK') + self.req_d = ResultCreateRequest(pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria, + trust_indicator=self.trust_indicator) + self.get_res = TestResult + self.list_res = TestResults + self.update_res = TestResult + self.basePath = '/api/v1/results' + self.req_pod = PodCreateRequest(self.pod, 'metal', 'zte pod 1') + self.req_project = ProjectCreateRequest(self.project, 'vping test') + self.req_testcase = TestcaseCreateRequest(self.case, + '/cases/vping', + 'vping-ssh test') + self.create_help('/api/v1/pods', self.req_pod) + self.create_help('/api/v1/projects', self.req_project) + self.create_help('/api/v1/projects/%s/cases', + self.req_testcase, + self.project) + + def assert_res(self, code, result, req=None): + self.assertEqual(code, HTTP_OK) + if req is None: + req = self.req_d + self.assertEqual(result.pod_name, req.pod_name) + self.assertEqual(result.project_name, req.project_name) + self.assertEqual(result.case_name, req.case_name) + self.assertEqual(result.installer, req.installer) + self.assertEqual(result.version, req.version) + details_req = Details.from_dict(req.details) + details_res = Details.from_dict(result.details) + self.assertEqual(details_res.duration, details_req.duration) + self.assertEqual(details_res.timestart, details_req.timestart) + self.assertEqual(details_res.status, details_req.status) + self.assertEqual(result.build_tag, req.build_tag) + self.assertEqual(result.scenario, req.scenario) + self.assertEqual(result.criteria, req.criteria) + self.assertEqual(result.start_date, req.start_date) + self.assertEqual(result.stop_date, req.stop_date) + self.assertIsNotNone(result._id) + ti = result.trust_indicator + self.assertEqual(ti.current, req.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + def _create_d(self): + _, res = self.create_d() + return res.href.split('/')[-1] + + +class TestResultCreate(TestResultBase): + def test_nobody(self): + (code, body) = self.create(None) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('no body', body) + + def test_podNotProvided(self): + req = self.req_d + req.pod_name = None + (code, body) = self.create(req) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('pod_name missing', body) + + def test_projectNotProvided(self): + req = self.req_d + req.project_name = None + (code, body) = self.create(req) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('project_name missing', body) + + def test_testcaseNotProvided(self): + req = self.req_d + req.case_name = None + (code, body) = self.create(req) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('case_name missing', body) + + def test_noPod(self): + req = self.req_d + req.pod_name = 'notExistPod' + (code, body) = self.create(req) + self.assertEqual(code, HTTP_NOT_FOUND) + self.assertIn('Could not find pod', body) + + def test_noProject(self): + req = self.req_d + req.project_name = 'notExistProject' + (code, body) = self.create(req) + self.assertEqual(code, HTTP_NOT_FOUND) + self.assertIn('Could not find project', body) + + def test_noTestcase(self): + req = self.req_d + req.case_name = 'notExistTestcase' + (code, body) = self.create(req) + self.assertEqual(code, HTTP_NOT_FOUND) + self.assertIn('Could not find testcase', body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, HTTP_OK) + self.assert_href(body) + + def test_key_with_doc(self): + req = copy.deepcopy(self.req_d) + req.details = {'1.name': 'dot_name'} + (code, body) = self.create(req) + self.assertEqual(code, HTTP_OK) + self.assert_href(body) + + def test_no_ti(self): + req = ResultCreateRequest(pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria) + (code, res) = self.create(req) + _id = res.href.split('/')[-1] + self.assertEqual(code, HTTP_OK) + code, body = self.get(_id) + self.assert_res(code, body, req) + + +class TestResultGet(TestResultBase): + def test_getOne(self): + _id = self._create_d() + code, body = self.get(_id) + self.assert_res(code, body) + + def test_queryPod(self): + self._query_and_assert(self._set_query('pod')) + + def test_queryProject(self): + self._query_and_assert(self._set_query('project')) + + def test_queryTestcase(self): + self._query_and_assert(self._set_query('case')) + + def test_queryVersion(self): + self._query_and_assert(self._set_query('version')) + + def test_queryInstaller(self): + self._query_and_assert(self._set_query('installer')) + + def test_queryBuildTag(self): + self._query_and_assert(self._set_query('build_tag')) + + def test_queryScenario(self): + self._query_and_assert(self._set_query('scenario')) + + def test_queryTrustIndicator(self): + self._query_and_assert(self._set_query('trust_indicator')) + + def test_queryCriteria(self): + self._query_and_assert(self._set_query('criteria')) + + def test_queryPeriodNotInt(self): + code, body = self.query(self._set_query('period=a')) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('period must be int', body) + + def test_queryPeriodFail(self): + self._query_and_assert(self._set_query('period=1'), + found=False, days=-10) + + def test_queryPeriodSuccess(self): + self._query_and_assert(self._set_query('period=1'), + found=True) + + def test_queryLastNotInt(self): + code, body = self.query(self._set_query('last=a')) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('last must be int', body) + + def test_queryLast(self): + self._create_changed_date() + req = self._create_changed_date(minutes=20) + self._create_changed_date(minutes=-20) + self._query_and_assert(self._set_query('last=1'), req=req) + + def test_combination(self): + self._query_and_assert(self._set_query('pod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + 'period=1')) + + def test_notFound(self): + self._query_and_assert(self._set_query('pod=notExistPod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + 'period=1'), + found=False) + + def _query_and_assert(self, query, found=True, req=None, **kwargs): + if req is None: + req = self._create_changed_date(**kwargs) + code, body = self.query(query) + if not found: + self.assertEqual(code, HTTP_OK) + self.assertEqual(0, len(body.results)) + else: + self.assertEqual(1, len(body.results)) + for result in body.results: + self.assert_res(code, result, req) + + def _create_changed_date(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.start_date = datetime.now() + timedelta(**kwargs) + req.stop_date = str(req.start_date + timedelta(minutes=10)) + req.start_date = str(req.start_date) + self.create(req) + return req + + def _set_query(self, *args): + def get_value(arg): + return self.__getattribute__(arg) \ + if arg != 'trust_indicator' else self.trust_indicator.current + uri = '' + for arg in args: + if '=' in arg: + uri += arg + '&' + else: + uri += '{}={}&'.format(arg, get_value(arg)) + return uri[0: -1] + + +class TestResultUpdate(TestResultBase): + def test_success(self): + _id = self._create_d() + + new_ti = copy.deepcopy(self.trust_indicator) + new_ti.current += self.update_step + new_ti.histories.append(TIHistory(self.update_date, self.update_step)) + new_data = copy.deepcopy(self.req_d) + new_data.trust_indicator = new_ti + update = ResultUpdateRequest(trust_indicator=new_ti) + code, body = self.update(update, _id) + self.assertEqual(_id, body._id) + self.assert_res(code, body, new_data) + + code, new_body = self.get(_id) + self.assertEqual(_id, new_body._id) + self.assert_res(code, new_body, new_data) + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py new file mode 100644 index 000000000..cb767844a --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py @@ -0,0 +1,196 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest +import copy + +from test_base import TestBase +from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest, \ + Testcase, Testcases, TestcaseUpdateRequest +from opnfv_testapi.resources.project_models import ProjectCreateRequest +from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \ + HTTP_FORBIDDEN, HTTP_NOT_FOUND + + +class TestCaseBase(TestBase): + def setUp(self): + super(TestCaseBase, self).setUp() + self.req_d = TestcaseCreateRequest('vping_1', + '/cases/vping_1', + 'vping-ssh test') + self.req_e = TestcaseCreateRequest('doctor_1', + '/cases/doctor_1', + 'create doctor') + self.update_d = TestcaseUpdateRequest('vping_1', + 'vping-ssh test', + 'functest') + self.update_e = TestcaseUpdateRequest('doctor_1', + 'create doctor', + 'functest') + self.get_res = Testcase + self.list_res = Testcases + self.update_res = Testcase + self.basePath = '/api/v1/projects/%s/cases' + self.create_project() + + def assert_body(self, case, req=None): + if not req: + req = self.req_d + self.assertEqual(case.name, req.name) + self.assertEqual(case.description, req.description) + self.assertEqual(case.url, req.url) + self.assertIsNotNone(case._id) + self.assertIsNotNone(case.creation_date) + + def assert_update_body(self, old, new, req=None): + if not req: + req = self.req_d + self.assertEqual(new.name, req.name) + self.assertEqual(new.description, req.description) + self.assertEqual(new.url, old.url) + self.assertIsNotNone(new._id) + self.assertIsNotNone(new.creation_date) + + def create_project(self): + req_p = ProjectCreateRequest('functest', 'vping-ssh test') + self.create_help('/api/v1/projects', req_p) + self.project = req_p.name + + def create_d(self): + return super(TestCaseBase, self).create_d(self.project) + + def create_e(self): + return super(TestCaseBase, self).create_e(self.project) + + def get(self, case=None): + return super(TestCaseBase, self).get(self.project, case) + + def update(self, new=None, case=None): + return super(TestCaseBase, self).update(new, self.project, case) + + def delete(self, case): + return super(TestCaseBase, self).delete(self.project, case) + + +class TestCaseCreate(TestCaseBase): + def test_noBody(self): + (code, body) = self.create(None, 'vping') + self.assertEqual(code, HTTP_BAD_REQUEST) + + def test_noProject(self): + code, body = self.create(self.req_d, 'noProject') + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn('Could not find project', body) + + def test_emptyName(self): + req_empty = TestcaseCreateRequest('') + (code, body) = self.create(req_empty, self.project) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_noneName(self): + req_none = TestcaseCreateRequest(None) + (code, body) = self.create(req_none, self.project) + self.assertEqual(code, HTTP_BAD_REQUEST) + self.assertIn('name missing', body) + + def test_success(self): + code, body = self.create_d() + self.assertEqual(code, HTTP_OK) + self.assert_create_body(body, None, self.project) + + def test_alreadyExist(self): + self.create_d() + code, body = self.create_d() + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn('already exists', body) + + +class TestCaseGet(TestCaseBase): + def test_notExist(self): + code, body = self.get('notExist') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_getOne(self): + self.create_d() + code, body = self.get(self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assert_body(body) + + def test_list(self): + self.create_d() + self.create_e() + code, body = self.get() + for case in body.testcases: + if self.req_d.name == case.name: + self.assert_body(case) + else: + self.assert_body(case, self.req_e) + + +class TestCaseUpdate(TestCaseBase): + def test_noBody(self): + code, _ = self.update(case='noBody') + self.assertEqual(code, HTTP_BAD_REQUEST) + + def test_notFound(self): + code, _ = self.update(self.update_e, 'notFound') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_newNameExist(self): + self.create_d() + self.create_e() + code, body = self.update(self.update_e, self.req_d.name) + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn("already exists", body) + + def test_noUpdate(self): + self.create_d() + code, body = self.update(self.update_d, self.req_d.name) + self.assertEqual(code, HTTP_FORBIDDEN) + self.assertIn("Nothing to update", body) + + def test_success(self): + self.create_d() + code, body = self.get(self.req_d.name) + _id = body._id + + code, body = self.update(self.update_e, self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assertEqual(_id, body._id) + self.assert_update_body(self.req_d, body, self.update_e) + + _, new_body = self.get(self.req_e.name) + self.assertEqual(_id, new_body._id) + self.assert_update_body(self.req_d, new_body, self.update_e) + + def test_with_dollar(self): + self.create_d() + update = copy.deepcopy(self.update_d) + update.description = {'2. change': 'dollar change'} + code, body = self.update(update, self.req_d.name) + self.assertEqual(code, HTTP_OK) + + +class TestCaseDelete(TestCaseBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, HTTP_NOT_FOUND) + + def test_success(self): + self.create_d() + code, body = self.delete(self.req_d.name) + self.assertEqual(code, HTTP_OK) + self.assertEqual(body, '') + + code, body = self.get(self.req_d.name) + self.assertEqual(code, HTTP_NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py new file mode 100644 index 000000000..b6fbf45dc --- /dev/null +++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py @@ -0,0 +1,31 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import unittest + +from test_base import TestBase +from opnfv_testapi.resources.models import Versions + + +class TestVersionBase(TestBase): + def setUp(self): + super(TestVersionBase, self).setUp() + self.list_res = Versions + self.basePath = '/versions' + + +class TestVersion(TestVersionBase): + def test_success(self): + code, body = self.get() + self.assertEqual(200, code) + self.assertEqual(len(body.versions), 1) + self.assertEqual(body.versions[0].version, 'v1.0') + self.assertEqual(body.versions[0].description, 'basics') + +if __name__ == '__main__': + unittest.main() |