summaryrefslogtreecommitdiffstats
path: root/utils/test/testapi/opnfv_testapi/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit')
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py0
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py25
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/conftest.py8
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/executor.py130
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py291
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py0
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json38
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json73
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py204
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py118
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py137
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py413
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py449
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py201
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py52
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py36
17 files changed, 0 insertions, 2184 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py
deleted file mode 100644
index 3fc79f1d5..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-__author__ = 'serena'
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/common/__init__.py
+++ /dev/null
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py b/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py
deleted file mode 100644
index 6d160ce1d..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/common/test_config.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import argparse
-import pytest
-
-
-def test_config_normal(mocker, config_normal):
- mocker.patch(
- 'argparse.ArgumentParser.parse_known_args',
- return_value=(argparse.Namespace(config_file=config_normal), None))
- from opnfv_testapi.common import config
- CONF = config.Config()
- assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/'
- assert CONF.mongo_dbname == 'test_results_collection'
- assert CONF.api_port == 8000
- assert CONF.api_debug is True
- assert CONF.api_token_check is False
- assert CONF.api_authenticate is True
- assert CONF.ui_url == 'http://localhost:8000'
-
-
-def test_config_file_not_exist(mocker):
- mocker.patch('os.path.exists', return_value=False)
- with pytest.raises(Exception) as m_exc:
- from opnfv_testapi.common import config
- config.Config()
- assert 'not found' in str(m_exc.value)
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py b/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py
deleted file mode 100644
index 75e621d0e..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/conftest.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from os import path
-
-import pytest
-
-
-@pytest.fixture
-def config_normal():
- return path.join(path.dirname(__file__), '../../../etc/config.ini')
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py
deleted file mode 100644
index 743c07615..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py
+++ /dev/null
@@ -1,130 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import functools
-import httplib
-
-from concurrent.futures import ThreadPoolExecutor
-import mock
-
-
-O_get_secure_cookie = (
- 'opnfv_testapi.handlers.base_handlers.GenericApiHandler.get_secure_cookie')
-
-
-def thread_execute(method, *args, **kwargs):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(method, *args, **kwargs)
- return result
-
-
-def mock_invalid_lfid():
- def _mock_invalid_lfid(xstep):
- def wrap(self, *args, **kwargs):
- with mock.patch(O_get_secure_cookie) as m_cookie:
- m_cookie.return_value = 'InvalidUser'
- return xstep(self, *args, **kwargs)
- return wrap
- return _mock_invalid_lfid
-
-
-def mock_valid_lfid():
- def _mock_valid_lfid(xstep):
- def wrap(self, *args, **kwargs):
- with mock.patch(O_get_secure_cookie) as m_cookie:
- m_cookie.return_value = 'ValidUser'
- return xstep(self, *args, **kwargs)
- return wrap
- return _mock_valid_lfid
-
-
-def upload(excepted_status, excepted_response):
- def _upload(create_request):
- @functools.wraps(create_request)
- def wrap(self):
- request = create_request(self)
- status, body = self.upload(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _upload
-
-
-def create(excepted_status, excepted_response):
- def _create(create_request):
- @functools.wraps(create_request)
- def wrap(self):
- request = create_request(self)
- status, body = self.create(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _create
-
-
-def get(excepted_status, excepted_response):
- def _get(get_request):
- @functools.wraps(get_request)
- def wrap(self):
- request = get_request(self)
- status, body = self.get(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _get
-
-
-def update(excepted_status, excepted_response):
- def _update(update_request):
- @functools.wraps(update_request)
- def wrap(self):
- request, resource = update_request(self)
- status, body = self.update(request, resource)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(request, body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _update
-
-
-def delete(excepted_status, excepted_response):
- def _delete(delete_request):
- @functools.wraps(delete_request)
- def wrap(self):
- request = delete_request(self)
- if isinstance(request, tuple):
- status, body = self.delete(request[0], *(request[1]))
- else:
- status, body = self.delete(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _delete
-
-
-def query(excepted_status, excepted_response, number=0):
- def _query(get_request):
- @functools.wraps(get_request)
- def wrap(self):
- request = get_request(self)
- status, body = self.query(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body, number)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _query
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
deleted file mode 100644
index c44a92c11..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
+++ /dev/null
@@ -1,291 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import re
-
-from operator import itemgetter
-
-from bson.objectid import ObjectId
-from concurrent.futures import ThreadPoolExecutor
-
-
-def thread_execute(method, *args, **kwargs):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(method, *args, **kwargs)
- return result
-
-
-class MemCursor(object):
- def __init__(self, collection):
- self.collection = collection
- self.length = len(self.collection)
- self.sorted = []
-
- def _is_next_exist(self):
- return self.length != 0
-
- @property
- def fetch_next(self):
- return thread_execute(self._is_next_exist)
-
- def next_object(self):
- self.length -= 1
- return self.collection.pop()
-
- def sort(self, key_or_list):
- for k, v in key_or_list.iteritems():
- if v == -1:
- reverse = True
- else:
- reverse = False
-
- self.collection = sorted(self.collection,
- key=itemgetter(k), reverse=reverse)
- return self
-
- def limit(self, limit):
- if limit != 0 and limit < len(self.collection):
- self.collection = self.collection[0: limit]
- self.length = limit
- return self
-
- def skip(self, skip):
- if skip < self.length and (skip > 0):
- self.collection = self.collection[self.length - skip: -1]
- self.length -= skip
- elif skip >= self.length:
- self.collection = []
- self.length = 0
- return self
-
- def _count(self):
- return self.length
-
- def count(self):
- return thread_execute(self._count)
-
-
-class MemDb(object):
-
- def __init__(self, name):
- self.name = name
- self.contents = []
- pass
-
- def _find_one(self, spec_or_id=None, *args):
- if spec_or_id is not None and not isinstance(spec_or_id, dict):
- spec_or_id = {"_id": spec_or_id}
- if '_id' in spec_or_id:
- spec_or_id['_id'] = str(spec_or_id['_id'])
- cursor = self._find(spec_or_id, *args)
- for result in cursor:
- return result
- return None
-
- def find_one(self, spec_or_id=None, *args):
- return thread_execute(self._find_one, spec_or_id, *args)
-
- def _insert(self, doc_or_docs, check_keys=True):
-
- docs = doc_or_docs
- return_one = False
- if isinstance(docs, dict):
- return_one = True
- docs = [docs]
-
- if check_keys:
- for doc in docs:
- self._check_keys(doc)
-
- ids = []
- for doc in docs:
- if '_id' not in doc:
- doc['_id'] = str(ObjectId())
- if not self._find_one(doc['_id']):
- ids.append(doc['_id'])
- self.contents.append(doc_or_docs)
-
- if len(ids) == 0:
- return None
- if return_one:
- return ids[0]
- else:
- return ids
-
- def insert(self, doc_or_docs, check_keys=True):
- return thread_execute(self._insert, doc_or_docs, check_keys)
-
- @staticmethod
- def _compare_date(spec, value):
- gte = True
- lt = False
- for k, v in spec.iteritems():
- if k == '$gte' and value < v:
- gte = False
- elif k == '$lt' and value < v:
- lt = True
- return gte and lt
-
- def _in(self, content, *args):
- if self.name == 'scenarios':
- return self._in_scenarios(content, *args)
- else:
- return self._in_others(content, *args)
-
- def _in_scenarios_installer(self, installer, content):
- hit = False
- for s_installer in content['installers']:
- if installer == s_installer['installer']:
- hit = True
-
- return hit
-
- def _in_scenarios_version(self, version, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- if version == s_version['version']:
- hit = True
- return hit
-
- def _in_scenarios_project(self, project, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- for s_project in s_version['projects']:
- if project == s_project['project']:
- hit = True
-
- return hit
-
- def _in_scenarios(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'installers':
- for inner in v.values():
- for i_k, i_v in inner.iteritems():
- if i_k == 'installer':
- return self._in_scenarios_installer(i_v,
- content)
- elif i_k == 'versions.version':
- return self._in_scenarios_version(i_v,
- content)
- elif i_k == 'versions.projects.project':
- return self._in_scenarios_project(i_v,
- content)
- elif content.get(k, None) != v:
- return False
-
- return True
-
- def _in_others(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'start_date':
- if not MemDb._compare_date(v, content.get(k)):
- return False
- elif k == 'trust_indicator.current':
- if content.get('trust_indicator').get('current') != v:
- return False
- elif not isinstance(v, dict):
- if isinstance(v, re._pattern_type):
- if v.match(content.get(k, None)) is None:
- return False
- else:
- if content.get(k, None) != v:
- return False
- return True
-
- def _find(self, *args):
- res = []
- for content in self.contents:
- if self._in(content, *args):
- res.append(content)
- return res
-
- def find(self, *args):
- return MemCursor(self._find(*args))
-
- def _aggregate(self, *args, **kwargs):
- res = self.contents
- print args
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$match':
- res = self._find(v)
- cursor = MemCursor(res)
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$sort':
- cursor = cursor.sort(v)
- elif k == '$skip':
- cursor = cursor.skip(v)
- elif k == '$limit':
- cursor = cursor.limit(v)
- return cursor
-
- def aggregate(self, *args, **kwargs):
- return self._aggregate(*args, **kwargs)
-
- def _update(self, spec, document, check_keys=True):
- updated = False
-
- if check_keys:
- self._check_keys(document)
-
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec):
- for k, v in document.iteritems():
- updated = True
- content[k] = v
- self.contents[index] = content
- return updated
-
- def update(self, spec, document, check_keys=True):
- return thread_execute(self._update, spec, document, check_keys)
-
- def _remove(self, spec_or_id=None):
- if spec_or_id is None:
- self.contents = []
- if not isinstance(spec_or_id, dict):
- spec_or_id = {'_id': spec_or_id}
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec_or_id):
- del self.contents[index]
- return True
- return False
-
- def remove(self, spec_or_id=None):
- return thread_execute(self._remove, spec_or_id)
-
- def clear(self):
- self._remove()
-
- def _check_keys(self, doc):
- for key in doc.keys():
- if '.' in key:
- raise NameError('key {} must not contain .'.format(key))
- if key.startswith('$'):
- raise NameError('key {} must not start with $'.format(key))
- if isinstance(doc.get(key), dict):
- self._check_keys(doc.get(key))
-
-
-def __getattr__(name):
- return globals()[name]
-
-
-pods = MemDb('pods')
-projects = MemDb('projects')
-testcases = MemDb('testcases')
-results = MemDb('results')
-scenarios = MemDb('scenarios')
-tokens = MemDb('tokens')
-users = MemDb('users')
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/__init__.py
+++ /dev/null
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json
deleted file mode 100644
index 187802215..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
- "name": "nosdn-nofeature-ha",
- "installers":
- [
- {
- "installer": "apex",
- "versions":
- [
- {
- "owner": "Luke",
- "version": "master",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores":
- [
- {
- "date": "2017-01-08 22:46:44",
- "score": "12/14"
- }
-
- ],
- "trust_indicators": []
- },
- {
- "project": "yardstick",
- "customs": [],
- "scores": [],
- "trust_indicators": []
- }
- ]
- }
- ]
- }
- ]
-}
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json
deleted file mode 100644
index 980051c4f..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json
+++ /dev/null
@@ -1,73 +0,0 @@
-{
- "name": "odl_2-nofeature-ha",
- "installers":
- [
- {
- "installer": "fuel",
- "versions":
- [
- {
- "owner": "Lucky",
- "version": "danube",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores": [],
- "trust_indicators": [
- {
- "date": "2017-01-18 22:46:44",
- "status": "silver"
- }
-
- ]
- },
- {
- "project": "yardstick",
- "customs": ["suite-a"],
- "scores": [
- {
- "date": "2017-01-08 22:46:44",
- "score": "0/1"
- }
- ],
- "trust_indicators": [
- {
- "date": "2017-01-18 22:46:44",
- "status": "gold"
- }
- ]
- }
- ]
- },
- {
- "owner": "Luke",
- "version": "colorado",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores":
- [
- {
- "date": "2017-01-09 22:46:44",
- "score": "11/14"
- }
-
- ],
- "trust_indicators": []
- },
- {
- "project": "yardstick",
- "customs": [],
- "scores": [],
- "trust_indicators": []
- }
- ]
- }
- ]
- }
- ]
-}
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py
deleted file mode 100644
index b7fabb994..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_base.py
+++ /dev/null
@@ -1,204 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from datetime import datetime
-import json
-from os import path
-
-from bson.objectid import ObjectId
-import mock
-from tornado import testing
-
-from opnfv_testapi.models import base_models
-from opnfv_testapi.models import pod_models
-from opnfv_testapi.tests.unit import fake_pymongo
-
-
-class TestBase(testing.AsyncHTTPTestCase):
- headers = {'Content-Type': 'application/json; charset=UTF-8'}
-
- def setUp(self):
- self._patch_server()
- self.basePath = ''
- self.create_res = base_models.CreateResponse
- self.get_res = None
- self.list_res = None
- self.update_res = None
- self.pod_d = pod_models.Pod(name='zte-pod1',
- mode='virtual',
- details='zte pod 1',
- role='community-ci',
- _id=str(ObjectId()),
- owner='ValidUser',
- create_date=str(datetime.now()))
- self.pod_e = pod_models.Pod(name='zte-pod2',
- mode='metal',
- details='zte pod 2',
- role='production-ci',
- _id=str(ObjectId()),
- owner='ValidUser',
- create_date=str(datetime.now()))
- self.req_d = None
- self.req_e = None
- self.addCleanup(self._clear)
- super(TestBase, self).setUp()
- fake_pymongo.users.insert({"user": "ValidUser",
- 'email': 'validuser@lf.com',
- 'fullname': 'Valid User',
- 'groups': [
- 'opnfv-testapi-users',
- 'opnfv-gerrit-functest-submitters',
- 'opnfv-gerrit-qtip-contributors']
- })
-
- def tearDown(self):
- self.db_patcher.stop()
- self.config_patcher.stop()
-
- def _patch_server(self):
- import argparse
- config = path.join(path.dirname(__file__),
- '../../../../etc/config.ini')
- self.config_patcher = mock.patch(
- 'argparse.ArgumentParser.parse_known_args',
- return_value=(argparse.Namespace(config_file=config), None))
- self.db_patcher = mock.patch('opnfv_testapi.db.api.DB',
- fake_pymongo)
- self.config_patcher.start()
- self.db_patcher.start()
-
- def get_app(self):
- from opnfv_testapi.cmd import server
- return server.make_app()
-
- def create_d(self, *args):
- return self.create(self.req_d, *args)
-
- def create_e(self, *args):
- return self.create(self.req_e, *args)
-
- def create(self, req=None, *args):
- return self.create_help(self.basePath, req, *args)
-
- def create_help(self, uri, req, *args):
- return self.post_direct_url(self._update_uri(uri, *args), req)
-
- def post_direct_url(self, url, req):
- if req and not isinstance(req, str) and hasattr(req, 'format'):
- req = req.format()
- res = self.fetch(url,
- method='POST',
- body=json.dumps(req),
- headers=self.headers)
-
- return self._get_return(res, self.create_res)
-
- def get(self, *args):
- res = self.fetch(self._get_uri(*args),
- method='GET',
- headers=self.headers)
-
- def inner():
- new_args, num = self._get_valid_args(*args)
- return self.get_res \
- if num != self._need_arg_num(self.basePath) else self.list_res
- return self._get_return(res, inner())
-
- def query(self, query):
- res = self.fetch(self._get_query_uri(query),
- method='GET',
- headers=self.headers)
- return self._get_return(res, self.list_res)
-
- def update_direct_url(self, url, new=None):
- if new and hasattr(new, 'format'):
- new = new.format()
- res = self.fetch(url,
- method='PUT',
- body=json.dumps(new),
- headers=self.headers)
- return self._get_return(res, self.update_res)
-
- def update(self, new=None, *args):
- return self.update_direct_url(self._get_uri(*args), new)
-
- def delete_direct_url(self, url, body):
- if body:
- res = self.fetch(url,
- method='DELETE',
- body=json.dumps(body),
- headers=self.headers,
- allow_nonstandard_methods=True)
- else:
- res = self.fetch(url,
- method='DELETE',
- headers=self.headers)
-
- return res.code, res.body
-
- def delete(self, *args):
- return self.delete_direct_url(self._get_uri(*args), None)
-
- @staticmethod
- def _get_valid_args(*args):
- new_args = tuple(['%s' % arg for arg in args if arg is not None])
- return new_args, len(new_args)
-
- def _need_arg_num(self, uri):
- return uri.count('%s')
-
- def _get_query_uri(self, query):
- return self.basePath + '?' + query if query else self.basePath
-
- def _get_uri(self, *args):
- return self._update_uri(self.basePath, *args)
-
- def _update_uri(self, uri, *args):
- r_uri = uri
- new_args, num = self._get_valid_args(*args)
- if num != self._need_arg_num(uri):
- r_uri += '/%s'
-
- return r_uri % tuple(['%s' % arg for arg in new_args])
-
- def _get_return(self, res, cls):
- code = res.code
- body = res.body
- if body:
- return code, self._get_return_body(code, body, cls)
- else:
- return code, None
-
- @staticmethod
- def _get_return_body(code, body, cls):
- return cls.from_dict(json.loads(body)) if code < 300 and cls else body
-
- def assert_href(self, body):
- self.assertIn(self.basePath, body.href)
-
- def assert_create_body(self, body, req=None, *args):
- import inspect
- if not req:
- req = self.req_d
- resource_name = ''
- if inspect.isclass(req):
- resource_name = req.name
- elif isinstance(req, dict):
- resource_name = req['name']
- elif isinstance(req, str):
- resource_name = json.loads(req)['name']
- new_args = args + tuple([resource_name])
- self.assertIn(self._get_uri(*new_args), body.href)
-
- @staticmethod
- def _clear():
- fake_pymongo.pods.clear()
- fake_pymongo.projects.clear()
- fake_pymongo.testcases.clear()
- fake_pymongo.results.clear()
- fake_pymongo.scenarios.clear()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py
deleted file mode 100644
index 95ed8bac1..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py
+++ /dev/null
@@ -1,118 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.models import pod_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit import fake_pymongo
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-class TestPodBase(base.TestBase):
- def setUp(self):
- super(TestPodBase, self).setUp()
- self.get_res = pod_models.Pod
- self.list_res = pod_models.Pods
- self.basePath = '/api/v1/pods'
- self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name,
- mode=self.pod_d.mode,
- details=self.pod_d.details,
- role=self.pod_d.role)
- self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name,
- mode=self.pod_e.mode,
- details=self.pod_e.details,
- role=self.pod_e.role)
-
- def assert_get_body(self, pod, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(pod.owner, 'ValidUser')
- self.assertEqual(pod.name, req.name)
- self.assertEqual(pod.mode, req.mode)
- self.assertEqual(pod.details, req.details)
- self.assertEqual(pod.role, req.role)
- self.assertIsNotNone(pod.creation_date)
- self.assertIsNotNone(pod._id)
-
-
-class TestPodCreate(TestPodBase):
- @executor.create(httplib.BAD_REQUEST, message.not_login())
- def test_notlogin(self):
- return self.req_d
-
- @executor.mock_invalid_lfid()
- @executor.create(httplib.BAD_REQUEST, message.not_lfid())
- def test_invalidLfid(self):
- return self.req_d
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- return pod_models.PodCreateRequest('')
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- return pod_models.PodCreateRequest(None)
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.OK, 'assert_create_body')
- def test_success(self):
- return self.req_d
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- fake_pymongo.pods.insert(self.pod_d.format())
- return self.req_d
-
- @executor.mock_valid_lfid()
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExistCaseInsensitive(self):
- fake_pymongo.pods.insert(self.pod_d.format())
- self.req_d.name = self.req_d.name.upper()
- return self.req_d
-
-
-class TestPodGet(TestPodBase):
- def setUp(self):
- super(TestPodGet, self).setUp()
- fake_pymongo.pods.insert(self.pod_d.format())
- fake_pymongo.pods.insert(self.pod_e.format())
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_get_body')
- def test_getOne(self):
- return self.pod_d.name
-
- @executor.get(httplib.OK, '_assert_list')
- def test_list(self):
- return None
-
- def _assert_list(self, body):
- self.assertEqual(len(body.pods), 2)
- for pod in body.pods:
- if self.pod_d.name == pod.name:
- self.assert_get_body(pod)
- else:
- self.assert_get_body(pod, self.pod_e)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py
deleted file mode 100644
index 939cc0d07..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_project.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.models import project_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-class TestProjectBase(base.TestBase):
- def setUp(self):
- super(TestProjectBase, self).setUp()
- self.req_d = project_models.ProjectCreateRequest('vping',
- 'vping-ssh test')
- self.req_e = project_models.ProjectCreateRequest('doctor',
- 'doctor test')
- self.get_res = project_models.Project
- self.list_res = project_models.Projects
- self.update_res = project_models.Project
- self.basePath = '/api/v1/projects'
-
- def assert_body(self, project, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(project.name, req.name)
- self.assertEqual(project.description, req.description)
- self.assertIsNotNone(project._id)
- self.assertIsNotNone(project.creation_date)
-
-
-class TestProjectCreate(TestProjectBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- return project_models.ProjectCreateRequest('')
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- return project_models.ProjectCreateRequest(None)
-
- @executor.create(httplib.OK, 'assert_create_body')
- def test_success(self):
- return self.req_d
-
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- self.create_d()
- return self.req_d
-
-
-class TestProjectGet(TestProjectBase):
- def setUp(self):
- super(TestProjectGet, self).setUp()
- self.create_d()
- self.create_e()
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_body')
- def test_getOne(self):
- return self.req_d.name
-
- @executor.get(httplib.OK, '_assert_list')
- def test_list(self):
- return None
-
- def _assert_list(self, body):
- for project in body.projects:
- if self.req_d.name == project.name:
- self.assert_body(project)
- else:
- self.assert_body(project, self.req_e)
-
-
-class TestProjectUpdate(TestProjectBase):
- def setUp(self):
- super(TestProjectUpdate, self).setUp()
- _, d_body = self.create_d()
- _, get_res = self.get(self.req_d.name)
- self.index_d = get_res._id
- self.create_e()
-
- @executor.update(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None, 'noBody'
-
- @executor.update(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return self.req_e, 'notFound'
-
- @executor.update(httplib.FORBIDDEN, message.exist_base)
- def test_newNameExist(self):
- return self.req_e, self.req_d.name
-
- @executor.update(httplib.FORBIDDEN, message.no_update())
- def test_noUpdate(self):
- return self.req_d, self.req_d.name
-
- @executor.update(httplib.OK, '_assert_update')
- def test_success(self):
- req = project_models.ProjectUpdateRequest('newName', 'new description')
- return req, self.req_d.name
-
- def _assert_update(self, req, body):
- self.assertEqual(self.index_d, body._id)
- self.assert_body(body, req)
- _, new_body = self.get(req.name)
- self.assertEqual(self.index_d, new_body._id)
- self.assert_body(new_body, req)
-
-
-class TestProjectDelete(TestProjectBase):
- def setUp(self):
- super(TestProjectDelete, self).setUp()
- self.create_d()
-
- @executor.delete(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return 'notFound'
-
- @executor.delete(httplib.OK, '_assert_delete')
- def test_success(self):
- return self.req_d.name
-
- def _assert_delete(self, body):
- self.assertEqual(body, '')
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py
deleted file mode 100644
index b9f9ede26..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_result.py
+++ /dev/null
@@ -1,413 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import copy
-from datetime import datetime
-from datetime import timedelta
-import httplib
-import json
-import urllib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.models import project_models
-from opnfv_testapi.models import result_models
-from opnfv_testapi.models import testcase_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit import fake_pymongo
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-class Details(object):
- def __init__(self, timestart=None, duration=None, status=None):
- self.timestart = timestart
- self.duration = duration
- self.status = status
- self.items = [{'item1': 1}, {'item2': 2}]
-
- def format(self):
- return {
- "timestart": self.timestart,
- "duration": self.duration,
- "status": self.status,
- 'items': [{'item1': 1}, {'item2': 2}]
- }
-
- @staticmethod
- def from_dict(a_dict):
-
- if a_dict is None:
- return None
-
- t = Details()
- t.timestart = a_dict.get('timestart')
- t.duration = a_dict.get('duration')
- t.status = a_dict.get('status')
- t.items = a_dict.get('items')
- return t
-
-
-class TestResultBase(base.TestBase):
- def setUp(self):
- super(TestResultBase, self).setUp()
- self.pod = self.pod_d.name
- self.project = 'functest'
- self.case = 'vPing'
- self.installer = 'fuel'
- self.version = 'C'
- self.build_tag = 'v3.0'
- self.scenario = 'odl-l2'
- self.criteria = 'PASS'
- self.trust_indicator = result_models.TI(0.7)
- self.start_date = str(datetime.now())
- self.stop_date = str(datetime.now() + timedelta(minutes=1))
- self.update_date = str(datetime.now() + timedelta(days=1))
- self.update_step = -0.05
- self.details = Details(timestart='0', duration='9s', status='OK')
- self.req_d = result_models.ResultCreateRequest(
- pod_name=self.pod,
- project_name=self.project,
- case_name=self.case,
- installer=self.installer,
- version=self.version,
- start_date=self.start_date,
- stop_date=self.stop_date,
- details=self.details.format(),
- build_tag=self.build_tag,
- scenario=self.scenario,
- criteria=self.criteria,
- trust_indicator=self.trust_indicator)
- self.get_res = result_models.TestResult
- self.list_res = result_models.TestResults
- self.update_res = result_models.TestResult
- self.basePath = '/api/v1/results'
- self.req_project = project_models.ProjectCreateRequest(
- self.project,
- 'vping test')
- self.req_testcase = testcase_models.TestcaseCreateRequest(
- self.case,
- '/cases/vping',
- 'vping-ssh test')
- fake_pymongo.pods.insert(self.pod_d.format())
- self.create_help('/api/v1/projects', self.req_project)
- self.create_help('/api/v1/projects/%s/cases',
- self.req_testcase,
- self.project)
-
- def assert_res(self, result, req=None):
- if req is None:
- req = self.req_d
- self.assertEqual(result.pod_name, req.pod_name)
- self.assertEqual(result.project_name, req.project_name)
- self.assertEqual(result.case_name, req.case_name)
- self.assertEqual(result.installer, req.installer)
- self.assertEqual(result.version, req.version)
- details_req = Details.from_dict(req.details)
- details_res = Details.from_dict(result.details)
- self.assertEqual(details_res.duration, details_req.duration)
- self.assertEqual(details_res.timestart, details_req.timestart)
- self.assertEqual(details_res.status, details_req.status)
- self.assertEqual(details_res.items, details_req.items)
- self.assertEqual(result.build_tag, req.build_tag)
- self.assertEqual(result.scenario, req.scenario)
- self.assertEqual(result.criteria, req.criteria)
- self.assertEqual(result.start_date, req.start_date)
- self.assertEqual(result.stop_date, req.stop_date)
- self.assertIsNotNone(result._id)
- ti = result.trust_indicator
- self.assertEqual(ti.current, req.trust_indicator.current)
- if ti.histories:
- history = ti.histories[0]
- self.assertEqual(history.date, self.update_date)
- self.assertEqual(history.step, self.update_step)
-
- def _create_d(self):
- _, res = self.create_d()
- return res.href.split('/')[-1]
-
- def upload(self, req):
- if req and not isinstance(req, str) and hasattr(req, 'format'):
- req = req.format()
- res = self.fetch(self.basePath + '/upload',
- method='POST',
- body=json.dumps(req),
- headers=self.headers)
-
- return self._get_return(res, self.create_res)
-
-
-class TestResultUpload(TestResultBase):
- @executor.upload(httplib.BAD_REQUEST, message.key_error('file'))
- def test_filenotfind(self):
- return None
-
-
-class TestResultCreate(TestResultBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_nobody(self):
- return None
-
- @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
- def test_podNotProvided(self):
- req = self.req_d
- req.pod_name = None
- return req
-
- @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
- def test_projectNotProvided(self):
- req = self.req_d
- req.project_name = None
- return req
-
- @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
- def test_testcaseNotProvided(self):
- req = self.req_d
- req.case_name = None
- return req
-
- @executor.create(httplib.BAD_REQUEST,
- message.invalid_value('criteria', ['PASS', 'FAIL']))
- def test_invalid_criteria(self):
- req = self.req_d
- req.criteria = 'invalid'
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noPod(self):
- req = self.req_d
- req.pod_name = 'notExistPod'
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noProject(self):
- req = self.req_d
- req.project_name = 'notExistProject'
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noTestcase(self):
- req = self.req_d
- req.case_name = 'notExistTestcase'
- return req
-
- @executor.create(httplib.OK, 'assert_href')
- def test_success(self):
- return self.req_d
-
- @executor.create(httplib.OK, 'assert_href')
- def test_key_with_doc(self):
- req = copy.deepcopy(self.req_d)
- req.details = {'1.name': 'dot_name'}
- return req
-
- @executor.create(httplib.OK, '_assert_no_ti')
- def test_no_ti(self):
- req = result_models.ResultCreateRequest(pod_name=self.pod,
- project_name=self.project,
- case_name=self.case,
- installer=self.installer,
- version=self.version,
- start_date=self.start_date,
- stop_date=self.stop_date,
- details=self.details.format(),
- build_tag=self.build_tag,
- scenario=self.scenario,
- criteria=self.criteria)
- self.actual_req = req
- return req
-
- def _assert_no_ti(self, body):
- _id = body.href.split('/')[-1]
- code, body = self.get(_id)
- self.assert_res(body, self.actual_req)
-
-
-class TestResultGet(TestResultBase):
- def setUp(self):
- super(TestResultGet, self).setUp()
- self.req_10d_before = self._create_changed_date(days=-10)
- self.req_d_id = self._create_d()
- self.req_10d_later = self._create_changed_date(days=10)
-
- @executor.get(httplib.OK, 'assert_res')
- def test_getOne(self):
- return self.req_d_id
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryPod(self):
- return self._set_query('pod')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryProject(self):
- return self._set_query('project')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryTestcase(self):
- return self._set_query('case')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryVersion(self):
- return self._set_query('version')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryInstaller(self):
- return self._set_query('installer')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryBuildTag(self):
- return self._set_query('build_tag')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryScenario(self):
- return self._set_query('scenario')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryTrustIndicator(self):
- return self._set_query('trust_indicator')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryCriteria(self):
- return self._set_query('criteria')
-
- @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
- def test_queryPeriodNotInt(self):
- return self._set_query(period='a')
-
- @executor.query(httplib.OK, '_query_period_one', 1)
- def test_queryPeriodSuccess(self):
- return self._set_query(period=5)
-
- @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
- def test_queryLastNotInt(self):
- return self._set_query(last='a')
-
- @executor.query(httplib.OK, '_query_last_one', 1)
- def test_queryLast(self):
- return self._set_query(last=1)
-
- @executor.query(httplib.OK, '_query_success', 4)
- def test_queryPublic(self):
- self._create_public_data()
- return self._set_query()
-
- @executor.query(httplib.OK, '_query_success', 1)
- def test_queryPrivate(self):
- self._create_private_data()
- return self._set_query(public='false')
-
- @executor.query(httplib.OK, '_query_period_one', 1)
- def test_combination(self):
- return self._set_query('pod',
- 'project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- period=5)
-
- @executor.query(httplib.OK, '_query_success', 0)
- def test_notFound(self):
- return self._set_query('project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- pod='notExistPod',
- period=1)
-
- @executor.query(httplib.OK, '_query_success', 1)
- def test_filterErrorStartdate(self):
- self._create_error_start_date(None)
- self._create_error_start_date('None')
- self._create_error_start_date('null')
- self._create_error_start_date('')
- return self._set_query(period=5)
-
- def _query_success(self, body, number):
- self.assertEqual(number, len(body.results))
-
- def _query_last_one(self, body, number):
- self.assertEqual(number, len(body.results))
- self.assert_res(body.results[0], self.req_10d_later)
-
- def _query_period_one(self, body, number):
- self.assertEqual(number, len(body.results))
- self.assert_res(body.results[0], self.req_d)
-
- def _create_error_start_date(self, start_date):
- req = copy.deepcopy(self.req_d)
- req.start_date = start_date
- self.create(req)
- return req
-
- def _create_changed_date(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.start_date = datetime.now() + timedelta(**kwargs)
- req.stop_date = str(req.start_date + timedelta(minutes=10))
- req.start_date = str(req.start_date)
- self.create(req)
- return req
-
- def _create_public_data(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.public = 'true'
- self.create(req)
- return req
-
- def _create_private_data(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.public = 'false'
- self.create(req)
- return req
-
- def _set_query(self, *args, **kwargs):
- def get_value(arg):
- return self.__getattribute__(arg) \
- if arg != 'trust_indicator' else self.trust_indicator.current
- query = []
- for arg in args:
- query.append((arg, get_value(arg)))
- for k, v in kwargs.iteritems():
- query.append((k, v))
- return urllib.urlencode(query)
-
-
-class TestResultUpdate(TestResultBase):
- def setUp(self):
- super(TestResultUpdate, self).setUp()
- self.req_d_id = self._create_d()
-
- @executor.update(httplib.OK, '_assert_update_ti')
- def test_success(self):
- new_ti = copy.deepcopy(self.trust_indicator)
- new_ti.current += self.update_step
- new_ti.histories.append(
- result_models.TIHistory(self.update_date, self.update_step))
- new_data = copy.deepcopy(self.req_d)
- new_data.trust_indicator = new_ti
- update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
- self.update_req = new_data
- return update, self.req_d_id
-
- def _assert_update_ti(self, request, body):
- ti = body.trust_indicator
- self.assertEqual(ti.current, request.trust_indicator.current)
- if ti.histories:
- history = ti.histories[0]
- self.assertEqual(history.date, self.update_date)
- self.assertEqual(history.step, self.update_step)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py
deleted file mode 100644
index de7777a83..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py
+++ /dev/null
@@ -1,449 +0,0 @@
-import functools
-import httplib
-import json
-import os
-
-from datetime import datetime
-
-from opnfv_testapi.common import message
-import opnfv_testapi.models.scenario_models as models
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-def _none_default(check, default):
- return check if check else default
-
-
-class TestScenarioBase(base.TestBase):
- def setUp(self):
- super(TestScenarioBase, self).setUp()
- self.get_res = models.Scenario
- self.list_res = models.Scenarios
- self.basePath = '/api/v1/scenarios'
- self.req_d = self._load_request('scenario-c1.json')
- self.req_2 = self._load_request('scenario-c2.json')
-
- def tearDown(self):
- pass
-
- def assert_body(self, project, req=None):
- pass
-
- @staticmethod
- def _load_request(f_req):
- abs_file = os.path.join(os.path.dirname(__file__), f_req)
- with open(abs_file, 'r') as f:
- loader = json.load(f)
- f.close()
- return loader
-
- def create_return_name(self, req):
- _, res = self.create(req)
- return res.href.split('/')[-1]
-
- def assert_res(self, code, scenario, req=None):
- self.assertEqual(code, httplib.OK)
- if req is None:
- req = self.req_d
- self.assertIsNotNone(scenario._id)
- self.assertIsNotNone(scenario.creation_date)
- self.assertEqual(scenario, models.Scenario.from_dict(req))
-
- @staticmethod
- def set_query(*args):
- uri = ''
- for arg in args:
- uri += arg + '&'
- return uri[0: -1]
-
- def get_and_assert(self, name):
- code, body = self.get(name)
- self.assert_res(code, body, self.req_d)
-
-
-class TestScenarioCreate(TestScenarioBase):
- def test_withoutBody(self):
- (code, body) = self.create()
- self.assertEqual(code, httplib.BAD_REQUEST)
-
- def test_emptyName(self):
- req_empty = models.ScenarioCreateRequest('')
- (code, body) = self.create(req_empty)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
-
- def test_noneName(self):
- req_none = models.ScenarioCreateRequest(None)
- (code, body) = self.create(req_none)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
-
- def test_success(self):
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_create_body(body)
-
- def test_alreadyExist(self):
- self.create_d()
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
-
-
-class TestScenarioGet(TestScenarioBase):
- def setUp(self):
- super(TestScenarioGet, self).setUp()
- self.scenario_1 = self.create_return_name(self.req_d)
- self.scenario_2 = self.create_return_name(self.req_2)
-
- def test_getByName(self):
- self.get_and_assert(self.scenario_1)
-
- def test_getAll(self):
- self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
-
- def test_queryName(self):
- query = self.set_query('name=nosdn-nofeature-ha')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryInstaller(self):
- query = self.set_query('installer=apex')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryVersion(self):
- query = self.set_query('version=master')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryProject(self):
- query = self.set_query('project=functest')
- self._query_and_assert(query, reqs=[self.req_d, self.req_2])
-
- # close due to random fail, open again after solve it in another patch
- # def test_queryCombination(self):
- # query = self._set_query('name=nosdn-nofeature-ha',
- # 'installer=apex',
- # 'version=master',
- # 'project=functest')
- #
- # self._query_and_assert(query, reqs=[self.req_d])
-
- def _query_and_assert(self, query, found=True, reqs=None):
- code, body = self.query(query)
- if not found:
- self.assertEqual(code, httplib.OK)
- self.assertEqual(0, len(body.scenarios))
- else:
- self.assertEqual(len(reqs), len(body.scenarios))
- for req in reqs:
- for scenario in body.scenarios:
- if req['name'] == scenario.name:
- self.assert_res(code, scenario, req)
-
-
-class TestScenarioDelete(TestScenarioBase):
- def test_notFound(self):
- code, body = self.delete('notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
-
- def test_success(self):
- scenario = self.create_return_name(self.req_d)
- code, _ = self.delete(scenario)
- self.assertEqual(code, httplib.OK)
- code, _ = self.get(scenario)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-class TestScenarioUpdate(TestScenarioBase):
- def setUp(self):
- super(TestScenarioUpdate, self).setUp()
- self.scenario = self.create_return_name(self.req_d)
- self.scenario_2 = self.create_return_name(self.req_2)
- self.update_url = ''
- self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario)
- self.installer = self.req_d['installers'][0]['installer']
- self.version = self.req_d['installers'][0]['versions'][0]['version']
- self.locate_project = 'installer={}&version={}&project={}'.format(
- self.installer,
- self.version,
- 'functest')
-
- def update_url_fixture(item):
- def _update_url_fixture(xstep):
- def wrapper(self, *args, **kwargs):
- self.update_url = '{}/{}'.format(self.scenario_url, item)
- locator = None
- if item in ['projects', 'owner']:
- locator = 'installer={}&version={}'.format(
- self.installer,
- self.version)
- elif item in ['versions']:
- locator = 'installer={}'.format(
- self.installer)
- elif item in ['rename']:
- self.update_url = self.scenario_url
-
- if locator:
- self.update_url = '{}?{}'.format(self.update_url, locator)
-
- xstep(self, *args, **kwargs)
- return wrapper
- return _update_url_fixture
-
- def update_partial(operate, expected):
- def _update_partial(set_update):
- @functools.wraps(set_update)
- def wrapper(self):
- update = set_update(self)
- code, body = getattr(self, operate)(update)
- getattr(self, expected)(code)
- return wrapper
- return _update_partial
-
- @update_partial('_add', '_success')
- def test_addScore(self):
- add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['scores'].append(add.format())
- self.update_url = '{}/scores?{}'.format(self.scenario_url,
- self.locate_project)
-
- return add
-
- @update_partial('_add', '_success')
- def test_addTrustIndicator(self):
- add = models.ScenarioTI(date=str(datetime.now()), status='gold')
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['trust_indicators'].append(add.format())
- self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url,
- self.locate_project)
-
- return add
-
- @update_partial('_add', '_success')
- def test_addCustoms(self):
- adds = ['odl', 'parser', 'vping_ssh']
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = list(set(functest['customs'] + adds))
- self.update_url = '{}/customs?{}'.format(self.scenario_url,
- self.locate_project)
- return adds
-
- @update_partial('_update', '_success')
- def test_updateCustoms(self):
- updates = ['odl', 'parser', 'vping_ssh']
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = updates
- self.update_url = '{}/customs?{}'.format(self.scenario_url,
- self.locate_project)
-
- return updates
-
- @update_partial('_delete', '_success')
- def test_deleteCustoms(self):
- deletes = ['vping_ssh']
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = ['healthcheck']
- self.update_url = '{}/customs?{}'.format(self.scenario_url,
- self.locate_project)
-
- return deletes
-
- @update_url_fixture('projects')
- @update_partial('_add', '_success')
- def test_addProjects_succ(self):
- add = models.ScenarioProject(project='qtip').format()
- self.req_d['installers'][0]['versions'][0]['projects'].append(add)
- return [add]
-
- @update_url_fixture('projects')
- @update_partial('_add', '_conflict')
- def test_addProjects_already_exist(self):
- add = models.ScenarioProject(project='functest').format()
- return [add]
-
- @update_url_fixture('projects')
- @update_partial('_add', '_bad_request')
- def test_addProjects_bad_schema(self):
- add = models.ScenarioProject(project='functest').format()
- add['score'] = None
- return [add]
-
- @update_url_fixture('projects')
- @update_partial('_update', '_success')
- def test_updateProjects_succ(self):
- update = models.ScenarioProject(project='qtip').format()
- self.req_d['installers'][0]['versions'][0]['projects'] = [update]
- return [update]
-
- @update_url_fixture('projects')
- @update_partial('_update', '_conflict')
- def test_updateProjects_duplicated(self):
- update = models.ScenarioProject(project='qtip').format()
- return [update, update]
-
- @update_url_fixture('projects')
- @update_partial('_update', '_bad_request')
- def test_updateProjects_bad_schema(self):
- update = models.ScenarioProject(project='functest').format()
- update['score'] = None
- return [update]
-
- @update_url_fixture('projects')
- @update_partial('_delete', '_success')
- def test_deleteProjects(self):
- deletes = ['functest']
- projects = self.req_d['installers'][0]['versions'][0]['projects']
- self.req_d['installers'][0]['versions'][0]['projects'] = filter(
- lambda f: f['project'] != 'functest',
- projects)
- return deletes
-
- @update_url_fixture('owner')
- @update_partial('_update', '_success')
- def test_changeOwner(self):
- new_owner = 'new_owner'
- update = models.ScenarioChangeOwnerRequest(new_owner).format()
- self.req_d['installers'][0]['versions'][0]['owner'] = new_owner
- return update
-
- @update_url_fixture('versions')
- @update_partial('_add', '_success')
- def test_addVersions_succ(self):
- add = models.ScenarioVersion(version='Euphrates').format()
- self.req_d['installers'][0]['versions'].append(add)
- return [add]
-
- @update_url_fixture('versions')
- @update_partial('_add', '_conflict')
- def test_addVersions_already_exist(self):
- add = models.ScenarioVersion(version='master').format()
- return [add]
-
- @update_url_fixture('versions')
- @update_partial('_add', '_bad_request')
- def test_addVersions_bad_schema(self):
- add = models.ScenarioVersion(version='euphrates').format()
- add['notexist'] = None
- return [add]
-
- @update_url_fixture('versions')
- @update_partial('_update', '_success')
- def test_updateVersions_succ(self):
- update = models.ScenarioVersion(version='euphrates').format()
- self.req_d['installers'][0]['versions'] = [update]
- return [update]
-
- @update_url_fixture('versions')
- @update_partial('_update', '_conflict')
- def test_updateVersions_duplicated(self):
- update = models.ScenarioVersion(version='euphrates').format()
- return [update, update]
-
- @update_url_fixture('versions')
- @update_partial('_update', '_bad_request')
- def test_updateVersions_bad_schema(self):
- update = models.ScenarioVersion(version='euphrates').format()
- update['not_owner'] = 'Iam'
- return [update]
-
- @update_url_fixture('versions')
- @update_partial('_delete', '_success')
- def test_deleteVersions(self):
- deletes = ['master']
- versions = self.req_d['installers'][0]['versions']
- self.req_d['installers'][0]['versions'] = filter(
- lambda f: f['version'] != 'master',
- versions)
- return deletes
-
- @update_url_fixture('installers')
- @update_partial('_add', '_success')
- def test_addInstallers_succ(self):
- add = models.ScenarioInstaller(installer='daisy').format()
- self.req_d['installers'].append(add)
- return [add]
-
- @update_url_fixture('installers')
- @update_partial('_add', '_conflict')
- def test_addInstallers_already_exist(self):
- add = models.ScenarioInstaller(installer='apex').format()
- return [add]
-
- @update_url_fixture('installers')
- @update_partial('_add', '_bad_request')
- def test_addInstallers_bad_schema(self):
- add = models.ScenarioInstaller(installer='daisy').format()
- add['not_exist'] = 'not_exist'
- return [add]
-
- @update_url_fixture('installers')
- @update_partial('_update', '_success')
- def test_updateInstallers_succ(self):
- update = models.ScenarioInstaller(installer='daisy').format()
- self.req_d['installers'] = [update]
- return [update]
-
- @update_url_fixture('installers')
- @update_partial('_update', '_conflict')
- def test_updateInstallers_duplicated(self):
- update = models.ScenarioInstaller(installer='daisy').format()
- return [update, update]
-
- @update_url_fixture('installers')
- @update_partial('_update', '_bad_request')
- def test_updateInstallers_bad_schema(self):
- update = models.ScenarioInstaller(installer='daisy').format()
- update['not_exist'] = 'not_exist'
- return [update]
-
- @update_url_fixture('installers')
- @update_partial('_delete', '_success')
- def test_deleteInstallers(self):
- deletes = ['apex']
- installers = self.req_d['installers']
- self.req_d['installers'] = filter(
- lambda f: f['installer'] != 'apex',
- installers)
- return deletes
-
- @update_url_fixture('rename')
- @update_partial('_update', '_success')
- def test_renameScenario(self):
- new_name = 'new_scenario_name'
- update = models.ScenarioUpdateRequest(name=new_name)
- self.req_d['name'] = new_name
- return update
-
- @update_url_fixture('rename')
- @update_partial('_update', '_forbidden')
- def test_renameScenario_exist(self):
- new_name = self.req_d['name']
- update = models.ScenarioUpdateRequest(name=new_name)
- return update
-
- def _add(self, update_req):
- return self.post_direct_url(self.update_url, update_req)
-
- def _update(self, update_req):
- return self.update_direct_url(self.update_url, update_req)
-
- def _delete(self, update_req):
- return self.delete_direct_url(self.update_url, update_req)
-
- def _success(self, status):
- self.assertEqual(status, httplib.OK)
- self.get_and_assert(self.req_d['name'])
-
- def _forbidden(self, status):
- self.assertEqual(status, httplib.FORBIDDEN)
-
- def _bad_request(self, status):
- self.assertEqual(status, httplib.BAD_REQUEST)
-
- def _conflict(self, status):
- self.assertEqual(status, httplib.CONFLICT)
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py
deleted file mode 100644
index e4c668e7a..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py
+++ /dev/null
@@ -1,201 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import copy
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.models import project_models
-from opnfv_testapi.models import testcase_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-class TestCaseBase(base.TestBase):
- def setUp(self):
- super(TestCaseBase, self).setUp()
- self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
- '/cases/vping_1',
- 'vping-ssh test')
- self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
- '/cases/doctor_1',
- 'create doctor')
- self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
- 'vping-ssh test',
- 'functest')
- self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
- 'create doctor',
- 'functest')
- self.get_res = testcase_models.Testcase
- self.list_res = testcase_models.Testcases
- self.update_res = testcase_models.Testcase
- self.basePath = '/api/v1/projects/%s/cases'
- self.create_project()
-
- def assert_body(self, case, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(case.name, req.name)
- self.assertEqual(case.description, req.description)
- self.assertEqual(case.url, req.url)
- self.assertIsNotNone(case._id)
- self.assertIsNotNone(case.creation_date)
-
- def assert_update_body(self, old, new, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(new.name, req.name)
- self.assertEqual(new.description, req.description)
- self.assertEqual(new.url, old.url)
- self.assertIsNotNone(new._id)
- self.assertIsNotNone(new.creation_date)
-
- def create_project(self):
- req_p = project_models.ProjectCreateRequest('functest',
- 'vping-ssh test')
- self.create_help('/api/v1/projects', req_p)
- self.project = req_p.name
-
- def create_d(self):
- return super(TestCaseBase, self).create_d(self.project)
-
- def create_e(self):
- return super(TestCaseBase, self).create_e(self.project)
-
- def get(self, case=None):
- return super(TestCaseBase, self).get(self.project, case)
-
- def create(self, req=None, *args):
- return super(TestCaseBase, self).create(req, self.project)
-
- def update(self, new=None, case=None):
- return super(TestCaseBase, self).update(new, self.project, case)
-
- def delete(self, case):
- return super(TestCaseBase, self).delete(self.project, case)
-
-
-class TestCaseCreate(TestCaseBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_noBody(self):
- return None
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noProject(self):
- self.project = 'noProject'
- return self.req_d
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- req_empty = testcase_models.TestcaseCreateRequest('')
- return req_empty
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- req_none = testcase_models.TestcaseCreateRequest(None)
- return req_none
-
- @executor.create(httplib.OK, '_assert_success')
- def test_success(self):
- return self.req_d
-
- def _assert_success(self, body):
- self.assert_create_body(body, self.req_d, self.project)
-
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- self.create_d()
- return self.req_d
-
-
-class TestCaseGet(TestCaseBase):
- def setUp(self):
- super(TestCaseGet, self).setUp()
- self.create_d()
- self.create_e()
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_body')
- def test_getOne(self):
- return self.req_d.name
-
- @executor.get(httplib.OK, '_list')
- def test_list(self):
- return None
-
- def _list(self, body):
- for case in body.testcases:
- if self.req_d.name == case.name:
- self.assert_body(case)
- else:
- self.assert_body(case, self.req_e)
-
-
-class TestCaseUpdate(TestCaseBase):
- def setUp(self):
- super(TestCaseUpdate, self).setUp()
- self.create_d()
-
- @executor.update(httplib.BAD_REQUEST, message.no_body())
- def test_noBody(self):
- return None, 'noBody'
-
- @executor.update(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return self.update_e, 'notFound'
-
- @executor.update(httplib.FORBIDDEN, message.exist_base)
- def test_newNameExist(self):
- self.create_e()
- return self.update_e, self.req_d.name
-
- @executor.update(httplib.FORBIDDEN, message.no_update())
- def test_noUpdate(self):
- return self.update_d, self.req_d.name
-
- @executor.update(httplib.OK, '_update_success')
- def test_success(self):
- return self.update_e, self.req_d.name
-
- @executor.update(httplib.OK, '_update_success')
- def test_with_dollar(self):
- update = copy.deepcopy(self.update_d)
- update.description = {'2. change': 'dollar change'}
- return update, self.req_d.name
-
- def _update_success(self, request, body):
- self.assert_update_body(self.req_d, body, request)
- _, new_body = self.get(request.name)
- self.assert_update_body(self.req_d, new_body, request)
-
-
-class TestCaseDelete(TestCaseBase):
- def setUp(self):
- super(TestCaseDelete, self).setUp()
- self.create_d()
-
- @executor.delete(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return 'notFound'
-
- @executor.delete(httplib.OK, '_delete_success')
- def test_success(self):
- return self.req_d.name
-
- def _delete_success(self, body):
- self.assertEqual(body, '')
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py
deleted file mode 100644
index e8b746dfc..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_token.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-import httplib
-import unittest
-
-from tornado import web
-
-from opnfv_testapi.common import message
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit import fake_pymongo
-from opnfv_testapi.tests.unit.handlers import test_result
-
-
-class TestTokenCreateResult(test_result.TestResultBase):
- def get_app(self):
- from opnfv_testapi.router import url_mappings
- return web.Application(
- url_mappings.mappings,
- db=fake_pymongo,
- debug=True,
- auth=True
- )
-
- def setUp(self):
- super(TestTokenCreateResult, self).setUp()
- fake_pymongo.tokens.insert({"access_token": "12345"})
-
- @executor.create(httplib.FORBIDDEN, message.invalid_token())
- def test_resultCreateTokenInvalid(self):
- self.headers['X-Auth-Token'] = '1234'
- return self.req_d
-
- @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
- def test_resultCreateTokenUnauthorized(self):
- if 'X-Auth-Token' in self.headers:
- self.headers.pop('X-Auth-Token')
- return self.req_d
-
- @executor.create(httplib.OK, '_create_success')
- def test_resultCreateTokenSuccess(self):
- self.headers['X-Auth-Token'] = '12345'
- return self.req_d
-
- def _create_success(self, body):
- self.assertIn('CreateResponse', str(type(body)))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py
deleted file mode 100644
index 6ef810f0e..000000000
--- a/utils/test/testapi/opnfv_testapi/tests/unit/handlers/test_version.py
+++ /dev/null
@@ -1,36 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import httplib
-import unittest
-
-from opnfv_testapi.models import base_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.handlers import test_base as base
-
-
-class TestVersionBase(base.TestBase):
- def setUp(self):
- super(TestVersionBase, self).setUp()
- self.list_res = base_models.Versions
- self.basePath = '/versions'
-
-
-class TestVersion(TestVersionBase):
- @executor.get(httplib.OK, '_get_success')
- def test_success(self):
- return None
-
- def _get_success(self, body):
- self.assertEqual(len(body.versions), 1)
- self.assertEqual(body.versions[0].version, 'v1.0')
- self.assertEqual(body.versions[0].description, 'basics')
-
-
-if __name__ == '__main__':
- unittest.main()