diff options
Diffstat (limited to 'cvp/opnfv_testapi/tests')
24 files changed, 0 insertions, 2235 deletions
diff --git a/cvp/opnfv_testapi/tests/__init__.py b/cvp/opnfv_testapi/tests/__init__.py deleted file mode 100644 index 9f28b0bf..00000000 --- a/cvp/opnfv_testapi/tests/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'serena' diff --git a/cvp/opnfv_testapi/tests/unit/__init__.py b/cvp/opnfv_testapi/tests/unit/__init__.py deleted file mode 100644 index 3fc79f1d..00000000 --- a/cvp/opnfv_testapi/tests/unit/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -__author__ = 'serena' diff --git a/cvp/opnfv_testapi/tests/unit/common/__init__.py b/cvp/opnfv_testapi/tests/unit/common/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/__init__.py +++ /dev/null diff --git a/cvp/opnfv_testapi/tests/unit/common/noparam.ini b/cvp/opnfv_testapi/tests/unit/common/noparam.ini deleted file mode 100644 index fda2a09e..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/noparam.ini +++ /dev/null @@ -1,16 +0,0 @@ -# to add a new parameter in the config file, -# the CONF object in config.ini must be updated -[mongo] -# URL of the mongo DB -# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1 -url = mongodb://127.0.0.1:27017/ - -[api] -# Listening port -port = 8000 -# With debug_on set to true, error traces will be shown in HTTP responses -debug = True -authenticate = False - -[swagger] -base_url = http://localhost:8000 diff --git a/cvp/opnfv_testapi/tests/unit/common/normal.ini b/cvp/opnfv_testapi/tests/unit/common/normal.ini deleted file mode 100644 index 77cc6c6e..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/normal.ini +++ /dev/null @@ -1,17 +0,0 @@ -# to add a new parameter in the config file, -# the CONF object in config.ini must be updated -[mongo] -# URL of the mongo DB -# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1 -url = mongodb://127.0.0.1:27017/ -dbname = test_results_collection - -[api] -# Listening port -port = 8000 -# With debug_on set to true, error traces will be shown in HTTP responses -debug = True -authenticate = False - -[swagger] -base_url = http://localhost:8000 diff --git a/cvp/opnfv_testapi/tests/unit/common/nosection.ini b/cvp/opnfv_testapi/tests/unit/common/nosection.ini deleted file mode 100644 index 9988fc0a..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/nosection.ini +++ /dev/null @@ -1,11 +0,0 @@ -# to add a new parameter in the config file, -# the CONF object in config.ini must be updated -[api] -# Listening port -port = 8000 -# With debug_on set to true, error traces will be shown in HTTP responses -debug = True -authenticate = False - -[swagger] -base_url = http://localhost:8000 diff --git a/cvp/opnfv_testapi/tests/unit/common/notboolean.ini b/cvp/opnfv_testapi/tests/unit/common/notboolean.ini deleted file mode 100644 index b3f32767..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/notboolean.ini +++ /dev/null @@ -1,17 +0,0 @@ -# to add a new parameter in the config file, -# the CONF object in config.ini must be updated -[mongo] -# URL of the mongo DB -# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1 -url = mongodb://127.0.0.1:27017/ -dbname = test_results_collection - -[api] -# Listening port -port = 8000 -# With debug_on set to true, error traces will be shown in HTTP responses -debug = True -authenticate = notboolean - -[swagger] -base_url = http://localhost:8000 diff --git a/cvp/opnfv_testapi/tests/unit/common/notint.ini b/cvp/opnfv_testapi/tests/unit/common/notint.ini deleted file mode 100644 index d1b752a3..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/notint.ini +++ /dev/null @@ -1,17 +0,0 @@ -# to add a new parameter in the config file, -# the CONF object in config.ini must be updated -[mongo] -# URL of the mongo DB -# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1 -url = mongodb://127.0.0.1:27017/ -dbname = test_results_collection - -[api] -# Listening port -port = notint -# With debug_on set to true, error traces will be shown in HTTP responses -debug = True -authenticate = False - -[swagger] -base_url = http://localhost:8000 diff --git a/cvp/opnfv_testapi/tests/unit/common/test_config.py b/cvp/opnfv_testapi/tests/unit/common/test_config.py deleted file mode 100644 index cc8743ca..00000000 --- a/cvp/opnfv_testapi/tests/unit/common/test_config.py +++ /dev/null @@ -1,15 +0,0 @@ -import argparse - - -def test_config_normal(mocker, config_normal): - mocker.patch( - 'argparse.ArgumentParser.parse_known_args', - return_value=(argparse.Namespace(config_file=config_normal), None)) - from opnfv_testapi.common import config - CONF = config.Config() - assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/' - assert CONF.mongo_dbname == 'test_results_collection' - assert CONF.api_port == 8000 - assert CONF.api_debug is True - assert CONF.api_authenticate is False - assert CONF.swagger_base_url == 'http://localhost:8000' diff --git a/cvp/opnfv_testapi/tests/unit/conftest.py b/cvp/opnfv_testapi/tests/unit/conftest.py deleted file mode 100644 index feff1daa..00000000 --- a/cvp/opnfv_testapi/tests/unit/conftest.py +++ /dev/null @@ -1,8 +0,0 @@ -from os import path - -import pytest - - -@pytest.fixture -def config_normal(): - return path.join(path.dirname(__file__), 'common/normal.ini') diff --git a/cvp/opnfv_testapi/tests/unit/executor.py b/cvp/opnfv_testapi/tests/unit/executor.py deleted file mode 100644 index b8f696ca..00000000 --- a/cvp/opnfv_testapi/tests/unit/executor.py +++ /dev/null @@ -1,97 +0,0 @@ -############################################################################## -# Copyright (c) 2017 ZTE Corp -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import functools -import httplib - - -def upload(excepted_status, excepted_response): - def _upload(create_request): - @functools.wraps(create_request) - def wrap(self): - request = create_request(self) - status, body = self.upload(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _upload - - -def create(excepted_status, excepted_response): - def _create(create_request): - @functools.wraps(create_request) - def wrap(self): - request = create_request(self) - status, body = self.create(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _create - - -def get(excepted_status, excepted_response): - def _get(get_request): - @functools.wraps(get_request) - def wrap(self): - request = get_request(self) - status, body = self.get(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _get - - -def update(excepted_status, excepted_response): - def _update(update_request): - @functools.wraps(update_request) - def wrap(self): - request, resource = update_request(self) - status, body = self.update(request, resource) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(request, body) - else: - self.assertIn(excepted_response, body) - return wrap - return _update - - -def delete(excepted_status, excepted_response): - def _delete(delete_request): - @functools.wraps(delete_request) - def wrap(self): - request = delete_request(self) - if isinstance(request, tuple): - status, body = self.delete(request[0], *(request[1])) - else: - status, body = self.delete(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body) - else: - self.assertIn(excepted_response, body) - return wrap - return _delete - - -def query(excepted_status, excepted_response, number=0): - def _query(get_request): - @functools.wraps(get_request) - def wrap(self): - request = get_request(self) - status, body = self.query(request) - if excepted_status == httplib.OK: - getattr(self, excepted_response)(body, number) - else: - self.assertIn(excepted_response, body) - return wrap - return _query diff --git a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/fake_pymongo.py deleted file mode 100644 index 0ca83df6..00000000 --- a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py +++ /dev/null @@ -1,284 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from operator import itemgetter - -from bson.objectid import ObjectId -from concurrent.futures import ThreadPoolExecutor - - -def thread_execute(method, *args, **kwargs): - with ThreadPoolExecutor(max_workers=2) as executor: - result = executor.submit(method, *args, **kwargs) - return result - - -class MemCursor(object): - def __init__(self, collection): - self.collection = collection - self.length = len(self.collection) - self.sorted = [] - - def _is_next_exist(self): - return self.length != 0 - - @property - def fetch_next(self): - return thread_execute(self._is_next_exist) - - def next_object(self): - self.length -= 1 - return self.collection.pop() - - def sort(self, key_or_list): - for k, v in key_or_list.iteritems(): - if v == -1: - reverse = True - else: - reverse = False - - self.collection = sorted(self.collection, - key=itemgetter(k), reverse=reverse) - return self - - def limit(self, limit): - if limit != 0 and limit < len(self.collection): - self.collection = self.collection[0: limit] - self.length = limit - return self - - def skip(self, skip): - if skip < self.length and (skip > 0): - self.collection = self.collection[self.length - skip: -1] - self.length -= skip - elif skip >= self.length: - self.collection = [] - self.length = 0 - return self - - def _count(self): - return self.length - - def count(self): - return thread_execute(self._count) - - -class MemDb(object): - - def __init__(self, name): - self.name = name - self.contents = [] - pass - - def _find_one(self, spec_or_id=None, *args): - if spec_or_id is not None and not isinstance(spec_or_id, dict): - spec_or_id = {"_id": spec_or_id} - if '_id' in spec_or_id: - spec_or_id['_id'] = str(spec_or_id['_id']) - cursor = self._find(spec_or_id, *args) - for result in cursor: - return result - return None - - def find_one(self, spec_or_id=None, *args): - return thread_execute(self._find_one, spec_or_id, *args) - - def _insert(self, doc_or_docs, check_keys=True): - - docs = doc_or_docs - return_one = False - if isinstance(docs, dict): - return_one = True - docs = [docs] - - if check_keys: - for doc in docs: - self._check_keys(doc) - - ids = [] - for doc in docs: - if '_id' not in doc: - doc['_id'] = str(ObjectId()) - if not self._find_one(doc['_id']): - ids.append(doc['_id']) - self.contents.append(doc_or_docs) - - if len(ids) == 0: - return None - if return_one: - return ids[0] - else: - return ids - - def insert(self, doc_or_docs, check_keys=True): - return thread_execute(self._insert, doc_or_docs, check_keys) - - @staticmethod - def _compare_date(spec, value): - gte = True - lt = False - for k, v in spec.iteritems(): - if k == '$gte' and value < v: - gte = False - elif k == '$lt' and value < v: - lt = True - return gte and lt - - def _in(self, content, *args): - if self.name == 'scenarios': - return self._in_scenarios(content, *args) - else: - return self._in_others(content, *args) - - def _in_scenarios_installer(self, installer, content): - hit = False - for s_installer in content['installers']: - if installer == s_installer['installer']: - hit = True - - return hit - - def _in_scenarios_version(self, version, content): - hit = False - for s_installer in content['installers']: - for s_version in s_installer['versions']: - if version == s_version['version']: - hit = True - return hit - - def _in_scenarios_project(self, project, content): - hit = False - for s_installer in content['installers']: - for s_version in s_installer['versions']: - for s_project in s_version['projects']: - if project == s_project['project']: - hit = True - - return hit - - def _in_scenarios(self, content, *args): - for arg in args: - for k, v in arg.iteritems(): - if k == 'installers': - for inner in v.values(): - for i_k, i_v in inner.iteritems(): - if i_k == 'installer': - return self._in_scenarios_installer(i_v, - content) - elif i_k == 'versions.version': - return self._in_scenarios_version(i_v, - content) - elif i_k == 'versions.projects.project': - return self._in_scenarios_project(i_v, - content) - elif content.get(k, None) != v: - return False - - return True - - def _in_others(self, content, *args): - for arg in args: - for k, v in arg.iteritems(): - if k == 'start_date': - if not MemDb._compare_date(v, content.get(k)): - return False - elif k == 'trust_indicator.current': - if content.get('trust_indicator').get('current') != v: - return False - elif not isinstance(v, dict) and content.get(k, None) != v: - return False - return True - - def _find(self, *args): - res = [] - for content in self.contents: - if self._in(content, *args): - res.append(content) - - return res - - def find(self, *args): - return MemCursor(self._find(*args)) - - def _aggregate(self, *args, **kwargs): - res = self.contents - print args - for arg in args[0]: - for k, v in arg.iteritems(): - if k == '$match': - res = self._find(v) - cursor = MemCursor(res) - for arg in args[0]: - for k, v in arg.iteritems(): - if k == '$sort': - cursor = cursor.sort(v) - elif k == '$skip': - cursor = cursor.skip(v) - elif k == '$limit': - cursor = cursor.limit(v) - return cursor - - def aggregate(self, *args, **kwargs): - return self._aggregate(*args, **kwargs) - - def _update(self, spec, document, check_keys=True): - updated = False - - if check_keys: - self._check_keys(document) - - for index in range(len(self.contents)): - content = self.contents[index] - if self._in(content, spec): - for k, v in document.iteritems(): - updated = True - content[k] = v - self.contents[index] = content - return updated - - def update(self, spec, document, check_keys=True): - return thread_execute(self._update, spec, document, check_keys) - - def _remove(self, spec_or_id=None): - if spec_or_id is None: - self.contents = [] - if not isinstance(spec_or_id, dict): - spec_or_id = {'_id': spec_or_id} - for index in range(len(self.contents)): - content = self.contents[index] - if self._in(content, spec_or_id): - del self.contents[index] - return True - return False - - def remove(self, spec_or_id=None): - return thread_execute(self._remove, spec_or_id) - - def clear(self): - self._remove() - - def _check_keys(self, doc): - for key in doc.keys(): - if '.' in key: - raise NameError('key {} must not contain .'.format(key)) - if key.startswith('$'): - raise NameError('key {} must not start with $'.format(key)) - if isinstance(doc.get(key), dict): - self._check_keys(doc.get(key)) - - -def __getattr__(name): - return globals()[name] - - -pods = MemDb('pods') -projects = MemDb('projects') -testcases = MemDb('testcases') -results = MemDb('results') -scenarios = MemDb('scenarios') -tokens = MemDb('tokens') diff --git a/cvp/opnfv_testapi/tests/unit/resources/__init__.py b/cvp/opnfv_testapi/tests/unit/resources/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/__init__.py +++ /dev/null diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json deleted file mode 100644 index 18780221..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "name": "nosdn-nofeature-ha", - "installers": - [ - { - "installer": "apex", - "versions": - [ - { - "owner": "Luke", - "version": "master", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-08 22:46:44", - "score": "12/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json deleted file mode 100644 index b6a3b83a..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "odl_2-nofeature-ha", - "installers": - [ - { - "installer": "fuel", - "versions": - [ - { - "owner": "Lucky", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": [], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "silver" - } - - ] - }, - { - "project": "yardstick", - "customs": ["suite-a"], - "scores": [ - { - "date": "2017-01-08 22:46:44", - "score": "0" - } - ], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "gold" - } - ] - } - ] - }, - { - "owner": "Luke", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-09 22:46:44", - "score": "11/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_base.py b/cvp/opnfv_testapi/tests/unit/resources/test_base.py deleted file mode 100644 index dcec4e95..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_base.py +++ /dev/null @@ -1,161 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import json -from os import path - -import mock -from tornado import testing - -from opnfv_testapi.resources import models -from opnfv_testapi.tests.unit import fake_pymongo - - -class TestBase(testing.AsyncHTTPTestCase): - headers = {'Content-Type': 'application/json; charset=UTF-8'} - - def setUp(self): - self._patch_server() - self.basePath = '' - self.create_res = models.CreateResponse - self.get_res = None - self.list_res = None - self.update_res = None - self.req_d = None - self.req_e = None - self.addCleanup(self._clear) - super(TestBase, self).setUp() - - def tearDown(self): - self.db_patcher.stop() - self.config_patcher.stop() - - def _patch_server(self): - import argparse - config = path.join(path.dirname(__file__), '../common/normal.ini') - self.config_patcher = mock.patch( - 'argparse.ArgumentParser.parse_known_args', - return_value=(argparse.Namespace(config_file=config), None)) - self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', - fake_pymongo) - self.config_patcher.start() - self.db_patcher.start() - - def set_config_file(self): - self.config_file = 'normal.ini' - - def get_app(self): - from opnfv_testapi.cmd import server - return server.make_app() - - def create_d(self, *args): - return self.create(self.req_d, *args) - - def create_e(self, *args): - return self.create(self.req_e, *args) - - def create(self, req=None, *args): - return self.create_help(self.basePath, req, *args) - - def create_help(self, uri, req, *args): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(self._update_uri(uri, *args), - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - def get(self, *args): - res = self.fetch(self._get_uri(*args), - method='GET', - headers=self.headers) - - def inner(): - new_args, num = self._get_valid_args(*args) - return self.get_res \ - if num != self._need_arg_num(self.basePath) else self.list_res - return self._get_return(res, inner()) - - def query(self, query): - res = self.fetch(self._get_query_uri(query), - method='GET', - headers=self.headers) - return self._get_return(res, self.list_res) - - def update(self, new=None, *args): - if new: - new = new.format() - res = self.fetch(self._get_uri(*args), - method='PUT', - body=json.dumps(new), - headers=self.headers) - return self._get_return(res, self.update_res) - - def delete(self, *args): - res = self.fetch(self._get_uri(*args), - method='DELETE', - headers=self.headers) - return res.code, res.body - - @staticmethod - def _get_valid_args(*args): - new_args = tuple(['%s' % arg for arg in args if arg is not None]) - return new_args, len(new_args) - - def _need_arg_num(self, uri): - return uri.count('%s') - - def _get_query_uri(self, query): - return self.basePath + '?' + query if query else self.basePath - - def _get_uri(self, *args): - return self._update_uri(self.basePath, *args) - - def _update_uri(self, uri, *args): - r_uri = uri - new_args, num = self._get_valid_args(*args) - if num != self._need_arg_num(uri): - r_uri += '/%s' - - return r_uri % tuple(['%s' % arg for arg in new_args]) - - def _get_return(self, res, cls): - code = res.code - body = res.body - return code, self._get_return_body(code, body, cls) - - @staticmethod - def _get_return_body(code, body, cls): - return cls.from_dict(json.loads(body)) if code < 300 and cls else body - - def assert_href(self, body): - self.assertIn(self.basePath, body.href) - - def assert_create_body(self, body, req=None, *args): - import inspect - if not req: - req = self.req_d - resource_name = '' - if inspect.isclass(req): - resource_name = req.name - elif isinstance(req, dict): - resource_name = req['name'] - elif isinstance(req, str): - resource_name = json.loads(req)['name'] - new_args = args + tuple([resource_name]) - self.assertIn(self._get_uri(*new_args), body.href) - - @staticmethod - def _clear(): - fake_pymongo.pods.clear() - fake_pymongo.projects.clear() - fake_pymongo.testcases.clear() - fake_pymongo.results.clear() - fake_pymongo.scenarios.clear() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py deleted file mode 100644 index 1ebc96f3..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py +++ /dev/null @@ -1,123 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import unittest - -from tornado import gen -from tornado import testing -from tornado import web - -from opnfv_testapi.tests.unit import fake_pymongo - - -class MyTest(testing.AsyncHTTPTestCase): - def setUp(self): - super(MyTest, self).setUp() - self.db = fake_pymongo - self.addCleanup(self._clear) - self.io_loop.run_sync(self.fixture_setup) - - def get_app(self): - return web.Application() - - @gen.coroutine - def fixture_setup(self): - self.test1 = {'_id': '1', 'name': 'test1'} - self.test2 = {'name': 'test2'} - yield self.db.pods.insert({'_id': '1', 'name': 'test1'}) - yield self.db.pods.insert({'name': 'test2'}) - - @testing.gen_test - def test_find_one(self): - user = yield self.db.pods.find_one({'name': 'test1'}) - self.assertEqual(user, self.test1) - self.db.pods.remove() - - @testing.gen_test - def test_find(self): - cursor = self.db.pods.find() - names = [] - while (yield cursor.fetch_next): - ob = cursor.next_object() - names.append(ob.get('name')) - self.assertItemsEqual(names, ['test1', 'test2']) - - @testing.gen_test - def test_update(self): - yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertEqual(user.get('name', None), 'new_test1') - - def test_update_dot_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - 'key 1. name must not contain .') - - def test_update_dot_no_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - None, - check_keys=False) - - def test_update_dollar_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - 'key $name must not start with $') - - def test_update_dollar_no_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - None, - check_keys=False) - - @testing.gen_test - def test_remove(self): - yield self.db.pods.remove({'_id': '1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertIsNone(user) - - def test_insert_dot_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - 'key 2. name must not contain .') - - def test_insert_dot_no_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - None, - check_keys=False) - - def test_insert_dollar_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - 'key $name must not start with $') - - def test_insert_dollar_no_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - None, - check_keys=False) - - def _clear(self): - self.db.pods.clear() - - def _update_assert(self, docs, error=None, **kwargs): - self._db_assert('update', error, {'_id': '1'}, docs, **kwargs) - - def _insert_assert(self, docs, error=None, **kwargs): - self._db_assert('insert', error, docs, **kwargs) - - @testing.gen_test - def _db_assert(self, method, error, *args, **kwargs): - name_error = None - try: - yield self._eval_pods_db(method, *args, **kwargs) - except NameError as err: - name_error = err.args[0] - finally: - self.assertEqual(name_error, error) - - def _eval_pods_db(self, method, *args, **kwargs): - table_obj = vars(self.db)['pods'] - return table_obj.__getattribute__(method)(*args, **kwargs) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_pod.py b/cvp/opnfv_testapi/tests/unit/resources/test_pod.py deleted file mode 100644 index cb4f1d92..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_pod.py +++ /dev/null @@ -1,90 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import pod_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestPodBase(base.TestBase): - def setUp(self): - super(TestPodBase, self).setUp() - self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual', - 'zte pod 1', 'ci-pod') - self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2') - self.get_res = pod_models.Pod - self.list_res = pod_models.Pods - self.basePath = '/api/v1/pods' - - def assert_get_body(self, pod, req=None): - if not req: - req = self.req_d - self.assertEqual(pod.name, req.name) - self.assertEqual(pod.mode, req.mode) - self.assertEqual(pod.details, req.details) - self.assertEqual(pod.role, req.role) - self.assertIsNotNone(pod.creation_date) - self.assertIsNotNone(pod._id) - - -class TestPodCreate(TestPodBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return pod_models.PodCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return pod_models.PodCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestPodGet(TestPodBase): - def setUp(self): - super(TestPodGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_get_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - self.assertEqual(len(body.pods), 2) - for pod in body.pods: - if self.req_d.name == pod.name: - self.assert_get_body(pod) - else: - self.assert_get_body(pod, self.req_e) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_project.py b/cvp/opnfv_testapi/tests/unit/resources/test_project.py deleted file mode 100644 index 0622ba8d..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_project.py +++ /dev/null @@ -1,137 +0,0 @@ -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestProjectBase(base.TestBase): - def setUp(self): - super(TestProjectBase, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping', - 'vping-ssh test') - self.req_e = project_models.ProjectCreateRequest('doctor', - 'doctor test') - self.get_res = project_models.Project - self.list_res = project_models.Projects - self.update_res = project_models.Project - self.basePath = '/api/v1/projects' - - def assert_body(self, project, req=None): - if not req: - req = self.req_d - self.assertEqual(project.name, req.name) - self.assertEqual(project.description, req.description) - self.assertIsNotNone(project._id) - self.assertIsNotNone(project.creation_date) - - -class TestProjectCreate(TestProjectBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return project_models.ProjectCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return project_models.ProjectCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestProjectGet(TestProjectBase): - def setUp(self): - super(TestProjectGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - for project in body.projects: - if self.req_d.name == project.name: - self.assert_body(project) - else: - self.assert_body(project, self.req_e) - - -class TestProjectUpdate(TestProjectBase): - def setUp(self): - super(TestProjectUpdate, self).setUp() - _, d_body = self.create_d() - _, get_res = self.get(self.req_d.name) - self.index_d = get_res._id - self.create_e() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.req_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - return self.req_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.req_d, self.req_d.name - - @executor.update(httplib.OK, '_assert_update') - def test_success(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _assert_update(self, req, body): - self.assertEqual(self.index_d, body._id) - self.assert_body(body, req) - _, new_body = self.get(req.name) - self.assertEqual(self.index_d, new_body._id) - self.assert_body(new_body, req) - - -class TestProjectDelete(TestProjectBase): - def setUp(self): - super(TestProjectDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_assert_delete') - def test_success(self): - return self.req_d.name - - def _assert_delete(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_result.py b/cvp/opnfv_testapi/tests/unit/resources/test_result.py deleted file mode 100644 index 1e83ed30..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_result.py +++ /dev/null @@ -1,410 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -import httplib -import unittest -from datetime import datetime, timedelta -import json - -from opnfv_testapi.common import message -from opnfv_testapi.resources import pod_models -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import result_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class Details(object): - def __init__(self, timestart=None, duration=None, status=None): - self.timestart = timestart - self.duration = duration - self.status = status - self.items = [{'item1': 1}, {'item2': 2}] - - def format(self): - return { - "timestart": self.timestart, - "duration": self.duration, - "status": self.status, - 'items': [{'item1': 1}, {'item2': 2}] - } - - @staticmethod - def from_dict(a_dict): - - if a_dict is None: - return None - - t = Details() - t.timestart = a_dict.get('timestart') - t.duration = a_dict.get('duration') - t.status = a_dict.get('status') - t.items = a_dict.get('items') - return t - - -class TestResultBase(base.TestBase): - def setUp(self): - self.pod = 'zte-pod1' - self.project = 'functest' - self.case = 'vPing' - self.installer = 'fuel' - self.version = 'C' - self.build_tag = 'v3.0' - self.scenario = 'odl-l2' - self.criteria = 'passed' - self.trust_indicator = result_models.TI(0.7) - self.start_date = str(datetime.now()) - self.stop_date = str(datetime.now() + timedelta(minutes=1)) - self.update_date = str(datetime.now() + timedelta(days=1)) - self.update_step = -0.05 - super(TestResultBase, self).setUp() - self.details = Details(timestart='0', duration='9s', status='OK') - self.req_d = result_models.ResultCreateRequest( - pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria, - trust_indicator=self.trust_indicator) - self.get_res = result_models.TestResult - self.list_res = result_models.TestResults - self.update_res = result_models.TestResult - self.basePath = '/api/v1/results' - self.req_pod = pod_models.PodCreateRequest( - self.pod, - 'metal', - 'zte pod 1') - self.req_project = project_models.ProjectCreateRequest( - self.project, - 'vping test') - self.req_testcase = testcase_models.TestcaseCreateRequest( - self.case, - '/cases/vping', - 'vping-ssh test') - self.create_help('/api/v1/pods', self.req_pod) - self.create_help('/api/v1/projects', self.req_project) - self.create_help('/api/v1/projects/%s/cases', - self.req_testcase, - self.project) - - def assert_res(self, result, req=None): - if req is None: - req = self.req_d - self.assertEqual(result.pod_name, req.pod_name) - self.assertEqual(result.project_name, req.project_name) - self.assertEqual(result.case_name, req.case_name) - self.assertEqual(result.installer, req.installer) - self.assertEqual(result.version, req.version) - details_req = Details.from_dict(req.details) - details_res = Details.from_dict(result.details) - self.assertEqual(details_res.duration, details_req.duration) - self.assertEqual(details_res.timestart, details_req.timestart) - self.assertEqual(details_res.status, details_req.status) - self.assertEqual(details_res.items, details_req.items) - self.assertEqual(result.build_tag, req.build_tag) - self.assertEqual(result.scenario, req.scenario) - self.assertEqual(result.criteria, req.criteria) - self.assertEqual(result.start_date, req.start_date) - self.assertEqual(result.stop_date, req.stop_date) - self.assertIsNotNone(result._id) - ti = result.trust_indicator - self.assertEqual(ti.current, req.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - def _create_d(self): - _, res = self.create_d() - return res.href.split('/')[-1] - - def upload(self, req): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(self.basePath + '/upload', - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - -class TestResultUpload(TestResultBase): - @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) - def test_filenotfind(self): - return None - - -class TestResultCreate(TestResultBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_nobody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) - def test_podNotProvided(self): - req = self.req_d - req.pod_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) - def test_projectNotProvided(self): - req = self.req_d - req.project_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) - def test_testcaseNotProvided(self): - req = self.req_d - req.case_name = None - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noPod(self): - req = self.req_d - req.pod_name = 'notExistPod' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - req = self.req_d - req.project_name = 'notExistProject' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noTestcase(self): - req = self.req_d - req.case_name = 'notExistTestcase' - return req - - @executor.create(httplib.OK, 'assert_href') - def test_success(self): - return self.req_d - - @executor.create(httplib.OK, 'assert_href') - def test_key_with_doc(self): - req = copy.deepcopy(self.req_d) - req.details = {'1.name': 'dot_name'} - return req - - @executor.create(httplib.OK, '_assert_no_ti') - def test_no_ti(self): - req = result_models.ResultCreateRequest(pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria) - self.actual_req = req - return req - - def _assert_no_ti(self, body): - _id = body.href.split('/')[-1] - code, body = self.get(_id) - self.assert_res(body, self.actual_req) - - -class TestResultGet(TestResultBase): - def setUp(self): - super(TestResultGet, self).setUp() - self.req_10d_before = self._create_changed_date(days=-10) - self.req_d_id = self._create_d() - self.req_10d_later = self._create_changed_date(days=10) - - @executor.get(httplib.OK, 'assert_res') - def test_getOne(self): - return self.req_d_id - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryPod(self): - return self._set_query('pod') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryProject(self): - return self._set_query('project') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTestcase(self): - return self._set_query('case') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryVersion(self): - return self._set_query('version') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryInstaller(self): - return self._set_query('installer') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryBuildTag(self): - return self._set_query('build_tag') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryScenario(self): - return self._set_query('scenario') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTrustIndicator(self): - return self._set_query('trust_indicator') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryCriteria(self): - return self._set_query('criteria') - - @executor.query(httplib.BAD_REQUEST, message.must_int('period')) - def test_queryPeriodNotInt(self): - return self._set_query('period=a') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_queryPeriodSuccess(self): - return self._set_query('period=5') - - @executor.query(httplib.BAD_REQUEST, message.must_int('last')) - def test_queryLastNotInt(self): - return self._set_query('last=a') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_queryLast(self): - return self._set_query('last=1') - - @executor.query(httplib.OK, '_query_success', 4) - def test_queryPublic(self): - self._create_public_data() - return self._set_query('') - - @executor.query(httplib.OK, '_query_success', 1) - def test_queryPrivate(self): - self._create_private_data() - return self._set_query('public=false') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_combination(self): - return self._set_query('pod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - 'period=5') - - @executor.query(httplib.OK, '_query_success', 0) - def test_notFound(self): - return self._set_query('pod=notExistPod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - 'period=1') - - @executor.query(httplib.OK, '_query_success', 1) - def test_filterErrorStartdate(self): - self._create_error_start_date(None) - self._create_error_start_date('None') - self._create_error_start_date('null') - self._create_error_start_date('') - return self._set_query('period=5') - - def _query_success(self, body, number): - self.assertEqual(number, len(body.results)) - - def _query_last_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_10d_later) - - def _query_period_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_d) - - def _create_error_start_date(self, start_date): - req = copy.deepcopy(self.req_d) - req.start_date = start_date - self.create(req) - return req - - def _create_changed_date(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.start_date = datetime.now() + timedelta(**kwargs) - req.stop_date = str(req.start_date + timedelta(minutes=10)) - req.start_date = str(req.start_date) - self.create(req) - return req - - def _create_public_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'true' - self.create(req) - return req - - def _create_private_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'false' - self.create(req) - return req - - def _set_query(self, *args): - def get_value(arg): - return self.__getattribute__(arg) \ - if arg != 'trust_indicator' else self.trust_indicator.current - uri = '' - for arg in args: - if arg: - if '=' in arg: - uri += arg + '&' - else: - uri += '{}={}&'.format(arg, get_value(arg)) - return uri[0: -1] - - -class TestResultUpdate(TestResultBase): - def setUp(self): - super(TestResultUpdate, self).setUp() - self.req_d_id = self._create_d() - - @executor.update(httplib.OK, '_assert_update_ti') - def test_success(self): - new_ti = copy.deepcopy(self.trust_indicator) - new_ti.current += self.update_step - new_ti.histories.append( - result_models.TIHistory(self.update_date, self.update_step)) - new_data = copy.deepcopy(self.req_d) - new_data.trust_indicator = new_ti - update = result_models.ResultUpdateRequest(trust_indicator=new_ti) - self.update_req = new_data - return update, self.req_d_id - - def _assert_update_ti(self, request, body): - ti = body.trust_indicator - self.assertEqual(ti.current, request.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py b/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py deleted file mode 100644 index bd720671..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py +++ /dev/null @@ -1,360 +0,0 @@ -import functools -import httplib -import json -import os -from copy import deepcopy -from datetime import datetime - -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.common import message -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - - scenario == models.Scenario.from_dict(req) - - @staticmethod - def _set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def _get_and_assert(self, name, req=None): - code, body = self.get(name) - self.assert_res(code, body, req) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self._get_and_assert(self.scenario_1, self.req_d) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self._set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self._set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self._set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self._set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - def test_queryCombination(self): - query = self._set_query('name=nosdn-nofeature-ha', - 'installer=apex', - 'version=master', - 'project=functest') - - self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def _execute(set_update): - @functools.wraps(set_update) - def magic(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - self._update_and_assert(update, scenario) - return magic - - def _update(expected): - def _update(set_update): - @functools.wraps(set_update) - def wrap(self): - update, scenario = set_update(self, deepcopy(self.req_d)) - code, body = self.update(update, self.scenario) - getattr(self, expected)(code, scenario) - return wrap - return _update - - @_update('_success') - def test_renameScenario(self, scenario): - new_name = 'nosdn-nofeature-noha' - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_forbidden') - def test_renameScenario_exist(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={'name': new_name}) - return update_req, scenario - - @_update('_bad_request') - def test_renameScenario_noName(self, scenario): - new_name = self.scenario_2 - scenario['name'] = new_name - update_req = models.ScenarioUpdateRequest(field='name', - op='update', - locate={}, - term={}) - return update_req, scenario - - @_execute - def test_addInstaller(self, scenario): - add = models.ScenarioInstaller(installer='daisy', versions=list()) - scenario['installers'].append(add.format()) - update = models.ScenarioUpdateRequest(field='installer', - op='add', - locate={}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteInstaller(self, scenario): - scenario['installers'] = filter(lambda f: f['installer'] != 'apex', - scenario['installers']) - - update = models.ScenarioUpdateRequest(field='installer', - op='delete', - locate={'installer': 'apex'}) - return update, scenario - - @_execute - def test_addVersion(self, scenario): - add = models.ScenarioVersion(version='danube', projects=list()) - scenario['installers'][0]['versions'].append(add.format()) - update = models.ScenarioUpdateRequest(field='version', - op='add', - locate={'installer': 'apex'}, - term=add.format()) - return update, scenario - - @_execute - def test_deleteVersion(self, scenario): - scenario['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - scenario['installers'][0]['versions']) - - update = models.ScenarioUpdateRequest(field='version', - op='delete', - locate={'installer': 'apex', - 'version': 'master'}) - return update, scenario - - @_execute - def test_changeOwner(self, scenario): - scenario['installers'][0]['versions'][0]['owner'] = 'lucy' - - update = models.ScenarioUpdateRequest(field='owner', - op='update', - locate={'installer': 'apex', - 'version': 'master'}, - term={'owner': 'lucy'}) - return update, scenario - - @_execute - def test_addProject(self, scenario): - add = models.ScenarioProject(project='qtip').format() - scenario['installers'][0]['versions'][0]['projects'].append(add) - update = models.ScenarioUpdateRequest(field='project', - op='add', - locate={'installer': 'apex', - 'version': 'master'}, - term=add) - return update, scenario - - @_execute - def test_deleteProject(self, scenario): - scenario['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - scenario['installers'][0]['versions'][0]['projects']) - - update = models.ScenarioUpdateRequest(field='project', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}) - return update, scenario - - @_execute - def test_addCustoms(self, scenario): - add = ['odl', 'parser', 'vping_ssh'] - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh'] - update = models.ScenarioUpdateRequest(field='customs', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add) - return update, scenario - - @_execute - def test_deleteCustoms(self, scenario): - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - update = models.ScenarioUpdateRequest(field='customs', - op='delete', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=['vping_ssh']) - return update, scenario - - @_execute - def test_addScore(self, scenario): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - update = models.ScenarioUpdateRequest(field='score', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - @_execute - def test_addTi(self, scenario): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = scenario['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - update = models.ScenarioUpdateRequest(field='trust_indicator', - op='add', - locate={ - 'installer': 'apex', - 'version': 'master', - 'project': 'functest'}, - term=add.format()) - return update, scenario - - def _update_and_assert(self, update_req, new_scenario, name=None): - code, _ = self.update(update_req, self.scenario) - self.assertEqual(code, httplib.OK) - self._get_and_assert(_none_default(name, self.scenario), - new_scenario) - - def _success(self, status, new_scenario): - self.assertEqual(status, httplib.OK) - self._get_and_assert(new_scenario.get('name'), new_scenario) - - def _forbidden(self, status, new_scenario): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status, new_scenario): - self.assertEqual(status, httplib.BAD_REQUEST) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -def _none_default(check, default): - return check if check else default diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py b/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py deleted file mode 100644 index 4f2bc2ad..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py +++ /dev/null @@ -1,201 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestCaseBase(base.TestBase): - def setUp(self): - super(TestCaseBase, self).setUp() - self.req_d = testcase_models.TestcaseCreateRequest('vping_1', - '/cases/vping_1', - 'vping-ssh test') - self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', - '/cases/doctor_1', - 'create doctor') - self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', - 'vping-ssh test', - 'functest') - self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', - 'create doctor', - 'functest') - self.get_res = testcase_models.Testcase - self.list_res = testcase_models.Testcases - self.update_res = testcase_models.Testcase - self.basePath = '/api/v1/projects/%s/cases' - self.create_project() - - def assert_body(self, case, req=None): - if not req: - req = self.req_d - self.assertEqual(case.name, req.name) - self.assertEqual(case.description, req.description) - self.assertEqual(case.url, req.url) - self.assertIsNotNone(case._id) - self.assertIsNotNone(case.creation_date) - - def assert_update_body(self, old, new, req=None): - if not req: - req = self.req_d - self.assertEqual(new.name, req.name) - self.assertEqual(new.description, req.description) - self.assertEqual(new.url, old.url) - self.assertIsNotNone(new._id) - self.assertIsNotNone(new.creation_date) - - def create_project(self): - req_p = project_models.ProjectCreateRequest('functest', - 'vping-ssh test') - self.create_help('/api/v1/projects', req_p) - self.project = req_p.name - - def create_d(self): - return super(TestCaseBase, self).create_d(self.project) - - def create_e(self): - return super(TestCaseBase, self).create_e(self.project) - - def get(self, case=None): - return super(TestCaseBase, self).get(self.project, case) - - def create(self, req=None, *args): - return super(TestCaseBase, self).create(req, self.project) - - def update(self, new=None, case=None): - return super(TestCaseBase, self).update(new, self.project, case) - - def delete(self, case): - return super(TestCaseBase, self).delete(self.project, case) - - -class TestCaseCreate(TestCaseBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - self.project = 'noProject' - return self.req_d - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - req_empty = testcase_models.TestcaseCreateRequest('') - return req_empty - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - req_none = testcase_models.TestcaseCreateRequest(None) - return req_none - - @executor.create(httplib.OK, '_assert_success') - def test_success(self): - return self.req_d - - def _assert_success(self, body): - self.assert_create_body(body, self.req_d, self.project) - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestCaseGet(TestCaseBase): - def setUp(self): - super(TestCaseGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_list') - def test_list(self): - return None - - def _list(self, body): - for case in body.testcases: - if self.req_d.name == case.name: - self.assert_body(case) - else: - self.assert_body(case, self.req_e) - - -class TestCaseUpdate(TestCaseBase): - def setUp(self): - super(TestCaseUpdate, self).setUp() - self.create_d() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.update_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - self.create_e() - return self.update_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.update_d, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_success(self): - return self.update_e, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_with_dollar(self): - update = copy.deepcopy(self.update_d) - update.description = {'2. change': 'dollar change'} - return update, self.req_d.name - - def _update_success(self, request, body): - self.assert_update_body(self.req_d, body, request) - _, new_body = self.get(request.name) - self.assert_update_body(self.req_d, new_body, request) - - -class TestCaseDelete(TestCaseBase): - def setUp(self): - super(TestCaseDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_delete_success') - def test_success(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_token.py b/cvp/opnfv_testapi/tests/unit/resources/test_token.py deleted file mode 100644 index 940e256c..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_token.py +++ /dev/null @@ -1,114 +0,0 @@ -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import httplib -import unittest - -from tornado import web - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestToken(base.TestBase): - def get_app(self): - from opnfv_testapi.router import url_mappings - return web.Application( - url_mappings.mappings, - db=fake_pymongo, - debug=True, - auth=True - ) - - -class TestTokenCreateProject(TestToken): - def setUp(self): - super(TestTokenCreateProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - - @executor.create(httplib.FORBIDDEN, message.invalid_token()) - def test_projectCreateTokenInvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d - - @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectCreateTokenUnauthorized(self): - if 'X-Auth-Token' in self.headers: - self.headers.pop('X-Auth-Token') - return self.req_d - - @executor.create(httplib.OK, '_create_success') - def test_projectCreateTokenSuccess(self): - self.headers['X-Auth-Token'] = '12345' - return self.req_d - - def _create_success(self, body): - self.assertIn('CreateResponse', str(type(body))) - - -class TestTokenDeleteProject(TestToken): - def setUp(self): - super(TestTokenDeleteProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - self.headers['X-Auth-Token'] = '12345' - self.create_d() - - @executor.delete(httplib.FORBIDDEN, message.invalid_token()) - def test_projectDeleteTokenIvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d.name - - @executor.delete(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectDeleteTokenUnauthorized(self): - self.headers.pop('X-Auth-Token') - return self.req_d.name - - @executor.delete(httplib.OK, '_delete_success') - def test_projectDeleteTokenSuccess(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual('', body) - - -class TestTokenUpdateProject(TestToken): - def setUp(self): - super(TestTokenUpdateProject, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping') - fake_pymongo.tokens.insert({"access_token": "12345"}) - self.basePath = '/api/v1/projects' - self.headers['X-Auth-Token'] = '12345' - self.create_d() - - @executor.update(httplib.FORBIDDEN, message.invalid_token()) - def test_projectUpdateTokenIvalid(self): - self.headers['X-Auth-Token'] = '1234' - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - @executor.update(httplib.UNAUTHORIZED, message.unauthorized()) - def test_projectUpdateTokenUnauthorized(self): - self.headers.pop('X-Auth-Token') - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_projectUpdateTokenSuccess(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _update_success(self, request, body): - self.assertIn(request.name, body) - - -if __name__ == '__main__': - unittest.main() diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_version.py b/cvp/opnfv_testapi/tests/unit/resources/test_version.py deleted file mode 100644 index 51fed11e..00000000 --- a/cvp/opnfv_testapi/tests/unit/resources/test_version.py +++ /dev/null @@ -1,36 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.resources import models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestVersionBase(base.TestBase): - def setUp(self): - super(TestVersionBase, self).setUp() - self.list_res = models.Versions - self.basePath = '/versions' - - -class TestVersion(TestVersionBase): - @executor.get(httplib.OK, '_get_success') - def test_success(self): - return None - - def _get_success(self, body): - self.assertEqual(len(body.versions), 1) - self.assertEqual(body.versions[0].version, 'v1.0') - self.assertEqual(body.versions[0].description, 'basics') - - -if __name__ == '__main__': - unittest.main() |