diff options
author | Trevor Bramwell <tbramwell@linuxfoundation.org> | 2017-11-10 15:43:35 -0800 |
---|---|---|
committer | Trevor Bramwell <tbramwell@linuxfoundation.org> | 2017-11-10 15:45:32 -0800 |
commit | f11f26d23dabde24b0bcd67ac81b094aa89eb6c9 (patch) | |
tree | 500546f6f553b049eb9ac146e7c8359d073fbf7a /utils/test/testapi/opnfv_testapi/tests/unit | |
parent | 122cf34bf3e656e1b7fa35e07dd8a71e42ed4d59 (diff) |
Remove 'utils/test' Directory and update INFO
utils/test has been migrated to the releng-testresults repo
Change-Id: If14a30e6abed1424d1e00b0fae048b7d869ec99b
Signed-off-by: Trevor Bramwell <tbramwell@linuxfoundation.org>
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit')
17 files changed, 0 insertions, 2184 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py deleted file mode 100644 index 3fc79f1d5..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -__author__ = 'serena' diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py +++ /dev/null diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py b/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py deleted file mode 100644 index 6d160ce1d..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py +++ /dev/null @@ -1,25 +0,0 @@ -import argparse -import pytest - - -def test_config_normal(mocker, config_normal): - mocker.patch( - 'argparse.ArgumentParser.parse_known_args', - return_value=(argparse.Namespace(config_file=config_normal), None)) - from opnfv_testapi.common import config - CONF = config.Config() - assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/' - assert CONF.mongo_dbname == 'test_results_collection' - assert CONF.api_port == 8000 - assert CONF.api_debug is True - assert CONF.api_token_check is False - assert CONF.api_authenticate is True - assert CONF.ui_url == 'http://localhost:8000' - - -def test_config_file_not_exist(mocker): - mocker.patch('os.path.exists', return_value=False) - with pytest.raises(Exception) as m_exc: - from opnfv_testapi.common import config - config.Config() - assert 'not found' in str(m_exc.value) diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py b/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py deleted file mode 100644 index 75e621d0e..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -from os import path - -import pytest - - -@pytest.fixture -def config_normal(): - return path.join(path.dirname(__file__), '../../../etc/config.ini') diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py deleted file mode 100644 index 743c07615..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py +++ /dev/null @@ -1,130 +0,0 @@ -############################################################################## -# Copyright (c) 2017 ZTE Corp -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import functools -import httplib - -from concurrent.futures import ThreadPoolExecutor -import mock - - -O_get_secure_cookie = ( - 'opnfv_testapi.handlers.base_handlers.GenericApiHandler.get_secure_cookie') - - -def thread_execute(method, *args, **kwargs): - with ThreadPoolExecutor(max_workers=2) as executor: - result = executor.submit(method, *args, **kwargs) - return result - - -def mock_invalid_lfid(): - def _mock_invalid_lfid(xstep): - def wrap(self, *args, **kwargs): - with mock.patch(O_get_secure_cookie) as m_cookie: - m_cookie.return_value = 'InvalidUser' - return xstep(self, *args, **kwargs) - return wrap - return _mock_invalid_lfid - - -def mock_valid_lfid(): - def _mock_valid_lfid(xstep): - def wrap(self, *args, **kwargs): - with mock.patch(O_get_secure_cookie) as m_cookie: - m_cookie.return_value = 'ValidUser' - return xstep(self, *args, **kwargs) - return wrap - return _mock_valid_lfid - - -def upload(excepted_status, excepted_response): - def _upload(create_request): - @functools.wraps(create_request) - def wrap(self): - request = create_request(self) - status, body = self.upload(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _upload - - -def create(excepted_status, excepted_response): - def _create(create_request): - @functools.wraps(create_request) - def wrap(self): - request = create_request(self) - status, body = self.create(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _create - - -def get(excepted_status, excepted_response): - def _get(get_request): - @functools.wraps(get_request) - def wrap(self): - request = get_request(self) - status, body = self.get(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _get - - -def update(excepted_status, excepted_response): - def _update(update_request): - @functools.wraps(update_request) - def wrap(self): - request, resource = update_request(self) - status, body = self.update(request, resource) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(request, body) - else: - self.assertIn(excepted_response, body) - return wrap - return _update - - -def delete(excepted_status, excepted_response): - def _delete(delete_request): - @functools.wraps(delete_request) - def wrap(self): - request = delete_request(self) - if isinstance(request, tuple): - status, body = self.delete(request[0], *(request[1])) - else: - status, body = self.delete(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _delete - - -def query(excepted_status, excepted_response, number=0): - def _query(get_request): - @functools.wraps(get_request) - def wrap(self): - request = get_request(self) - status, body = self.query(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body, number) - else: - self.assertIn(excepted_response, body) - return wrap - return _query diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py deleted file mode 100644 index c44a92c11..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py +++ /dev/null @@ -1,291 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import re - -from operator import itemgetter - -from bson.objectid import ObjectId -from concurrent.futures import ThreadPoolExecutor - - -def thread_execute(method, *args, **kwargs): - with ThreadPoolExecutor(max_workers=2) as executor: - result = executor.submit(method, *args, **kwargs) - return result - - -class MemCursor(object): - def __init__(self, collection): - self.collection = collection - self.length = len(self.collection) - self.sorted = [] - - def _is_next_exist(self): - return self.length != 0 - - @property - def fetch_next(self): - return thread_execute(self._is_next_exist) - - def next_object(self): - self.length -= 1 - return self.collection.pop() - - def sort(self, key_or_list): - for k, v in key_or_list.iteritems(): - if v == -1: - reverse = True - else: - reverse = False - - self.collection = sorted(self.collection, - key=itemgetter(k), reverse=reverse) - return self - - def limit(self, limit): - if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0: limit] - self.length = limit - return self - - def skip(self, skip): - if skip < self.length and (skip > 0): - self.collection = self.collection[self.length - skip: -1] - self.length -= skip - elif skip >= self.length: - self.collection = [] - self.length = 0 - return self - - def _count(self): - return self.length - - def count(self): - return thread_execute(self._count) - - -class MemDb(object): - - def __init__(self, name): - self.name = name - self.contents = [] - pass - - def _find_one(self, spec_or_id=None, *args): - if spec_or_id is not None and not isinstance(spec_or_id, dict): - spec_or_id = {"_id": spec_or_id} - if '_id' in spec_or_id: - spec_or_id['_id'] = str(spec_or_id['_id']) - cursor = self._find(spec_or_id, *args) - for result in cursor: - return result - return None - - def find_one(self, spec_or_id=None, *args): - return thread_execute(self._find_one, spec_or_id, *args) - - def _insert(self, doc_or_docs, check_keys=True): - - docs = doc_or_docs - return_one = False - if isinstance(docs, dict): - return_one = True - docs = [docs] - - if check_keys: - for doc in docs: - self._check_keys(doc) - - ids = [] - for doc in docs: - if '_id' not in doc: - doc['_id'] = str(ObjectId()) - if not self._find_one(doc['_id']): - ids.append(doc['_id']) - self.contents.append(doc_or_docs) - - if len(ids) == 0: - return None - if return_one: - return ids[0] - else: - return ids - - def insert(self, doc_or_docs, check_keys=True): - return thread_execute(self._insert, doc_or_docs, check_keys) - - @staticmethod - def _compare_date(spec, value): - gte = True - lt = False - for k, v in spec.iteritems(): - if k == '$gte' and value < v: - gte = False - elif k == '$lt' and value < v: - lt = True - return gte and lt - - def _in(self, content, *args): - if self.name == 'scenarios': - return self._in_scenarios(content, *args) - else: - return self._in_others(content, *args) - - def _in_scenarios_installer(self, installer, content): - hit = False - for s_installer in content['installers']: - if installer == s_installer['installer']: - hit = True - - return hit - - def _in_scenarios_version(self, version, content): - hit = False - for s_installer in content['installers']: - for s_version in s_installer['versions']: - if version == s_version['version']: - hit = True - return hit - - def _in_scenarios_project(self, project, content): - hit = False - for s_installer in content['installers']: - for s_version in s_installer['versions']: - for s_project in s_version['projects']: - if project == s_project['project']: - hit = True - - return hit - - def _in_scenarios(self, content, *args): - for arg in args: - for k, v in arg.iteritems(): - if k == 'installers': - for inner in v.values(): - for i_k, i_v in inner.iteritems(): - if i_k == 'installer': - return self._in_scenarios_installer(i_v, - content) - elif i_k == 'versions.version': - return self._in_scenarios_version(i_v, - content) - elif i_k == 'versions.projects.project': - return self._in_scenarios_project(i_v, - content) - elif content.get(k, None) != v: - return False - - return True - - def _in_others(self, content, *args): - for arg in args: - for k, v in arg.iteritems(): - if k == 'start_date': - if not MemDb._compare_date(v, content.get(k)): - return False - elif k == 'trust_indicator.current': - if content.get('trust_indicator').get('current') != v: - return False - elif not isinstance(v, dict): - if isinstance(v, re._pattern_type): - if v.match(content.get(k, None)) is None: - return False - else: - if content.get(k, None) != v: - return False - return True - - def _find(self, *args): - res = [] - for content in self.contents: - if self._in(content, *args): - res.append(content) - return res - - def find(self, *args): - return MemCursor(self._find(*args)) - - def _aggregate(self, *args, **kwargs): - res = self.contents - print args - for arg in args[0]: - for k, v in arg.iteritems(): - if k == '$match': - res = self._find(v) - cursor = MemCursor(res) - for arg in args[0]: - for k, v in arg.iteritems(): - if k == '$sort': - cursor = cursor.sort(v) - elif k == '$skip': - cursor = cursor.skip(v) - elif k == '$limit': - cursor = cursor.limit(v) - return cursor - - def aggregate(self, *args, **kwargs): - return self._aggregate(*args, **kwargs) - - def _update(self, spec, document, check_keys=True): - updated = False - - if check_keys: - self._check_keys(document) - - for index in range(len(self.contents)): - content = self.contents[index] - if self._in(content, spec): - for k, v in document.iteritems(): - updated = True - content[k] = v - self.contents[index] = content - return updated - - def update(self, spec, document, check_keys=True): - return thread_execute(self._update, spec, document, check_keys) - - def _remove(self, spec_or_id=None): - if spec_or_id is None: - self.contents = [] - if not isinstance(spec_or_id, dict): - spec_or_id = {'_id': spec_or_id} - for index in range(len(self.contents)): - content = self.contents[index] - if self._in(content, spec_or_id): - del self.contents[index] - return True - return False - - def remove(self, spec_or_id=None): - return thread_execute(self._remove, spec_or_id) - - def clear(self): - self._remove() - - def _check_keys(self, doc): - for key in doc.keys(): - if '.' in key: - raise NameError('key {} must not contain .'.format(key)) - if key.startswith('$'): - raise NameError('key {} must not start with $'.format(key)) - if isinstance(doc.get(key), dict): - self._check_keys(doc.get(key)) - - -def __getattr__(name): - return globals()[name] - - -pods = MemDb('pods') -projects = MemDb('projects') -testcases = MemDb('testcases') -results = MemDb('results') -scenarios = MemDb('scenarios') -tokens = MemDb('tokens') -users = MemDb('users') diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py +++ /dev/null diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json deleted file mode 100644 index 187802215..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "name": "nosdn-nofeature-ha", - "installers": - [ - { - "installer": "apex", - "versions": - [ - { - "owner": "Luke", - "version": "master", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-08 22:46:44", - "score": "12/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json deleted file mode 100644 index 980051c4f..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "odl_2-nofeature-ha", - "installers": - [ - { - "installer": "fuel", - "versions": - [ - { - "owner": "Lucky", - "version": "danube", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": [], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "silver" - } - - ] - }, - { - "project": "yardstick", - "customs": ["suite-a"], - "scores": [ - { - "date": "2017-01-08 22:46:44", - "score": "0/1" - } - ], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "gold" - } - ] - } - ] - }, - { - "owner": "Luke", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-09 22:46:44", - "score": "11/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py deleted file mode 100644 index b7fabb994..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py +++ /dev/null @@ -1,204 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from datetime import datetime -import json -from os import path - -from bson.objectid import ObjectId -import mock -from tornado import testing - -from opnfv_testapi.models import base_models -from opnfv_testapi.models import pod_models -from opnfv_testapi.tests.unit import fake_pymongo - - -class TestBase(testing.AsyncHTTPTestCase): - headers = {'Content-Type': 'application/json; charset=UTF-8'} - - def setUp(self): - self._patch_server() - self.basePath = '' - self.create_res = base_models.CreateResponse - self.get_res = None - self.list_res = None - self.update_res = None - self.pod_d = pod_models.Pod(name='zte-pod1', - mode='virtual', - details='zte pod 1', - role='community-ci', - _id=str(ObjectId()), - owner='ValidUser', - create_date=str(datetime.now())) - self.pod_e = pod_models.Pod(name='zte-pod2', - mode='metal', - details='zte pod 2', - role='production-ci', - _id=str(ObjectId()), - owner='ValidUser', - create_date=str(datetime.now())) - self.req_d = None - self.req_e = None - self.addCleanup(self._clear) - super(TestBase, self).setUp() - fake_pymongo.users.insert({"user": "ValidUser", - 'email': 'validuser@lf.com', - 'fullname': 'Valid User', - 'groups': [ - 'opnfv-testapi-users', - 'opnfv-gerrit-functest-submitters', - 'opnfv-gerrit-qtip-contributors'] - }) - - def tearDown(self): - self.db_patcher.stop() - self.config_patcher.stop() - - def _patch_server(self): - import argparse - config = path.join(path.dirname(__file__), - '../../../../etc/config.ini') - self.config_patcher = mock.patch( - 'argparse.ArgumentParser.parse_known_args', - return_value=(argparse.Namespace(config_file=config), None)) - self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', - fake_pymongo) - self.config_patcher.start() - self.db_patcher.start() - - def get_app(self): - from opnfv_testapi.cmd import server - return server.make_app() - - def create_d(self, *args): - return self.create(self.req_d, *args) - - def create_e(self, *args): - return self.create(self.req_e, *args) - - def create(self, req=None, *args): - return self.create_help(self.basePath, req, *args) - - def create_help(self, uri, req, *args): - return self.post_direct_url(self._update_uri(uri, *args), req) - - def post_direct_url(self, url, req): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(url, - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - def get(self, *args): - res = self.fetch(self._get_uri(*args), - method='GET', - headers=self.headers) - - def inner(): - new_args, num = self._get_valid_args(*args) - return self.get_res \ - if num != self._need_arg_num(self.basePath) else self.list_res - return self._get_return(res, inner()) - - def query(self, query): - res = self.fetch(self._get_query_uri(query), - method='GET', - headers=self.headers) - return self._get_return(res, self.list_res) - - def update_direct_url(self, url, new=None): - if new and hasattr(new, 'format'): - new = new.format() - res = self.fetch(url, - method='PUT', - body=json.dumps(new), - headers=self.headers) - return self._get_return(res, self.update_res) - - def update(self, new=None, *args): - return self.update_direct_url(self._get_uri(*args), new) - - def delete_direct_url(self, url, body): - if body: - res = self.fetch(url, - method='DELETE', - body=json.dumps(body), - headers=self.headers, - allow_nonstandard_methods=True) - else: - res = self.fetch(url, - method='DELETE', - headers=self.headers) - - return res.code, res.body - - def delete(self, *args): - return self.delete_direct_url(self._get_uri(*args), None) - - @staticmethod - def _get_valid_args(*args): - new_args = tuple(['%s' % arg for arg in args if arg is not None]) - return new_args, len(new_args) - - def _need_arg_num(self, uri): - return uri.count('%s') - - def _get_query_uri(self, query): - return self.basePath + '?' + query if query else self.basePath - - def _get_uri(self, *args): - return self._update_uri(self.basePath, *args) - - def _update_uri(self, uri, *args): - r_uri = uri - new_args, num = self._get_valid_args(*args) - if num != self._need_arg_num(uri): - r_uri += '/%s' - - return r_uri % tuple(['%s' % arg for arg in new_args]) - - def _get_return(self, res, cls): - code = res.code - body = res.body - if body: - return code, self._get_return_body(code, body, cls) - else: - return code, None - - @staticmethod - def _get_return_body(code, body, cls): - return cls.from_dict(json.loads(body)) if code < 300 and cls else body - - def assert_href(self, body): - self.assertIn(self.basePath, body.href) - - def assert_create_body(self, body, req=None, *args): - import inspect - if not req: - req = self.req_d - resource_name = '' - if inspect.isclass(req): - resource_name = req.name - elif isinstance(req, dict): - resource_name = req['name'] - elif isinstance(req, str): - resource_name = json.loads(req)['name'] - new_args = args + tuple([resource_name]) - self.assertIn(self._get_uri(*new_args), body.href) - - @staticmethod - def _clear(): - fake_pymongo.pods.clear() - fake_pymongo.projects.clear() - fake_pymongo.testcases.clear() - fake_pymongo.results.clear() - fake_pymongo.scenarios.clear() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py deleted file mode 100644 index 95ed8bac1..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py +++ /dev/null @@ -1,118 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.models import pod_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.handlers import test_base as base - - -class TestPodBase(base.TestBase): - def setUp(self): - super(TestPodBase, self).setUp() - self.get_res = pod_models.Pod - self.list_res = pod_models.Pods - self.basePath = '/api/v1/pods' - self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name, - mode=self.pod_d.mode, - details=self.pod_d.details, - role=self.pod_d.role) - self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name, - mode=self.pod_e.mode, - details=self.pod_e.details, - role=self.pod_e.role) - - def assert_get_body(self, pod, req=None): - if not req: - req = self.req_d - self.assertEqual(pod.owner, 'ValidUser') - self.assertEqual(pod.name, req.name) - self.assertEqual(pod.mode, req.mode) - self.assertEqual(pod.details, req.details) - self.assertEqual(pod.role, req.role) - self.assertIsNotNone(pod.creation_date) - self.assertIsNotNone(pod._id) - - -class TestPodCreate(TestPodBase): - @executor.create(httplib.BAD_REQUEST, message.not_login()) - def test_notlogin(self): - return self.req_d - - @executor.mock_invalid_lfid() - @executor.create(httplib.BAD_REQUEST, message.not_lfid()) - def test_invalidLfid(self): - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return pod_models.PodCreateRequest('') - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return pod_models.PodCreateRequest(None) - - @executor.mock_valid_lfid() - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - fake_pymongo.pods.insert(self.pod_d.format()) - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExistCaseInsensitive(self): - fake_pymongo.pods.insert(self.pod_d.format()) - self.req_d.name = self.req_d.name.upper() - return self.req_d - - -class TestPodGet(TestPodBase): - def setUp(self): - super(TestPodGet, self).setUp() - fake_pymongo.pods.insert(self.pod_d.format()) - fake_pymongo.pods.insert(self.pod_e.format()) - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_get_body') - def test_getOne(self): - return self.pod_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - self.assertEqual(len(body.pods), 2) - for pod in body.pods: - if self.pod_d.name == pod.name: - self.assert_get_body(pod) - else: - self.assert_get_body(pod, self.pod_e) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py deleted file mode 100644 index 939cc0d07..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py +++ /dev/null @@ -1,137 +0,0 @@ -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.models import project_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.handlers import test_base as base - - -class TestProjectBase(base.TestBase): - def setUp(self): - super(TestProjectBase, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping', - 'vping-ssh test') - self.req_e = project_models.ProjectCreateRequest('doctor', - 'doctor test') - self.get_res = project_models.Project - self.list_res = project_models.Projects - self.update_res = project_models.Project - self.basePath = '/api/v1/projects' - - def assert_body(self, project, req=None): - if not req: - req = self.req_d - self.assertEqual(project.name, req.name) - self.assertEqual(project.description, req.description) - self.assertIsNotNone(project._id) - self.assertIsNotNone(project.creation_date) - - -class TestProjectCreate(TestProjectBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return project_models.ProjectCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return project_models.ProjectCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestProjectGet(TestProjectBase): - def setUp(self): - super(TestProjectGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - for project in body.projects: - if self.req_d.name == project.name: - self.assert_body(project) - else: - self.assert_body(project, self.req_e) - - -class TestProjectUpdate(TestProjectBase): - def setUp(self): - super(TestProjectUpdate, self).setUp() - _, d_body = self.create_d() - _, get_res = self.get(self.req_d.name) - self.index_d = get_res._id - self.create_e() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.req_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - return self.req_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.req_d, self.req_d.name - - @executor.update(httplib.OK, '_assert_update') - def test_success(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _assert_update(self, req, body): - self.assertEqual(self.index_d, body._id) - self.assert_body(body, req) - _, new_body = self.get(req.name) - self.assertEqual(self.index_d, new_body._id) - self.assert_body(new_body, req) - - -class TestProjectDelete(TestProjectBase): - def setUp(self): - super(TestProjectDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_assert_delete') - def test_success(self): - return self.req_d.name - - def _assert_delete(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py deleted file mode 100644 index b9f9ede26..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py +++ /dev/null @@ -1,413 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -from datetime import datetime -from datetime import timedelta -import httplib -import json -import urllib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.models import project_models -from opnfv_testapi.models import result_models -from opnfv_testapi.models import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.handlers import test_base as base - - -class Details(object): - def __init__(self, timestart=None, duration=None, status=None): - self.timestart = timestart - self.duration = duration - self.status = status - self.items = [{'item1': 1}, {'item2': 2}] - - def format(self): - return { - "timestart": self.timestart, - "duration": self.duration, - "status": self.status, - 'items': [{'item1': 1}, {'item2': 2}] - } - - @staticmethod - def from_dict(a_dict): - - if a_dict is None: - return None - - t = Details() - t.timestart = a_dict.get('timestart') - t.duration = a_dict.get('duration') - t.status = a_dict.get('status') - t.items = a_dict.get('items') - return t - - -class TestResultBase(base.TestBase): - def setUp(self): - super(TestResultBase, self).setUp() - self.pod = self.pod_d.name - self.project = 'functest' - self.case = 'vPing' - self.installer = 'fuel' - self.version = 'C' - self.build_tag = 'v3.0' - self.scenario = 'odl-l2' - self.criteria = 'PASS' - self.trust_indicator = result_models.TI(0.7) - self.start_date = str(datetime.now()) - self.stop_date = str(datetime.now() + timedelta(minutes=1)) - self.update_date = str(datetime.now() + timedelta(days=1)) - self.update_step = -0.05 - self.details = Details(timestart='0', duration='9s', status='OK') - self.req_d = result_models.ResultCreateRequest( - pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria, - trust_indicator=self.trust_indicator) - self.get_res = result_models.TestResult - self.list_res = result_models.TestResults - self.update_res = result_models.TestResult - self.basePath = '/api/v1/results' - self.req_project = project_models.ProjectCreateRequest( - self.project, - 'vping test') - self.req_testcase = testcase_models.TestcaseCreateRequest( - self.case, - '/cases/vping', - 'vping-ssh test') - fake_pymongo.pods.insert(self.pod_d.format()) - self.create_help('/api/v1/projects', self.req_project) - self.create_help('/api/v1/projects/%s/cases', - self.req_testcase, - self.project) - - def assert_res(self, result, req=None): - if req is None: - req = self.req_d - self.assertEqual(result.pod_name, req.pod_name) - self.assertEqual(result.project_name, req.project_name) - self.assertEqual(result.case_name, req.case_name) - self.assertEqual(result.installer, req.installer) - self.assertEqual(result.version, req.version) - details_req = Details.from_dict(req.details) - details_res = Details.from_dict(result.details) - self.assertEqual(details_res.duration, details_req.duration) - self.assertEqual(details_res.timestart, details_req.timestart) - self.assertEqual(details_res.status, details_req.status) - self.assertEqual(details_res.items, details_req.items) - self.assertEqual(result.build_tag, req.build_tag) - self.assertEqual(result.scenario, req.scenario) - self.assertEqual(result.criteria, req.criteria) - self.assertEqual(result.start_date, req.start_date) - self.assertEqual(result.stop_date, req.stop_date) - self.assertIsNotNone(result._id) - ti = result.trust_indicator - self.assertEqual(ti.current, req.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - def _create_d(self): - _, res = self.create_d() - return res.href.split('/')[-1] - - def upload(self, req): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(self.basePath + '/upload', - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - -class TestResultUpload(TestResultBase): - @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) - def test_filenotfind(self): - return None - - -class TestResultCreate(TestResultBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_nobody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) - def test_podNotProvided(self): - req = self.req_d - req.pod_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) - def test_projectNotProvided(self): - req = self.req_d - req.project_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) - def test_testcaseNotProvided(self): - req = self.req_d - req.case_name = None - return req - - @executor.create(httplib.BAD_REQUEST, - message.invalid_value('criteria', ['PASS', 'FAIL'])) - def test_invalid_criteria(self): - req = self.req_d - req.criteria = 'invalid' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noPod(self): - req = self.req_d - req.pod_name = 'notExistPod' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - req = self.req_d - req.project_name = 'notExistProject' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noTestcase(self): - req = self.req_d - req.case_name = 'notExistTestcase' - return req - - @executor.create(httplib.OK, 'assert_href') - def test_success(self): - return self.req_d - - @executor.create(httplib.OK, 'assert_href') - def test_key_with_doc(self): - req = copy.deepcopy(self.req_d) - req.details = {'1.name': 'dot_name'} - return req - - @executor.create(httplib.OK, '_assert_no_ti') - def test_no_ti(self): - req = result_models.ResultCreateRequest(pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria) - self.actual_req = req - return req - - def _assert_no_ti(self, body): - _id = body.href.split('/')[-1] - code, body = self.get(_id) - self.assert_res(body, self.actual_req) - - -class TestResultGet(TestResultBase): - def setUp(self): - super(TestResultGet, self).setUp() - self.req_10d_before = self._create_changed_date(days=-10) - self.req_d_id = self._create_d() - self.req_10d_later = self._create_changed_date(days=10) - - @executor.get(httplib.OK, 'assert_res') - def test_getOne(self): - return self.req_d_id - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryPod(self): - return self._set_query('pod') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryProject(self): - return self._set_query('project') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTestcase(self): - return self._set_query('case') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryVersion(self): - return self._set_query('version') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryInstaller(self): - return self._set_query('installer') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryBuildTag(self): - return self._set_query('build_tag') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryScenario(self): - return self._set_query('scenario') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTrustIndicator(self): - return self._set_query('trust_indicator') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryCriteria(self): - return self._set_query('criteria') - - @executor.query(httplib.BAD_REQUEST, message.must_int('period')) - def test_queryPeriodNotInt(self): - return self._set_query(period='a') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_queryPeriodSuccess(self): - return self._set_query(period=5) - - @executor.query(httplib.BAD_REQUEST, message.must_int('last')) - def test_queryLastNotInt(self): - return self._set_query(last='a') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_queryLast(self): - return self._set_query(last=1) - - @executor.query(httplib.OK, '_query_success', 4) - def test_queryPublic(self): - self._create_public_data() - return self._set_query() - - @executor.query(httplib.OK, '_query_success', 1) - def test_queryPrivate(self): - self._create_private_data() - return self._set_query(public='false') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_combination(self): - return self._set_query('pod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - period=5) - - @executor.query(httplib.OK, '_query_success', 0) - def test_notFound(self): - return self._set_query('project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - pod='notExistPod', - period=1) - - @executor.query(httplib.OK, '_query_success', 1) - def test_filterErrorStartdate(self): - self._create_error_start_date(None) - self._create_error_start_date('None') - self._create_error_start_date('null') - self._create_error_start_date('') - return self._set_query(period=5) - - def _query_success(self, body, number): - self.assertEqual(number, len(body.results)) - - def _query_last_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_10d_later) - - def _query_period_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_d) - - def _create_error_start_date(self, start_date): - req = copy.deepcopy(self.req_d) - req.start_date = start_date - self.create(req) - return req - - def _create_changed_date(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.start_date = datetime.now() + timedelta(**kwargs) - req.stop_date = str(req.start_date + timedelta(minutes=10)) - req.start_date = str(req.start_date) - self.create(req) - return req - - def _create_public_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'true' - self.create(req) - return req - - def _create_private_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'false' - self.create(req) - return req - - def _set_query(self, *args, **kwargs): - def get_value(arg): - return self.__getattribute__(arg) \ - if arg != 'trust_indicator' else self.trust_indicator.current - query = [] - for arg in args: - query.append((arg, get_value(arg))) - for k, v in kwargs.iteritems(): - query.append((k, v)) - return urllib.urlencode(query) - - -class TestResultUpdate(TestResultBase): - def setUp(self): - super(TestResultUpdate, self).setUp() - self.req_d_id = self._create_d() - - @executor.update(httplib.OK, '_assert_update_ti') - def test_success(self): - new_ti = copy.deepcopy(self.trust_indicator) - new_ti.current += self.update_step - new_ti.histories.append( - result_models.TIHistory(self.update_date, self.update_step)) - new_data = copy.deepcopy(self.req_d) - new_data.trust_indicator = new_ti - update = result_models.ResultUpdateRequest(trust_indicator=new_ti) - self.update_req = new_data - return update, self.req_d_id - - def _assert_update_ti(self, request, body): - ti = body.trust_indicator - self.assertEqual(ti.current, request.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py deleted file mode 100644 index de7777a83..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py +++ /dev/null @@ -1,449 +0,0 @@ -import functools -import httplib -import json -import os - -from datetime import datetime - -from opnfv_testapi.common import message -import opnfv_testapi.models.scenario_models as models -from opnfv_testapi.tests.unit.handlers import test_base as base - - -def _none_default(check, default): - return check if check else default - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - self.assertEqual(scenario, models.Scenario.from_dict(req)) - - @staticmethod - def set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def get_and_assert(self, name): - code, body = self.get(name) - self.assert_res(code, body, self.req_d) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self.get_and_assert(self.scenario_1) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self.set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self.set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self.set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self.set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - # close due to random fail, open again after solve it in another patch - # def test_queryCombination(self): - # query = self._set_query('name=nosdn-nofeature-ha', - # 'installer=apex', - # 'version=master', - # 'project=functest') - # - # self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - self.update_url = '' - self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario) - self.installer = self.req_d['installers'][0]['installer'] - self.version = self.req_d['installers'][0]['versions'][0]['version'] - self.locate_project = 'installer={}&version={}&project={}'.format( - self.installer, - self.version, - 'functest') - - def update_url_fixture(item): - def _update_url_fixture(xstep): - def wrapper(self, *args, **kwargs): - self.update_url = '{}/{}'.format(self.scenario_url, item) - locator = None - if item in ['projects', 'owner']: - locator = 'installer={}&version={}'.format( - self.installer, - self.version) - elif item in ['versions']: - locator = 'installer={}'.format( - self.installer) - elif item in ['rename']: - self.update_url = self.scenario_url - - if locator: - self.update_url = '{}?{}'.format(self.update_url, locator) - - xstep(self, *args, **kwargs) - return wrapper - return _update_url_fixture - - def update_partial(operate, expected): - def _update_partial(set_update): - @functools.wraps(set_update) - def wrapper(self): - update = set_update(self) - code, body = getattr(self, operate)(update) - getattr(self, expected)(code) - return wrapper - return _update_partial - - @update_partial('_add', '_success') - def test_addScore(self): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - self.update_url = '{}/scores?{}'.format(self.scenario_url, - self.locate_project) - - return add - - @update_partial('_add', '_success') - def test_addTrustIndicator(self): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url, - self.locate_project) - - return add - - @update_partial('_add', '_success') - def test_addCustoms(self): - adds = ['odl', 'parser', 'vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = list(set(functest['customs'] + adds)) - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - return adds - - @update_partial('_update', '_success') - def test_updateCustoms(self): - updates = ['odl', 'parser', 'vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = updates - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - - return updates - - @update_partial('_delete', '_success') - def test_deleteCustoms(self): - deletes = ['vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - - return deletes - - @update_url_fixture('projects') - @update_partial('_add', '_success') - def test_addProjects_succ(self): - add = models.ScenarioProject(project='qtip').format() - self.req_d['installers'][0]['versions'][0]['projects'].append(add) - return [add] - - @update_url_fixture('projects') - @update_partial('_add', '_conflict') - def test_addProjects_already_exist(self): - add = models.ScenarioProject(project='functest').format() - return [add] - - @update_url_fixture('projects') - @update_partial('_add', '_bad_request') - def test_addProjects_bad_schema(self): - add = models.ScenarioProject(project='functest').format() - add['score'] = None - return [add] - - @update_url_fixture('projects') - @update_partial('_update', '_success') - def test_updateProjects_succ(self): - update = models.ScenarioProject(project='qtip').format() - self.req_d['installers'][0]['versions'][0]['projects'] = [update] - return [update] - - @update_url_fixture('projects') - @update_partial('_update', '_conflict') - def test_updateProjects_duplicated(self): - update = models.ScenarioProject(project='qtip').format() - return [update, update] - - @update_url_fixture('projects') - @update_partial('_update', '_bad_request') - def test_updateProjects_bad_schema(self): - update = models.ScenarioProject(project='functest').format() - update['score'] = None - return [update] - - @update_url_fixture('projects') - @update_partial('_delete', '_success') - def test_deleteProjects(self): - deletes = ['functest'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - self.req_d['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - projects) - return deletes - - @update_url_fixture('owner') - @update_partial('_update', '_success') - def test_changeOwner(self): - new_owner = 'new_owner' - update = models.ScenarioChangeOwnerRequest(new_owner).format() - self.req_d['installers'][0]['versions'][0]['owner'] = new_owner - return update - - @update_url_fixture('versions') - @update_partial('_add', '_success') - def test_addVersions_succ(self): - add = models.ScenarioVersion(version='Euphrates').format() - self.req_d['installers'][0]['versions'].append(add) - return [add] - - @update_url_fixture('versions') - @update_partial('_add', '_conflict') - def test_addVersions_already_exist(self): - add = models.ScenarioVersion(version='master').format() - return [add] - - @update_url_fixture('versions') - @update_partial('_add', '_bad_request') - def test_addVersions_bad_schema(self): - add = models.ScenarioVersion(version='euphrates').format() - add['notexist'] = None - return [add] - - @update_url_fixture('versions') - @update_partial('_update', '_success') - def test_updateVersions_succ(self): - update = models.ScenarioVersion(version='euphrates').format() - self.req_d['installers'][0]['versions'] = [update] - return [update] - - @update_url_fixture('versions') - @update_partial('_update', '_conflict') - def test_updateVersions_duplicated(self): - update = models.ScenarioVersion(version='euphrates').format() - return [update, update] - - @update_url_fixture('versions') - @update_partial('_update', '_bad_request') - def test_updateVersions_bad_schema(self): - update = models.ScenarioVersion(version='euphrates').format() - update['not_owner'] = 'Iam' - return [update] - - @update_url_fixture('versions') - @update_partial('_delete', '_success') - def test_deleteVersions(self): - deletes = ['master'] - versions = self.req_d['installers'][0]['versions'] - self.req_d['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - versions) - return deletes - - @update_url_fixture('installers') - @update_partial('_add', '_success') - def test_addInstallers_succ(self): - add = models.ScenarioInstaller(installer='daisy').format() - self.req_d['installers'].append(add) - return [add] - - @update_url_fixture('installers') - @update_partial('_add', '_conflict') - def test_addInstallers_already_exist(self): - add = models.ScenarioInstaller(installer='apex').format() - return [add] - - @update_url_fixture('installers') - @update_partial('_add', '_bad_request') - def test_addInstallers_bad_schema(self): - add = models.ScenarioInstaller(installer='daisy').format() - add['not_exist'] = 'not_exist' - return [add] - - @update_url_fixture('installers') - @update_partial('_update', '_success') - def test_updateInstallers_succ(self): - update = models.ScenarioInstaller(installer='daisy').format() - self.req_d['installers'] = [update] - return [update] - - @update_url_fixture('installers') - @update_partial('_update', '_conflict') - def test_updateInstallers_duplicated(self): - update = models.ScenarioInstaller(installer='daisy').format() - return [update, update] - - @update_url_fixture('installers') - @update_partial('_update', '_bad_request') - def test_updateInstallers_bad_schema(self): - update = models.ScenarioInstaller(installer='daisy').format() - update['not_exist'] = 'not_exist' - return [update] - - @update_url_fixture('installers') - @update_partial('_delete', '_success') - def test_deleteInstallers(self): - deletes = ['apex'] - installers = self.req_d['installers'] - self.req_d['installers'] = filter( - lambda f: f['installer'] != 'apex', - installers) - return deletes - - @update_url_fixture('rename') - @update_partial('_update', '_success') - def test_renameScenario(self): - new_name = 'new_scenario_name' - update = models.ScenarioUpdateRequest(name=new_name) - self.req_d['name'] = new_name - return update - - @update_url_fixture('rename') - @update_partial('_update', '_forbidden') - def test_renameScenario_exist(self): - new_name = self.req_d['name'] - update = models.ScenarioUpdateRequest(name=new_name) - return update - - def _add(self, update_req): - return self.post_direct_url(self.update_url, update_req) - - def _update(self, update_req): - return self.update_direct_url(self.update_url, update_req) - - def _delete(self, update_req): - return self.delete_direct_url(self.update_url, update_req) - - def _success(self, status): - self.assertEqual(status, httplib.OK) - self.get_and_assert(self.req_d['name']) - - def _forbidden(self, status): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status): - self.assertEqual(status, httplib.BAD_REQUEST) - - def _conflict(self, status): - self.assertEqual(status, httplib.CONFLICT) diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py deleted file mode 100644 index e4c668e7a..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py +++ /dev/null @@ -1,201 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.models import project_models -from opnfv_testapi.models import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.handlers import test_base as base - - -class TestCaseBase(base.TestBase): - def setUp(self): - super(TestCaseBase, self).setUp() - self.req_d = testcase_models.TestcaseCreateRequest('vping_1', - '/cases/vping_1', - 'vping-ssh test') - self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', - '/cases/doctor_1', - 'create doctor') - self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', - 'vping-ssh test', - 'functest') - self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', - 'create doctor', - 'functest') - self.get_res = testcase_models.Testcase - self.list_res = testcase_models.Testcases - self.update_res = testcase_models.Testcase - self.basePath = '/api/v1/projects/%s/cases' - self.create_project() - - def assert_body(self, case, req=None): - if not req: - req = self.req_d - self.assertEqual(case.name, req.name) - self.assertEqual(case.description, req.description) - self.assertEqual(case.url, req.url) - self.assertIsNotNone(case._id) - self.assertIsNotNone(case.creation_date) - - def assert_update_body(self, old, new, req=None): - if not req: - req = self.req_d - self.assertEqual(new.name, req.name) - self.assertEqual(new.description, req.description) - self.assertEqual(new.url, old.url) - self.assertIsNotNone(new._id) - self.assertIsNotNone(new.creation_date) - - def create_project(self): - req_p = project_models.ProjectCreateRequest('functest', - 'vping-ssh test') - self.create_help('/api/v1/projects', req_p) - self.project = req_p.name - - def create_d(self): - return super(TestCaseBase, self).create_d(self.project) - - def create_e(self): - return super(TestCaseBase, self).create_e(self.project) - - def get(self, case=None): - return super(TestCaseBase, self).get(self.project, case) - - def create(self, req=None, *args): - return super(TestCaseBase, self).create(req, self.project) - - def update(self, new=None, case=None): - return super(TestCaseBase, self).update(new, self.project, case) - - def delete(self, case): - return super(TestCaseBase, self).delete(self.project, case) - - -class TestCaseCreate(TestCaseBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - self.project = 'noProject' - return self.req_d - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - req_empty = testcase_models.TestcaseCreateRequest('') - return req_empty - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - req_none = testcase_models.TestcaseCreateRequest(None) - return req_none - - @executor.create(httplib.OK, '_assert_success') - def test_success(self): - return self.req_d - - def _assert_success(self, body): - self.assert_create_body(body, self.req_d, self.project) - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestCaseGet(TestCaseBase): - def setUp(self): - super(TestCaseGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_list') - def test_list(self): - return None - - def _list(self, body): - for case in body.testcases: - if self.req_d.name == case.name: - self.assert_body(case) - else: - self.assert_body(case, self.req_e) - - -class TestCaseUpdate(TestCaseBase): - def setUp(self): - super(TestCaseUpdate, self).setUp() - self.create_d() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.update_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - self.create_e() - return self.update_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.update_d, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_success(self): - return self.update_e, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_with_dollar(self): - update = copy.deepcopy(self.update_d) - update.description = {'2. change': 'dollar change'} - return update, self.req_d.name - - def _update_success(self, request, body): - self.assert_update_body(self.req_d, body, request) - _, new_body = self.get(request.name) - self.assert_update_body(self.req_d, new_body, request) - - -class TestCaseDelete(TestCaseBase): - def setUp(self): - super(TestCaseDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_delete_success') - def test_success(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py deleted file mode 100644 index e8b746dfc..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py +++ /dev/null @@ -1,52 +0,0 @@ -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import httplib -import unittest - -from tornado import web - -from opnfv_testapi.common import message -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.handlers import test_result - - -class TestTokenCreateResult(test_result.TestResultBase): - def get_app(self): - from opnfv_testapi.router import url_mappings - return web.Application( - url_mappings.mappings, - db=fake_pymongo, - debug=True, - auth=True - ) - - def setUp(self): - super(TestTokenCreateResult, self).setUp() - fake_pymongo.tokens.insert({"access_token": "12345"}) - - @executor.create(httplib.FORBIDDEN, message.invalid_token()) - def test_resultCreateTokenInvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d - - @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) - def test_resultCreateTokenUnauthorized(self): - if 'X-Auth-Token' in self.headers: - self.headers.pop('X-Auth-Token') - return self.req_d - - @executor.create(httplib.OK, '_create_success') - def test_resultCreateTokenSuccess(self): - self.headers['X-Auth-Token'] = '12345' - return self.req_d - - def _create_success(self, body): - self.assertIn('CreateResponse', str(type(body))) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py deleted file mode 100644 index 6ef810f0e..000000000 --- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py +++ /dev/null @@ -1,36 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.models import base_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.handlers import test_base as base - - -class TestVersionBase(base.TestBase): - def setUp(self): - super(TestVersionBase, self).setUp() - self.list_res = base_models.Versions - self.basePath = '/versions' - - -class TestVersion(TestVersionBase): - @executor.get(httplib.OK, '_get_success') - def test_success(self): - return None - - def _get_success(self, body): - self.assertEqual(len(body.versions), 1) - self.assertEqual(body.versions[0].version, 'v1.0') - self.assertEqual(body.versions[0].description, 'basics') - - -if __name__ == '__main__': - unittest.main() |