summaryrefslogtreecommitdiffstats
path: root/opnfv_testapi/tests/unit
diff options
context:
space:
mode:
authorxudan <xudan16@huawei.com>2018-07-06 05:16:40 -0400
committerxudan <xudan16@huawei.com>2018-07-06 05:21:42 -0400
commitb3e40f026d655501bfa581452c447784604ecb05 (patch)
tree406f8bfc1abc1b33f98153d03abd34ef7b0e2fe9 /opnfv_testapi/tests/unit
parentb1b0ea32d1a296c7d055c5391261dcad6be48c63 (diff)
Move all web portal code to the new repo dovetail-webportal
This is only the first step to simply copy the file here. There still need some more work to make sure all work well. All the changes will be submitted with other patches to make it easily to review. JIRA: DOVETAIL-671 Change-Id: I64d32a9df562184166b6199e2719f298687d1a0a Signed-off-by: xudan <xudan16@huawei.com>
Diffstat (limited to 'opnfv_testapi/tests/unit')
-rw-r--r--opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--opnfv_testapi/tests/unit/common/__init__.py0
-rw-r--r--opnfv_testapi/tests/unit/common/noparam.ini16
-rw-r--r--opnfv_testapi/tests/unit/common/normal.ini17
-rw-r--r--opnfv_testapi/tests/unit/common/nosection.ini11
-rw-r--r--opnfv_testapi/tests/unit/common/notboolean.ini17
-rw-r--r--opnfv_testapi/tests/unit/common/notint.ini17
-rw-r--r--opnfv_testapi/tests/unit/common/test_config.py15
-rw-r--r--opnfv_testapi/tests/unit/conftest.py8
-rw-r--r--opnfv_testapi/tests/unit/executor.py97
-rw-r--r--opnfv_testapi/tests/unit/fake_pymongo.py284
-rw-r--r--opnfv_testapi/tests/unit/resources/__init__.py0
-rw-r--r--opnfv_testapi/tests/unit/resources/scenario-c1.json38
-rw-r--r--opnfv_testapi/tests/unit/resources/scenario-c2.json73
-rw-r--r--opnfv_testapi/tests/unit/resources/test_base.py161
-rw-r--r--opnfv_testapi/tests/unit/resources/test_fake_pymongo.py123
-rw-r--r--opnfv_testapi/tests/unit/resources/test_pod.py90
-rw-r--r--opnfv_testapi/tests/unit/resources/test_project.py137
-rw-r--r--opnfv_testapi/tests/unit/resources/test_result.py410
-rw-r--r--opnfv_testapi/tests/unit/resources/test_scenario.py360
-rw-r--r--opnfv_testapi/tests/unit/resources/test_testcase.py201
-rw-r--r--opnfv_testapi/tests/unit/resources/test_token.py114
-rw-r--r--opnfv_testapi/tests/unit/resources/test_version.py36
23 files changed, 2234 insertions, 0 deletions
diff --git a/opnfv_testapi/tests/unit/__init__.py b/opnfv_testapi/tests/unit/__init__.py
new file mode 100644
index 0000000..3fc79f1
--- /dev/null
+++ b/opnfv_testapi/tests/unit/__init__.py
@@ -0,0 +1,9 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+__author__ = 'serena'
diff --git a/opnfv_testapi/tests/unit/common/__init__.py b/opnfv_testapi/tests/unit/common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/__init__.py
diff --git a/opnfv_testapi/tests/unit/common/noparam.ini b/opnfv_testapi/tests/unit/common/noparam.ini
new file mode 100644
index 0000000..fda2a09
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/noparam.ini
@@ -0,0 +1,16 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/opnfv_testapi/tests/unit/common/normal.ini b/opnfv_testapi/tests/unit/common/normal.ini
new file mode 100644
index 0000000..77cc6c6
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/normal.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/opnfv_testapi/tests/unit/common/nosection.ini b/opnfv_testapi/tests/unit/common/nosection.ini
new file mode 100644
index 0000000..9988fc0
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/nosection.ini
@@ -0,0 +1,11 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/opnfv_testapi/tests/unit/common/notboolean.ini b/opnfv_testapi/tests/unit/common/notboolean.ini
new file mode 100644
index 0000000..b3f3276
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/notboolean.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = notboolean
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/opnfv_testapi/tests/unit/common/notint.ini b/opnfv_testapi/tests/unit/common/notint.ini
new file mode 100644
index 0000000..d1b752a
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/notint.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = notint
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/opnfv_testapi/tests/unit/common/test_config.py b/opnfv_testapi/tests/unit/common/test_config.py
new file mode 100644
index 0000000..cc8743c
--- /dev/null
+++ b/opnfv_testapi/tests/unit/common/test_config.py
@@ -0,0 +1,15 @@
+import argparse
+
+
+def test_config_normal(mocker, config_normal):
+ mocker.patch(
+ 'argparse.ArgumentParser.parse_known_args',
+ return_value=(argparse.Namespace(config_file=config_normal), None))
+ from opnfv_testapi.common import config
+ CONF = config.Config()
+ assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/'
+ assert CONF.mongo_dbname == 'test_results_collection'
+ assert CONF.api_port == 8000
+ assert CONF.api_debug is True
+ assert CONF.api_authenticate is False
+ assert CONF.swagger_base_url == 'http://localhost:8000'
diff --git a/opnfv_testapi/tests/unit/conftest.py b/opnfv_testapi/tests/unit/conftest.py
new file mode 100644
index 0000000..feff1da
--- /dev/null
+++ b/opnfv_testapi/tests/unit/conftest.py
@@ -0,0 +1,8 @@
+from os import path
+
+import pytest
+
+
+@pytest.fixture
+def config_normal():
+ return path.join(path.dirname(__file__), 'common/normal.ini')
diff --git a/opnfv_testapi/tests/unit/executor.py b/opnfv_testapi/tests/unit/executor.py
new file mode 100644
index 0000000..b8f696c
--- /dev/null
+++ b/opnfv_testapi/tests/unit/executor.py
@@ -0,0 +1,97 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corp
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import functools
+import httplib
+
+
+def upload(excepted_status, excepted_response):
+ def _upload(create_request):
+ @functools.wraps(create_request)
+ def wrap(self):
+ request = create_request(self)
+ status, body = self.upload(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _upload
+
+
+def create(excepted_status, excepted_response):
+ def _create(create_request):
+ @functools.wraps(create_request)
+ def wrap(self):
+ request = create_request(self)
+ status, body = self.create(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _create
+
+
+def get(excepted_status, excepted_response):
+ def _get(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.get(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _get
+
+
+def update(excepted_status, excepted_response):
+ def _update(update_request):
+ @functools.wraps(update_request)
+ def wrap(self):
+ request, resource = update_request(self)
+ status, body = self.update(request, resource)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(request, body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _update
+
+
+def delete(excepted_status, excepted_response):
+ def _delete(delete_request):
+ @functools.wraps(delete_request)
+ def wrap(self):
+ request = delete_request(self)
+ if isinstance(request, tuple):
+ status, body = self.delete(request[0], *(request[1]))
+ else:
+ status, body = self.delete(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _delete
+
+
+def query(excepted_status, excepted_response, number=0):
+ def _query(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.query(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body, number)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _query
diff --git a/opnfv_testapi/tests/unit/fake_pymongo.py b/opnfv_testapi/tests/unit/fake_pymongo.py
new file mode 100644
index 0000000..0ca83df
--- /dev/null
+++ b/opnfv_testapi/tests/unit/fake_pymongo.py
@@ -0,0 +1,284 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from operator import itemgetter
+
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+
+
+def thread_execute(method, *args, **kwargs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(method, *args, **kwargs)
+ return result
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.length = len(self.collection)
+ self.sorted = []
+
+ def _is_next_exist(self):
+ return self.length != 0
+
+ @property
+ def fetch_next(self):
+ return thread_execute(self._is_next_exist)
+
+ def next_object(self):
+ self.length -= 1
+ return self.collection.pop()
+
+ def sort(self, key_or_list):
+ for k, v in key_or_list.iteritems():
+ if v == -1:
+ reverse = True
+ else:
+ reverse = False
+
+ self.collection = sorted(self.collection,
+ key=itemgetter(k), reverse=reverse)
+ return self
+
+ def limit(self, limit):
+ if limit != 0 and limit < len(self.collection):
+ self.collection = self.collection[0: limit]
+ self.length = limit
+ return self
+
+ def skip(self, skip):
+ if skip < self.length and (skip > 0):
+ self.collection = self.collection[self.length - skip: -1]
+ self.length -= skip
+ elif skip >= self.length:
+ self.collection = []
+ self.length = 0
+ return self
+
+ def _count(self):
+ return self.length
+
+ def count(self):
+ return thread_execute(self._count)
+
+
+class MemDb(object):
+
+ def __init__(self, name):
+ self.name = name
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ if '_id' in spec_or_id:
+ spec_or_id['_id'] = str(spec_or_id['_id'])
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ return thread_execute(self._find_one, spec_or_id, *args)
+
+ def _insert(self, doc_or_docs, check_keys=True):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ if check_keys:
+ for doc in docs:
+ self._check_keys(doc)
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = str(ObjectId())
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs, check_keys=True):
+ return thread_execute(self._insert, doc_or_docs, check_keys)
+
+ @staticmethod
+ def _compare_date(spec, value):
+ gte = True
+ lt = False
+ for k, v in spec.iteritems():
+ if k == '$gte' and value < v:
+ gte = False
+ elif k == '$lt' and value < v:
+ lt = True
+ return gte and lt
+
+ def _in(self, content, *args):
+ if self.name == 'scenarios':
+ return self._in_scenarios(content, *args)
+ else:
+ return self._in_others(content, *args)
+
+ def _in_scenarios_installer(self, installer, content):
+ hit = False
+ for s_installer in content['installers']:
+ if installer == s_installer['installer']:
+ hit = True
+
+ return hit
+
+ def _in_scenarios_version(self, version, content):
+ hit = False
+ for s_installer in content['installers']:
+ for s_version in s_installer['versions']:
+ if version == s_version['version']:
+ hit = True
+ return hit
+
+ def _in_scenarios_project(self, project, content):
+ hit = False
+ for s_installer in content['installers']:
+ for s_version in s_installer['versions']:
+ for s_project in s_version['projects']:
+ if project == s_project['project']:
+ hit = True
+
+ return hit
+
+ def _in_scenarios(self, content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'installers':
+ for inner in v.values():
+ for i_k, i_v in inner.iteritems():
+ if i_k == 'installer':
+ return self._in_scenarios_installer(i_v,
+ content)
+ elif i_k == 'versions.version':
+ return self._in_scenarios_version(i_v,
+ content)
+ elif i_k == 'versions.projects.project':
+ return self._in_scenarios_project(i_v,
+ content)
+ elif content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _in_others(self, content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'start_date':
+ if not MemDb._compare_date(v, content.get(k)):
+ return False
+ elif k == 'trust_indicator.current':
+ if content.get('trust_indicator').get('current') != v:
+ return False
+ elif not isinstance(v, dict) and content.get(k, None) != v:
+ return False
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _aggregate(self, *args, **kwargs):
+ res = self.contents
+ print args
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$match':
+ res = self._find(v)
+ cursor = MemCursor(res)
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$sort':
+ cursor = cursor.sort(v)
+ elif k == '$skip':
+ cursor = cursor.skip(v)
+ elif k == '$limit':
+ cursor = cursor.limit(v)
+ return cursor
+
+ def aggregate(self, *args, **kwargs):
+ return self._aggregate(*args, **kwargs)
+
+ def _update(self, spec, document, check_keys=True):
+ updated = False
+
+ if check_keys:
+ self._check_keys(document)
+
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document, check_keys=True):
+ return thread_execute(self._update, spec, document, check_keys)
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ return thread_execute(self._remove, spec_or_id)
+
+ def clear(self):
+ self._remove()
+
+ def _check_keys(self, doc):
+ for key in doc.keys():
+ if '.' in key:
+ raise NameError('key {} must not contain .'.format(key))
+ if key.startswith('$'):
+ raise NameError('key {} must not start with $'.format(key))
+ if isinstance(doc.get(key), dict):
+ self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+ return globals()[name]
+
+
+pods = MemDb('pods')
+projects = MemDb('projects')
+testcases = MemDb('testcases')
+results = MemDb('results')
+scenarios = MemDb('scenarios')
+tokens = MemDb('tokens')
diff --git a/opnfv_testapi/tests/unit/resources/__init__.py b/opnfv_testapi/tests/unit/resources/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/__init__.py
diff --git a/opnfv_testapi/tests/unit/resources/scenario-c1.json b/opnfv_testapi/tests/unit/resources/scenario-c1.json
new file mode 100644
index 0000000..1878022
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/scenario-c1.json
@@ -0,0 +1,38 @@
+{
+ "name": "nosdn-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "apex",
+ "versions":
+ [
+ {
+ "owner": "Luke",
+ "version": "master",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "12/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/opnfv_testapi/tests/unit/resources/scenario-c2.json b/opnfv_testapi/tests/unit/resources/scenario-c2.json
new file mode 100644
index 0000000..b6a3b83
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/scenario-c2.json
@@ -0,0 +1,73 @@
+{
+ "name": "odl_2-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "fuel",
+ "versions":
+ [
+ {
+ "owner": "Lucky",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores": [],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "silver"
+ }
+
+ ]
+ },
+ {
+ "project": "yardstick",
+ "customs": ["suite-a"],
+ "scores": [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "0"
+ }
+ ],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "gold"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "owner": "Luke",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-09 22:46:44",
+ "score": "11/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/opnfv_testapi/tests/unit/resources/test_base.py b/opnfv_testapi/tests/unit/resources/test_base.py
new file mode 100644
index 0000000..dcec4e9
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_base.py
@@ -0,0 +1,161 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+from os import path
+
+import mock
+from tornado import testing
+
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class TestBase(testing.AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self._patch_server()
+ self.basePath = ''
+ self.create_res = models.CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+
+ def tearDown(self):
+ self.db_patcher.stop()
+ self.config_patcher.stop()
+
+ def _patch_server(self):
+ import argparse
+ config = path.join(path.dirname(__file__), '../common/normal.ini')
+ self.config_patcher = mock.patch(
+ 'argparse.ArgumentParser.parse_known_args',
+ return_value=(argparse.Namespace(config_file=config), None))
+ self.db_patcher = mock.patch('opnfv_testapi.db.api.DB',
+ fake_pymongo)
+ self.config_patcher.start()
+ self.db_patcher.start()
+
+ def set_config_file(self):
+ self.config_file = 'normal.ini'
+
+ def get_app(self):
+ from opnfv_testapi.cmd import server
+ return server.make_app()
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self._update_uri(uri, *args),
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update(self, new=None, *args):
+ if new:
+ new = new.format()
+ res = self.fetch(self._get_uri(*args),
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def delete(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='DELETE',
+ headers=self.headers)
+ return res.code, res.body
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query if query else self.basePath
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ return code, self._get_return_body(code, body, cls)
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ import inspect
+ if not req:
+ req = self.req_d
+ resource_name = ''
+ if inspect.isclass(req):
+ resource_name = req.name
+ elif isinstance(req, dict):
+ resource_name = req['name']
+ elif isinstance(req, str):
+ resource_name = json.loads(req)['name']
+ new_args = args + tuple([resource_name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
+ fake_pymongo.scenarios.clear()
diff --git a/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
new file mode 100644
index 0000000..1ebc96f
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
@@ -0,0 +1,123 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from tornado import gen
+from tornado import testing
+from tornado import web
+
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class MyTest(testing.AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.addCleanup(self._clear)
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return web.Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pods.insert({'name': 'test2'})
+
+ @testing.gen_test
+ def test_find_one(self):
+ user = yield self.db.pods.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+ self.db.pods.remove()
+
+ @testing.gen_test
+ def test_find(self):
+ cursor = self.db.pods.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @testing.gen_test
+ def test_update(self):
+ yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ def test_update_dot_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ 'key 1. name must not contain .')
+
+ def test_update_dot_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ None,
+ check_keys=False)
+
+ def test_update_dollar_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ 'key $name must not start with $')
+
+ def test_update_dollar_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ None,
+ check_keys=False)
+
+ @testing.gen_test
+ def test_remove(self):
+ yield self.db.pods.remove({'_id': '1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+ def test_insert_dot_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ 'key 2. name must not contain .')
+
+ def test_insert_dot_no_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ None,
+ check_keys=False)
+
+ def test_insert_dollar_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ 'key $name must not start with $')
+
+ def test_insert_dollar_no_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ None,
+ check_keys=False)
+
+ def _clear(self):
+ self.db.pods.clear()
+
+ def _update_assert(self, docs, error=None, **kwargs):
+ self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
+
+ def _insert_assert(self, docs, error=None, **kwargs):
+ self._db_assert('insert', error, docs, **kwargs)
+
+ @testing.gen_test
+ def _db_assert(self, method, error, *args, **kwargs):
+ name_error = None
+ try:
+ yield self._eval_pods_db(method, *args, **kwargs)
+ except NameError as err:
+ name_error = err.args[0]
+ finally:
+ self.assertEqual(name_error, error)
+
+ def _eval_pods_db(self, method, *args, **kwargs):
+ table_obj = vars(self.db)['pods']
+ return table_obj.__getattribute__(method)(*args, **kwargs)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_pod.py b/opnfv_testapi/tests/unit/resources/test_pod.py
new file mode 100644
index 0000000..cb4f1d9
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_pod.py
@@ -0,0 +1,90 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestPodBase(base.TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual',
+ 'zte pod 1', 'ci-pod')
+ self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2')
+ self.get_res = pod_models.Pod
+ self.list_res = pod_models.Pods
+ self.basePath = '/api/v1/pods'
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return pod_models.PodCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return pod_models.PodCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestPodGet(TestPodBase):
+ def setUp(self):
+ super(TestPodGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_get_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.req_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.req_e)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_project.py b/opnfv_testapi/tests/unit/resources/test_project.py
new file mode 100644
index 0000000..0622ba8
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_project.py
@@ -0,0 +1,137 @@
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestProjectBase(base.TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping',
+ 'vping-ssh test')
+ self.req_e = project_models.ProjectCreateRequest('doctor',
+ 'doctor test')
+ self.get_res = project_models.Project
+ self.list_res = project_models.Projects
+ self.update_res = project_models.Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return project_models.ProjectCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return project_models.ProjectCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestProjectGet(TestProjectBase):
+ def setUp(self):
+ super(TestProjectGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def setUp(self):
+ super(TestProjectUpdate, self).setUp()
+ _, d_body = self.create_d()
+ _, get_res = self.get(self.req_d.name)
+ self.index_d = get_res._id
+ self.create_e()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.req_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ return self.req_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.req_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_assert_update')
+ def test_success(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _assert_update(self, req, body):
+ self.assertEqual(self.index_d, body._id)
+ self.assert_body(body, req)
+ _, new_body = self.get(req.name)
+ self.assertEqual(self.index_d, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def setUp(self):
+ super(TestProjectDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_assert_delete')
+ def test_success(self):
+ return self.req_d.name
+
+ def _assert_delete(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_result.py b/opnfv_testapi/tests/unit/resources/test_result.py
new file mode 100644
index 0000000..1e83ed3
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_result.py
@@ -0,0 +1,410 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+from datetime import datetime, timedelta
+import json
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import result_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+ self.items = [{'item1': 1}, {'item2': 2}]
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status,
+ 'items': [{'item1': 1}, {'item2': 2}]
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ t.items = a_dict.get('items')
+ return t
+
+
+class TestResultBase(base.TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'passed'
+ self.trust_indicator = result_models.TI(0.7)
+ self.start_date = str(datetime.now())
+ self.stop_date = str(datetime.now() + timedelta(minutes=1))
+ self.update_date = str(datetime.now() + timedelta(days=1))
+ self.update_step = -0.05
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = result_models.ResultCreateRequest(
+ pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = result_models.TestResult
+ self.list_res = result_models.TestResults
+ self.update_res = result_models.TestResult
+ self.basePath = '/api/v1/results'
+ self.req_pod = pod_models.PodCreateRequest(
+ self.pod,
+ 'metal',
+ 'zte pod 1')
+ self.req_project = project_models.ProjectCreateRequest(
+ self.project,
+ 'vping test')
+ self.req_testcase = testcase_models.TestcaseCreateRequest(
+ self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ self.create_help('/api/v1/pods', self.req_pod)
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, result, req=None):
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(details_res.items, details_req.items)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+ def upload(self, req):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self.basePath + '/upload',
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+
+class TestResultUpload(TestResultBase):
+ @executor.upload(httplib.BAD_REQUEST, message.key_error('file'))
+ def test_filenotfind(self):
+ return None
+
+
+class TestResultCreate(TestResultBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_nobody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ return req
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ return req
+
+ @executor.create(httplib.OK, '_assert_no_ti')
+ def test_no_ti(self):
+ req = result_models.ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ self.actual_req = req
+ return req
+
+ def _assert_no_ti(self, body):
+ _id = body.href.split('/')[-1]
+ code, body = self.get(_id)
+ self.assert_res(body, self.actual_req)
+
+
+class TestResultGet(TestResultBase):
+ def setUp(self):
+ super(TestResultGet, self).setUp()
+ self.req_10d_before = self._create_changed_date(days=-10)
+ self.req_d_id = self._create_d()
+ self.req_10d_later = self._create_changed_date(days=10)
+
+ @executor.get(httplib.OK, 'assert_res')
+ def test_getOne(self):
+ return self.req_d_id
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryPod(self):
+ return self._set_query('pod')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryProject(self):
+ return self._set_query('project')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTestcase(self):
+ return self._set_query('case')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryVersion(self):
+ return self._set_query('version')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryInstaller(self):
+ return self._set_query('installer')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryBuildTag(self):
+ return self._set_query('build_tag')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryScenario(self):
+ return self._set_query('scenario')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTrustIndicator(self):
+ return self._set_query('trust_indicator')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryCriteria(self):
+ return self._set_query('criteria')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
+ def test_queryPeriodNotInt(self):
+ return self._set_query('period=a')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_queryPeriodSuccess(self):
+ return self._set_query('period=5')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
+ def test_queryLastNotInt(self):
+ return self._set_query('last=a')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_queryLast(self):
+ return self._set_query('last=1')
+
+ @executor.query(httplib.OK, '_query_success', 4)
+ def test_queryPublic(self):
+ self._create_public_data()
+ return self._set_query('')
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_queryPrivate(self):
+ self._create_private_data()
+ return self._set_query('public=false')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_combination(self):
+ return self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=5')
+
+ @executor.query(httplib.OK, '_query_success', 0)
+ def test_notFound(self):
+ return self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_filterErrorStartdate(self):
+ self._create_error_start_date(None)
+ self._create_error_start_date('None')
+ self._create_error_start_date('null')
+ self._create_error_start_date('')
+ return self._set_query('period=5')
+
+ def _query_success(self, body, number):
+ self.assertEqual(number, len(body.results))
+
+ def _query_last_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_10d_later)
+
+ def _query_period_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_d)
+
+ def _create_error_start_date(self, start_date):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = start_date
+ self.create(req)
+ return req
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _create_public_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'true'
+ self.create(req)
+ return req
+
+ def _create_private_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'false'
+ self.create(req)
+ return req
+
+ def _set_query(self, *args):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ uri = ''
+ for arg in args:
+ if arg:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, get_value(arg))
+ return uri[0: -1]
+
+
+class TestResultUpdate(TestResultBase):
+ def setUp(self):
+ super(TestResultUpdate, self).setUp()
+ self.req_d_id = self._create_d()
+
+ @executor.update(httplib.OK, '_assert_update_ti')
+ def test_success(self):
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(
+ result_models.TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
+ self.update_req = new_data
+ return update, self.req_d_id
+
+ def _assert_update_ti(self, request, body):
+ ti = body.trust_indicator
+ self.assertEqual(ti.current, request.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_scenario.py b/opnfv_testapi/tests/unit/resources/test_scenario.py
new file mode 100644
index 0000000..bd72067
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_scenario.py
@@ -0,0 +1,360 @@
+import functools
+import httplib
+import json
+import os
+from copy import deepcopy
+from datetime import datetime
+
+import opnfv_testapi.resources.scenario_models as models
+from opnfv_testapi.common import message
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestScenarioBase(base.TestBase):
+ def setUp(self):
+ super(TestScenarioBase, self).setUp()
+ self.get_res = models.Scenario
+ self.list_res = models.Scenarios
+ self.basePath = '/api/v1/scenarios'
+ self.req_d = self._load_request('scenario-c1.json')
+ self.req_2 = self._load_request('scenario-c2.json')
+
+ def tearDown(self):
+ pass
+
+ def assert_body(self, project, req=None):
+ pass
+
+ @staticmethod
+ def _load_request(f_req):
+ abs_file = os.path.join(os.path.dirname(__file__), f_req)
+ with open(abs_file, 'r') as f:
+ loader = json.load(f)
+ f.close()
+ return loader
+
+ def create_return_name(self, req):
+ _, res = self.create(req)
+ return res.href.split('/')[-1]
+
+ def assert_res(self, code, scenario, req=None):
+ self.assertEqual(code, httplib.OK)
+ if req is None:
+ req = self.req_d
+ self.assertIsNotNone(scenario._id)
+ self.assertIsNotNone(scenario.creation_date)
+
+ scenario == models.Scenario.from_dict(req)
+
+ @staticmethod
+ def _set_query(*args):
+ uri = ''
+ for arg in args:
+ uri += arg + '&'
+ return uri[0: -1]
+
+ def _get_and_assert(self, name, req=None):
+ code, body = self.get(name)
+ self.assert_res(code, body, req)
+
+
+class TestScenarioCreate(TestScenarioBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, httplib.BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = models.ScenarioCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_noneName(self):
+ req_none = models.ScenarioCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.FORBIDDEN)
+ self.assertIn(message.exist_base, body)
+
+
+class TestScenarioGet(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioGet, self).setUp()
+ self.scenario_1 = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def test_getByName(self):
+ self._get_and_assert(self.scenario_1, self.req_d)
+
+ def test_getAll(self):
+ self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
+
+ def test_queryName(self):
+ query = self._set_query('name=nosdn-nofeature-ha')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryInstaller(self):
+ query = self._set_query('installer=apex')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryVersion(self):
+ query = self._set_query('version=master')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryProject(self):
+ query = self._set_query('project=functest')
+ self._query_and_assert(query, reqs=[self.req_d, self.req_2])
+
+ def test_queryCombination(self):
+ query = self._set_query('name=nosdn-nofeature-ha',
+ 'installer=apex',
+ 'version=master',
+ 'project=functest')
+
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def _query_and_assert(self, query, found=True, reqs=None):
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, httplib.OK)
+ self.assertEqual(0, len(body.scenarios))
+ else:
+ self.assertEqual(len(reqs), len(body.scenarios))
+ for req in reqs:
+ for scenario in body.scenarios:
+ if req['name'] == scenario.name:
+ self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioUpdate, self).setUp()
+ self.scenario = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def _execute(set_update):
+ @functools.wraps(set_update)
+ def magic(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ self._update_and_assert(update, scenario)
+ return magic
+
+ def _update(expected):
+ def _update(set_update):
+ @functools.wraps(set_update)
+ def wrap(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ code, body = self.update(update, self.scenario)
+ getattr(self, expected)(code, scenario)
+ return wrap
+ return _update
+
+ @_update('_success')
+ def test_renameScenario(self, scenario):
+ new_name = 'nosdn-nofeature-noha'
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_forbidden')
+ def test_renameScenario_exist(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_bad_request')
+ def test_renameScenario_noName(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={})
+ return update_req, scenario
+
+ @_execute
+ def test_addInstaller(self, scenario):
+ add = models.ScenarioInstaller(installer='daisy', versions=list())
+ scenario['installers'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='add',
+ locate={},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteInstaller(self, scenario):
+ scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+ scenario['installers'])
+
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='delete',
+ locate={'installer': 'apex'})
+ return update, scenario
+
+ @_execute
+ def test_addVersion(self, scenario):
+ add = models.ScenarioVersion(version='danube', projects=list())
+ scenario['installers'][0]['versions'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='version',
+ op='add',
+ locate={'installer': 'apex'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteVersion(self, scenario):
+ scenario['installers'][0]['versions'] = filter(
+ lambda f: f['version'] != 'master',
+ scenario['installers'][0]['versions'])
+
+ update = models.ScenarioUpdateRequest(field='version',
+ op='delete',
+ locate={'installer': 'apex',
+ 'version': 'master'})
+ return update, scenario
+
+ @_execute
+ def test_changeOwner(self, scenario):
+ scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+ update = models.ScenarioUpdateRequest(field='owner',
+ op='update',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term={'owner': 'lucy'})
+ return update, scenario
+
+ @_execute
+ def test_addProject(self, scenario):
+ add = models.ScenarioProject(project='qtip').format()
+ scenario['installers'][0]['versions'][0]['projects'].append(add)
+ update = models.ScenarioUpdateRequest(field='project',
+ op='add',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteProject(self, scenario):
+ scenario['installers'][0]['versions'][0]['projects'] = filter(
+ lambda f: f['project'] != 'functest',
+ scenario['installers'][0]['versions'][0]['projects'])
+
+ update = models.ScenarioUpdateRequest(field='project',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'})
+ return update, scenario
+
+ @_execute
+ def test_addCustoms(self, scenario):
+ add = ['odl', 'parser', 'vping_ssh']
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteCustoms(self, scenario):
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=['vping_ssh'])
+ return update, scenario
+
+ @_execute
+ def test_addScore(self, scenario):
+ add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['scores'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='score',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_addTi(self, scenario):
+ add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['trust_indicators'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='trust_indicator',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ def _update_and_assert(self, update_req, new_scenario, name=None):
+ code, _ = self.update(update_req, self.scenario)
+ self.assertEqual(code, httplib.OK)
+ self._get_and_assert(_none_default(name, self.scenario),
+ new_scenario)
+
+ def _success(self, status, new_scenario):
+ self.assertEqual(status, httplib.OK)
+ self._get_and_assert(new_scenario.get('name'), new_scenario)
+
+ def _forbidden(self, status, new_scenario):
+ self.assertEqual(status, httplib.FORBIDDEN)
+
+ def _bad_request(self, status, new_scenario):
+ self.assertEqual(status, httplib.BAD_REQUEST)
+
+
+class TestScenarioDelete(TestScenarioBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+ def test_success(self):
+ scenario = self.create_return_name(self.req_d)
+ code, _ = self.delete(scenario)
+ self.assertEqual(code, httplib.OK)
+ code, _ = self.get(scenario)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+def _none_default(check, default):
+ return check if check else default
diff --git a/opnfv_testapi/tests/unit/resources/test_testcase.py b/opnfv_testapi/tests/unit/resources/test_testcase.py
new file mode 100644
index 0000000..4f2bc2a
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_testcase.py
@@ -0,0 +1,201 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestCaseBase(base.TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = testcase_models.Testcase
+ self.list_res = testcase_models.Testcases
+ self.update_res = testcase_models.Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = project_models.ProjectCreateRequest('functest',
+ 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def create(self, req=None, *args):
+ return super(TestCaseBase, self).create(req, self.project)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ self.project = 'noProject'
+ return self.req_d
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ req_empty = testcase_models.TestcaseCreateRequest('')
+ return req_empty
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ req_none = testcase_models.TestcaseCreateRequest(None)
+ return req_none
+
+ @executor.create(httplib.OK, '_assert_success')
+ def test_success(self):
+ return self.req_d
+
+ def _assert_success(self, body):
+ self.assert_create_body(body, self.req_d, self.project)
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestCaseGet(TestCaseBase):
+ def setUp(self):
+ super(TestCaseGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_list')
+ def test_list(self):
+ return None
+
+ def _list(self, body):
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def setUp(self):
+ super(TestCaseUpdate, self).setUp()
+ self.create_d()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.update_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ self.create_e()
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.update_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_success(self):
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_with_dollar(self):
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ return update, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assert_update_body(self.req_d, body, request)
+ _, new_body = self.get(request.name)
+ self.assert_update_body(self.req_d, new_body, request)
+
+
+class TestCaseDelete(TestCaseBase):
+ def setUp(self):
+ super(TestCaseDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_success(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_token.py b/opnfv_testapi/tests/unit/resources/test_token.py
new file mode 100644
index 0000000..940e256
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_token.py
@@ -0,0 +1,114 @@
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import httplib
+import unittest
+
+from tornado import web
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestToken(base.TestBase):
+ def get_app(self):
+ from opnfv_testapi.router import url_mappings
+ return web.Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ auth=True
+ )
+
+
+class TestTokenCreateProject(TestToken):
+ def setUp(self):
+ super(TestTokenCreateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+
+ @executor.create(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectCreateTokenInvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d
+
+ @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectCreateTokenUnauthorized(self):
+ if 'X-Auth-Token' in self.headers:
+ self.headers.pop('X-Auth-Token')
+ return self.req_d
+
+ @executor.create(httplib.OK, '_create_success')
+ def test_projectCreateTokenSuccess(self):
+ self.headers['X-Auth-Token'] = '12345'
+ return self.req_d
+
+ def _create_success(self, body):
+ self.assertIn('CreateResponse', str(type(body)))
+
+
+class TestTokenDeleteProject(TestToken):
+ def setUp(self):
+ super(TestTokenDeleteProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.delete(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectDeleteTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d.name
+
+ @executor.delete(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectDeleteTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ return self.req_d.name
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_projectDeleteTokenSuccess(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual('', body)
+
+
+class TestTokenUpdateProject(TestToken):
+ def setUp(self):
+ super(TestTokenUpdateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.update(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectUpdateTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectUpdateTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_projectUpdateTokenSuccess(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assertIn(request.name, body)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/opnfv_testapi/tests/unit/resources/test_version.py b/opnfv_testapi/tests/unit/resources/test_version.py
new file mode 100644
index 0000000..51fed11
--- /dev/null
+++ b/opnfv_testapi/tests/unit/resources/test_version.py
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestVersionBase(base.TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = models.Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ @executor.get(httplib.OK, '_get_success')
+ def test_success(self):
+ return None
+
+ def _get_success(self, body):
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+
+if __name__ == '__main__':
+ unittest.main()