summaryrefslogtreecommitdiffstats
path: root/cvp/opnfv_testapi/tests
diff options
context:
space:
mode:
authorhongbo tian <hongbo.tianhongbo@huawei.com>2017-09-28 09:28:41 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-09-28 09:28:41 +0000
commit61820ee85c967d90021e4089c6a7907046685639 (patch)
tree0c3f23e2146f0855fb5be0ff55343d59e3a9c9ab /cvp/opnfv_testapi/tests
parent0cc2fdefa9e6e959e7c69beede736738f339f636 (diff)
parent0cf6b232ac9cf128ee9183a27c08f4f74ab2e2e6 (diff)
Merge "add api&web services for cvp"
Diffstat (limited to 'cvp/opnfv_testapi/tests')
-rw-r--r--cvp/opnfv_testapi/tests/__init__.py1
-rw-r--r--cvp/opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/__init__.py0
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/noparam.ini16
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/normal.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/nosection.ini11
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/notboolean.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/notint.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/test_config.py15
-rw-r--r--cvp/opnfv_testapi/tests/unit/conftest.py8
-rw-r--r--cvp/opnfv_testapi/tests/unit/executor.py97
-rw-r--r--cvp/opnfv_testapi/tests/unit/fake_pymongo.py284
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/__init__.py0
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json38
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json73
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_base.py161
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py123
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_pod.py90
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_project.py137
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_result.py410
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_scenario.py360
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_testcase.py201
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_token.py114
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_version.py36
24 files changed, 2235 insertions, 0 deletions
diff --git a/cvp/opnfv_testapi/tests/__init__.py b/cvp/opnfv_testapi/tests/__init__.py
new file mode 100644
index 00000000..9f28b0bf
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/__init__.py
@@ -0,0 +1 @@
+__author__ = 'serena'
diff --git a/cvp/opnfv_testapi/tests/unit/__init__.py b/cvp/opnfv_testapi/tests/unit/__init__.py
new file mode 100644
index 00000000..3fc79f1d
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/__init__.py
@@ -0,0 +1,9 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+__author__ = 'serena'
diff --git a/cvp/opnfv_testapi/tests/unit/common/__init__.py b/cvp/opnfv_testapi/tests/unit/common/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/__init__.py
diff --git a/cvp/opnfv_testapi/tests/unit/common/noparam.ini b/cvp/opnfv_testapi/tests/unit/common/noparam.ini
new file mode 100644
index 00000000..fda2a09e
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/noparam.ini
@@ -0,0 +1,16 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/normal.ini b/cvp/opnfv_testapi/tests/unit/common/normal.ini
new file mode 100644
index 00000000..77cc6c6e
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/normal.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/nosection.ini b/cvp/opnfv_testapi/tests/unit/common/nosection.ini
new file mode 100644
index 00000000..9988fc0a
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/nosection.ini
@@ -0,0 +1,11 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/notboolean.ini b/cvp/opnfv_testapi/tests/unit/common/notboolean.ini
new file mode 100644
index 00000000..b3f32767
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/notboolean.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = 8000
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = notboolean
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/notint.ini b/cvp/opnfv_testapi/tests/unit/common/notint.ini
new file mode 100644
index 00000000..d1b752a3
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/notint.ini
@@ -0,0 +1,17 @@
+# to add a new parameter in the config file,
+# the CONF object in config.ini must be updated
+[mongo]
+# URL of the mongo DB
+# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
+url = mongodb://127.0.0.1:27017/
+dbname = test_results_collection
+
+[api]
+# Listening port
+port = notint
+# With debug_on set to true, error traces will be shown in HTTP responses
+debug = True
+authenticate = False
+
+[swagger]
+base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/test_config.py b/cvp/opnfv_testapi/tests/unit/common/test_config.py
new file mode 100644
index 00000000..cc8743ca
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/common/test_config.py
@@ -0,0 +1,15 @@
+import argparse
+
+
+def test_config_normal(mocker, config_normal):
+ mocker.patch(
+ 'argparse.ArgumentParser.parse_known_args',
+ return_value=(argparse.Namespace(config_file=config_normal), None))
+ from opnfv_testapi.common import config
+ CONF = config.Config()
+ assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/'
+ assert CONF.mongo_dbname == 'test_results_collection'
+ assert CONF.api_port == 8000
+ assert CONF.api_debug is True
+ assert CONF.api_authenticate is False
+ assert CONF.swagger_base_url == 'http://localhost:8000'
diff --git a/cvp/opnfv_testapi/tests/unit/conftest.py b/cvp/opnfv_testapi/tests/unit/conftest.py
new file mode 100644
index 00000000..feff1daa
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/conftest.py
@@ -0,0 +1,8 @@
+from os import path
+
+import pytest
+
+
+@pytest.fixture
+def config_normal():
+ return path.join(path.dirname(__file__), 'common/normal.ini')
diff --git a/cvp/opnfv_testapi/tests/unit/executor.py b/cvp/opnfv_testapi/tests/unit/executor.py
new file mode 100644
index 00000000..b8f696ca
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/executor.py
@@ -0,0 +1,97 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corp
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import functools
+import httplib
+
+
+def upload(excepted_status, excepted_response):
+ def _upload(create_request):
+ @functools.wraps(create_request)
+ def wrap(self):
+ request = create_request(self)
+ status, body = self.upload(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _upload
+
+
+def create(excepted_status, excepted_response):
+ def _create(create_request):
+ @functools.wraps(create_request)
+ def wrap(self):
+ request = create_request(self)
+ status, body = self.create(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _create
+
+
+def get(excepted_status, excepted_response):
+ def _get(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.get(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _get
+
+
+def update(excepted_status, excepted_response):
+ def _update(update_request):
+ @functools.wraps(update_request)
+ def wrap(self):
+ request, resource = update_request(self)
+ status, body = self.update(request, resource)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(request, body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _update
+
+
+def delete(excepted_status, excepted_response):
+ def _delete(delete_request):
+ @functools.wraps(delete_request)
+ def wrap(self):
+ request = delete_request(self)
+ if isinstance(request, tuple):
+ status, body = self.delete(request[0], *(request[1]))
+ else:
+ status, body = self.delete(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _delete
+
+
+def query(excepted_status, excepted_response, number=0):
+ def _query(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.query(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body, number)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _query
diff --git a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
new file mode 100644
index 00000000..0ca83df6
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
@@ -0,0 +1,284 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from operator import itemgetter
+
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+
+
+def thread_execute(method, *args, **kwargs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(method, *args, **kwargs)
+ return result
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.length = len(self.collection)
+ self.sorted = []
+
+ def _is_next_exist(self):
+ return self.length != 0
+
+ @property
+ def fetch_next(self):
+ return thread_execute(self._is_next_exist)
+
+ def next_object(self):
+ self.length -= 1
+ return self.collection.pop()
+
+ def sort(self, key_or_list):
+ for k, v in key_or_list.iteritems():
+ if v == -1:
+ reverse = True
+ else:
+ reverse = False
+
+ self.collection = sorted(self.collection,
+ key=itemgetter(k), reverse=reverse)
+ return self
+
+ def limit(self, limit):
+ if limit != 0 and limit < len(self.collection):
+ self.collection = self.collection[0: limit]
+ self.length = limit
+ return self
+
+ def skip(self, skip):
+ if skip < self.length and (skip > 0):
+ self.collection = self.collection[self.length - skip: -1]
+ self.length -= skip
+ elif skip >= self.length:
+ self.collection = []
+ self.length = 0
+ return self
+
+ def _count(self):
+ return self.length
+
+ def count(self):
+ return thread_execute(self._count)
+
+
+class MemDb(object):
+
+ def __init__(self, name):
+ self.name = name
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ if '_id' in spec_or_id:
+ spec_or_id['_id'] = str(spec_or_id['_id'])
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ return thread_execute(self._find_one, spec_or_id, *args)
+
+ def _insert(self, doc_or_docs, check_keys=True):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ if check_keys:
+ for doc in docs:
+ self._check_keys(doc)
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = str(ObjectId())
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs, check_keys=True):
+ return thread_execute(self._insert, doc_or_docs, check_keys)
+
+ @staticmethod
+ def _compare_date(spec, value):
+ gte = True
+ lt = False
+ for k, v in spec.iteritems():
+ if k == '$gte' and value < v:
+ gte = False
+ elif k == '$lt' and value < v:
+ lt = True
+ return gte and lt
+
+ def _in(self, content, *args):
+ if self.name == 'scenarios':
+ return self._in_scenarios(content, *args)
+ else:
+ return self._in_others(content, *args)
+
+ def _in_scenarios_installer(self, installer, content):
+ hit = False
+ for s_installer in content['installers']:
+ if installer == s_installer['installer']:
+ hit = True
+
+ return hit
+
+ def _in_scenarios_version(self, version, content):
+ hit = False
+ for s_installer in content['installers']:
+ for s_version in s_installer['versions']:
+ if version == s_version['version']:
+ hit = True
+ return hit
+
+ def _in_scenarios_project(self, project, content):
+ hit = False
+ for s_installer in content['installers']:
+ for s_version in s_installer['versions']:
+ for s_project in s_version['projects']:
+ if project == s_project['project']:
+ hit = True
+
+ return hit
+
+ def _in_scenarios(self, content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'installers':
+ for inner in v.values():
+ for i_k, i_v in inner.iteritems():
+ if i_k == 'installer':
+ return self._in_scenarios_installer(i_v,
+ content)
+ elif i_k == 'versions.version':
+ return self._in_scenarios_version(i_v,
+ content)
+ elif i_k == 'versions.projects.project':
+ return self._in_scenarios_project(i_v,
+ content)
+ elif content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _in_others(self, content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'start_date':
+ if not MemDb._compare_date(v, content.get(k)):
+ return False
+ elif k == 'trust_indicator.current':
+ if content.get('trust_indicator').get('current') != v:
+ return False
+ elif not isinstance(v, dict) and content.get(k, None) != v:
+ return False
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _aggregate(self, *args, **kwargs):
+ res = self.contents
+ print args
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$match':
+ res = self._find(v)
+ cursor = MemCursor(res)
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$sort':
+ cursor = cursor.sort(v)
+ elif k == '$skip':
+ cursor = cursor.skip(v)
+ elif k == '$limit':
+ cursor = cursor.limit(v)
+ return cursor
+
+ def aggregate(self, *args, **kwargs):
+ return self._aggregate(*args, **kwargs)
+
+ def _update(self, spec, document, check_keys=True):
+ updated = False
+
+ if check_keys:
+ self._check_keys(document)
+
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document, check_keys=True):
+ return thread_execute(self._update, spec, document, check_keys)
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ return thread_execute(self._remove, spec_or_id)
+
+ def clear(self):
+ self._remove()
+
+ def _check_keys(self, doc):
+ for key in doc.keys():
+ if '.' in key:
+ raise NameError('key {} must not contain .'.format(key))
+ if key.startswith('$'):
+ raise NameError('key {} must not start with $'.format(key))
+ if isinstance(doc.get(key), dict):
+ self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+ return globals()[name]
+
+
+pods = MemDb('pods')
+projects = MemDb('projects')
+testcases = MemDb('testcases')
+results = MemDb('results')
+scenarios = MemDb('scenarios')
+tokens = MemDb('tokens')
diff --git a/cvp/opnfv_testapi/tests/unit/resources/__init__.py b/cvp/opnfv_testapi/tests/unit/resources/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/__init__.py
diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json
new file mode 100644
index 00000000..18780221
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json
@@ -0,0 +1,38 @@
+{
+ "name": "nosdn-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "apex",
+ "versions":
+ [
+ {
+ "owner": "Luke",
+ "version": "master",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "12/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json
new file mode 100644
index 00000000..b6a3b83a
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json
@@ -0,0 +1,73 @@
+{
+ "name": "odl_2-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "fuel",
+ "versions":
+ [
+ {
+ "owner": "Lucky",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores": [],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "silver"
+ }
+
+ ]
+ },
+ {
+ "project": "yardstick",
+ "customs": ["suite-a"],
+ "scores": [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "0"
+ }
+ ],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "gold"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "owner": "Luke",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-09 22:46:44",
+ "score": "11/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_base.py b/cvp/opnfv_testapi/tests/unit/resources/test_base.py
new file mode 100644
index 00000000..dcec4e95
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_base.py
@@ -0,0 +1,161 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+from os import path
+
+import mock
+from tornado import testing
+
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class TestBase(testing.AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self._patch_server()
+ self.basePath = ''
+ self.create_res = models.CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+
+ def tearDown(self):
+ self.db_patcher.stop()
+ self.config_patcher.stop()
+
+ def _patch_server(self):
+ import argparse
+ config = path.join(path.dirname(__file__), '../common/normal.ini')
+ self.config_patcher = mock.patch(
+ 'argparse.ArgumentParser.parse_known_args',
+ return_value=(argparse.Namespace(config_file=config), None))
+ self.db_patcher = mock.patch('opnfv_testapi.db.api.DB',
+ fake_pymongo)
+ self.config_patcher.start()
+ self.db_patcher.start()
+
+ def set_config_file(self):
+ self.config_file = 'normal.ini'
+
+ def get_app(self):
+ from opnfv_testapi.cmd import server
+ return server.make_app()
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self._update_uri(uri, *args),
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update(self, new=None, *args):
+ if new:
+ new = new.format()
+ res = self.fetch(self._get_uri(*args),
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def delete(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='DELETE',
+ headers=self.headers)
+ return res.code, res.body
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query if query else self.basePath
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ return code, self._get_return_body(code, body, cls)
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ import inspect
+ if not req:
+ req = self.req_d
+ resource_name = ''
+ if inspect.isclass(req):
+ resource_name = req.name
+ elif isinstance(req, dict):
+ resource_name = req['name']
+ elif isinstance(req, str):
+ resource_name = json.loads(req)['name']
+ new_args = args + tuple([resource_name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
+ fake_pymongo.scenarios.clear()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
new file mode 100644
index 00000000..1ebc96f3
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
@@ -0,0 +1,123 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from tornado import gen
+from tornado import testing
+from tornado import web
+
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class MyTest(testing.AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.addCleanup(self._clear)
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return web.Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pods.insert({'name': 'test2'})
+
+ @testing.gen_test
+ def test_find_one(self):
+ user = yield self.db.pods.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+ self.db.pods.remove()
+
+ @testing.gen_test
+ def test_find(self):
+ cursor = self.db.pods.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @testing.gen_test
+ def test_update(self):
+ yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ def test_update_dot_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ 'key 1. name must not contain .')
+
+ def test_update_dot_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ None,
+ check_keys=False)
+
+ def test_update_dollar_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ 'key $name must not start with $')
+
+ def test_update_dollar_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ None,
+ check_keys=False)
+
+ @testing.gen_test
+ def test_remove(self):
+ yield self.db.pods.remove({'_id': '1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+ def test_insert_dot_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ 'key 2. name must not contain .')
+
+ def test_insert_dot_no_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ None,
+ check_keys=False)
+
+ def test_insert_dollar_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ 'key $name must not start with $')
+
+ def test_insert_dollar_no_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ None,
+ check_keys=False)
+
+ def _clear(self):
+ self.db.pods.clear()
+
+ def _update_assert(self, docs, error=None, **kwargs):
+ self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
+
+ def _insert_assert(self, docs, error=None, **kwargs):
+ self._db_assert('insert', error, docs, **kwargs)
+
+ @testing.gen_test
+ def _db_assert(self, method, error, *args, **kwargs):
+ name_error = None
+ try:
+ yield self._eval_pods_db(method, *args, **kwargs)
+ except NameError as err:
+ name_error = err.args[0]
+ finally:
+ self.assertEqual(name_error, error)
+
+ def _eval_pods_db(self, method, *args, **kwargs):
+ table_obj = vars(self.db)['pods']
+ return table_obj.__getattribute__(method)(*args, **kwargs)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_pod.py b/cvp/opnfv_testapi/tests/unit/resources/test_pod.py
new file mode 100644
index 00000000..cb4f1d92
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_pod.py
@@ -0,0 +1,90 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestPodBase(base.TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual',
+ 'zte pod 1', 'ci-pod')
+ self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2')
+ self.get_res = pod_models.Pod
+ self.list_res = pod_models.Pods
+ self.basePath = '/api/v1/pods'
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return pod_models.PodCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return pod_models.PodCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestPodGet(TestPodBase):
+ def setUp(self):
+ super(TestPodGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_get_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.req_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.req_e)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_project.py b/cvp/opnfv_testapi/tests/unit/resources/test_project.py
new file mode 100644
index 00000000..0622ba8d
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_project.py
@@ -0,0 +1,137 @@
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestProjectBase(base.TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping',
+ 'vping-ssh test')
+ self.req_e = project_models.ProjectCreateRequest('doctor',
+ 'doctor test')
+ self.get_res = project_models.Project
+ self.list_res = project_models.Projects
+ self.update_res = project_models.Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return project_models.ProjectCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return project_models.ProjectCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestProjectGet(TestProjectBase):
+ def setUp(self):
+ super(TestProjectGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def setUp(self):
+ super(TestProjectUpdate, self).setUp()
+ _, d_body = self.create_d()
+ _, get_res = self.get(self.req_d.name)
+ self.index_d = get_res._id
+ self.create_e()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.req_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ return self.req_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.req_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_assert_update')
+ def test_success(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _assert_update(self, req, body):
+ self.assertEqual(self.index_d, body._id)
+ self.assert_body(body, req)
+ _, new_body = self.get(req.name)
+ self.assertEqual(self.index_d, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def setUp(self):
+ super(TestProjectDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_assert_delete')
+ def test_success(self):
+ return self.req_d.name
+
+ def _assert_delete(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_result.py b/cvp/opnfv_testapi/tests/unit/resources/test_result.py
new file mode 100644
index 00000000..1e83ed30
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_result.py
@@ -0,0 +1,410 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+from datetime import datetime, timedelta
+import json
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import result_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+ self.items = [{'item1': 1}, {'item2': 2}]
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status,
+ 'items': [{'item1': 1}, {'item2': 2}]
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ t.items = a_dict.get('items')
+ return t
+
+
+class TestResultBase(base.TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'passed'
+ self.trust_indicator = result_models.TI(0.7)
+ self.start_date = str(datetime.now())
+ self.stop_date = str(datetime.now() + timedelta(minutes=1))
+ self.update_date = str(datetime.now() + timedelta(days=1))
+ self.update_step = -0.05
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = result_models.ResultCreateRequest(
+ pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = result_models.TestResult
+ self.list_res = result_models.TestResults
+ self.update_res = result_models.TestResult
+ self.basePath = '/api/v1/results'
+ self.req_pod = pod_models.PodCreateRequest(
+ self.pod,
+ 'metal',
+ 'zte pod 1')
+ self.req_project = project_models.ProjectCreateRequest(
+ self.project,
+ 'vping test')
+ self.req_testcase = testcase_models.TestcaseCreateRequest(
+ self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ self.create_help('/api/v1/pods', self.req_pod)
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, result, req=None):
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(details_res.items, details_req.items)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+ def upload(self, req):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self.basePath + '/upload',
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+
+class TestResultUpload(TestResultBase):
+ @executor.upload(httplib.BAD_REQUEST, message.key_error('file'))
+ def test_filenotfind(self):
+ return None
+
+
+class TestResultCreate(TestResultBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_nobody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ return req
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ return req
+
+ @executor.create(httplib.OK, '_assert_no_ti')
+ def test_no_ti(self):
+ req = result_models.ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ self.actual_req = req
+ return req
+
+ def _assert_no_ti(self, body):
+ _id = body.href.split('/')[-1]
+ code, body = self.get(_id)
+ self.assert_res(body, self.actual_req)
+
+
+class TestResultGet(TestResultBase):
+ def setUp(self):
+ super(TestResultGet, self).setUp()
+ self.req_10d_before = self._create_changed_date(days=-10)
+ self.req_d_id = self._create_d()
+ self.req_10d_later = self._create_changed_date(days=10)
+
+ @executor.get(httplib.OK, 'assert_res')
+ def test_getOne(self):
+ return self.req_d_id
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryPod(self):
+ return self._set_query('pod')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryProject(self):
+ return self._set_query('project')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTestcase(self):
+ return self._set_query('case')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryVersion(self):
+ return self._set_query('version')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryInstaller(self):
+ return self._set_query('installer')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryBuildTag(self):
+ return self._set_query('build_tag')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryScenario(self):
+ return self._set_query('scenario')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTrustIndicator(self):
+ return self._set_query('trust_indicator')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryCriteria(self):
+ return self._set_query('criteria')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
+ def test_queryPeriodNotInt(self):
+ return self._set_query('period=a')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_queryPeriodSuccess(self):
+ return self._set_query('period=5')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
+ def test_queryLastNotInt(self):
+ return self._set_query('last=a')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_queryLast(self):
+ return self._set_query('last=1')
+
+ @executor.query(httplib.OK, '_query_success', 4)
+ def test_queryPublic(self):
+ self._create_public_data()
+ return self._set_query('')
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_queryPrivate(self):
+ self._create_private_data()
+ return self._set_query('public=false')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_combination(self):
+ return self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=5')
+
+ @executor.query(httplib.OK, '_query_success', 0)
+ def test_notFound(self):
+ return self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_filterErrorStartdate(self):
+ self._create_error_start_date(None)
+ self._create_error_start_date('None')
+ self._create_error_start_date('null')
+ self._create_error_start_date('')
+ return self._set_query('period=5')
+
+ def _query_success(self, body, number):
+ self.assertEqual(number, len(body.results))
+
+ def _query_last_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_10d_later)
+
+ def _query_period_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_d)
+
+ def _create_error_start_date(self, start_date):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = start_date
+ self.create(req)
+ return req
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _create_public_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'true'
+ self.create(req)
+ return req
+
+ def _create_private_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'false'
+ self.create(req)
+ return req
+
+ def _set_query(self, *args):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ uri = ''
+ for arg in args:
+ if arg:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, get_value(arg))
+ return uri[0: -1]
+
+
+class TestResultUpdate(TestResultBase):
+ def setUp(self):
+ super(TestResultUpdate, self).setUp()
+ self.req_d_id = self._create_d()
+
+ @executor.update(httplib.OK, '_assert_update_ti')
+ def test_success(self):
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(
+ result_models.TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
+ self.update_req = new_data
+ return update, self.req_d_id
+
+ def _assert_update_ti(self, request, body):
+ ti = body.trust_indicator
+ self.assertEqual(ti.current, request.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py b/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py
new file mode 100644
index 00000000..bd720671
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py
@@ -0,0 +1,360 @@
+import functools
+import httplib
+import json
+import os
+from copy import deepcopy
+from datetime import datetime
+
+import opnfv_testapi.resources.scenario_models as models
+from opnfv_testapi.common import message
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestScenarioBase(base.TestBase):
+ def setUp(self):
+ super(TestScenarioBase, self).setUp()
+ self.get_res = models.Scenario
+ self.list_res = models.Scenarios
+ self.basePath = '/api/v1/scenarios'
+ self.req_d = self._load_request('scenario-c1.json')
+ self.req_2 = self._load_request('scenario-c2.json')
+
+ def tearDown(self):
+ pass
+
+ def assert_body(self, project, req=None):
+ pass
+
+ @staticmethod
+ def _load_request(f_req):
+ abs_file = os.path.join(os.path.dirname(__file__), f_req)
+ with open(abs_file, 'r') as f:
+ loader = json.load(f)
+ f.close()
+ return loader
+
+ def create_return_name(self, req):
+ _, res = self.create(req)
+ return res.href.split('/')[-1]
+
+ def assert_res(self, code, scenario, req=None):
+ self.assertEqual(code, httplib.OK)
+ if req is None:
+ req = self.req_d
+ self.assertIsNotNone(scenario._id)
+ self.assertIsNotNone(scenario.creation_date)
+
+ scenario == models.Scenario.from_dict(req)
+
+ @staticmethod
+ def _set_query(*args):
+ uri = ''
+ for arg in args:
+ uri += arg + '&'
+ return uri[0: -1]
+
+ def _get_and_assert(self, name, req=None):
+ code, body = self.get(name)
+ self.assert_res(code, body, req)
+
+
+class TestScenarioCreate(TestScenarioBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, httplib.BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = models.ScenarioCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_noneName(self):
+ req_none = models.ScenarioCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.FORBIDDEN)
+ self.assertIn(message.exist_base, body)
+
+
+class TestScenarioGet(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioGet, self).setUp()
+ self.scenario_1 = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def test_getByName(self):
+ self._get_and_assert(self.scenario_1, self.req_d)
+
+ def test_getAll(self):
+ self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
+
+ def test_queryName(self):
+ query = self._set_query('name=nosdn-nofeature-ha')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryInstaller(self):
+ query = self._set_query('installer=apex')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryVersion(self):
+ query = self._set_query('version=master')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryProject(self):
+ query = self._set_query('project=functest')
+ self._query_and_assert(query, reqs=[self.req_d, self.req_2])
+
+ def test_queryCombination(self):
+ query = self._set_query('name=nosdn-nofeature-ha',
+ 'installer=apex',
+ 'version=master',
+ 'project=functest')
+
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def _query_and_assert(self, query, found=True, reqs=None):
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, httplib.OK)
+ self.assertEqual(0, len(body.scenarios))
+ else:
+ self.assertEqual(len(reqs), len(body.scenarios))
+ for req in reqs:
+ for scenario in body.scenarios:
+ if req['name'] == scenario.name:
+ self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioUpdate, self).setUp()
+ self.scenario = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def _execute(set_update):
+ @functools.wraps(set_update)
+ def magic(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ self._update_and_assert(update, scenario)
+ return magic
+
+ def _update(expected):
+ def _update(set_update):
+ @functools.wraps(set_update)
+ def wrap(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ code, body = self.update(update, self.scenario)
+ getattr(self, expected)(code, scenario)
+ return wrap
+ return _update
+
+ @_update('_success')
+ def test_renameScenario(self, scenario):
+ new_name = 'nosdn-nofeature-noha'
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_forbidden')
+ def test_renameScenario_exist(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_bad_request')
+ def test_renameScenario_noName(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={})
+ return update_req, scenario
+
+ @_execute
+ def test_addInstaller(self, scenario):
+ add = models.ScenarioInstaller(installer='daisy', versions=list())
+ scenario['installers'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='add',
+ locate={},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteInstaller(self, scenario):
+ scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+ scenario['installers'])
+
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='delete',
+ locate={'installer': 'apex'})
+ return update, scenario
+
+ @_execute
+ def test_addVersion(self, scenario):
+ add = models.ScenarioVersion(version='danube', projects=list())
+ scenario['installers'][0]['versions'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='version',
+ op='add',
+ locate={'installer': 'apex'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteVersion(self, scenario):
+ scenario['installers'][0]['versions'] = filter(
+ lambda f: f['version'] != 'master',
+ scenario['installers'][0]['versions'])
+
+ update = models.ScenarioUpdateRequest(field='version',
+ op='delete',
+ locate={'installer': 'apex',
+ 'version': 'master'})
+ return update, scenario
+
+ @_execute
+ def test_changeOwner(self, scenario):
+ scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+ update = models.ScenarioUpdateRequest(field='owner',
+ op='update',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term={'owner': 'lucy'})
+ return update, scenario
+
+ @_execute
+ def test_addProject(self, scenario):
+ add = models.ScenarioProject(project='qtip').format()
+ scenario['installers'][0]['versions'][0]['projects'].append(add)
+ update = models.ScenarioUpdateRequest(field='project',
+ op='add',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteProject(self, scenario):
+ scenario['installers'][0]['versions'][0]['projects'] = filter(
+ lambda f: f['project'] != 'functest',
+ scenario['installers'][0]['versions'][0]['projects'])
+
+ update = models.ScenarioUpdateRequest(field='project',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'})
+ return update, scenario
+
+ @_execute
+ def test_addCustoms(self, scenario):
+ add = ['odl', 'parser', 'vping_ssh']
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteCustoms(self, scenario):
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=['vping_ssh'])
+ return update, scenario
+
+ @_execute
+ def test_addScore(self, scenario):
+ add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['scores'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='score',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_addTi(self, scenario):
+ add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['trust_indicators'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='trust_indicator',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ def _update_and_assert(self, update_req, new_scenario, name=None):
+ code, _ = self.update(update_req, self.scenario)
+ self.assertEqual(code, httplib.OK)
+ self._get_and_assert(_none_default(name, self.scenario),
+ new_scenario)
+
+ def _success(self, status, new_scenario):
+ self.assertEqual(status, httplib.OK)
+ self._get_and_assert(new_scenario.get('name'), new_scenario)
+
+ def _forbidden(self, status, new_scenario):
+ self.assertEqual(status, httplib.FORBIDDEN)
+
+ def _bad_request(self, status, new_scenario):
+ self.assertEqual(status, httplib.BAD_REQUEST)
+
+
+class TestScenarioDelete(TestScenarioBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+ def test_success(self):
+ scenario = self.create_return_name(self.req_d)
+ code, _ = self.delete(scenario)
+ self.assertEqual(code, httplib.OK)
+ code, _ = self.get(scenario)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+def _none_default(check, default):
+ return check if check else default
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py b/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py
new file mode 100644
index 00000000..4f2bc2ad
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py
@@ -0,0 +1,201 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestCaseBase(base.TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = testcase_models.Testcase
+ self.list_res = testcase_models.Testcases
+ self.update_res = testcase_models.Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = project_models.ProjectCreateRequest('functest',
+ 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def create(self, req=None, *args):
+ return super(TestCaseBase, self).create(req, self.project)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ self.project = 'noProject'
+ return self.req_d
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ req_empty = testcase_models.TestcaseCreateRequest('')
+ return req_empty
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ req_none = testcase_models.TestcaseCreateRequest(None)
+ return req_none
+
+ @executor.create(httplib.OK, '_assert_success')
+ def test_success(self):
+ return self.req_d
+
+ def _assert_success(self, body):
+ self.assert_create_body(body, self.req_d, self.project)
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestCaseGet(TestCaseBase):
+ def setUp(self):
+ super(TestCaseGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_list')
+ def test_list(self):
+ return None
+
+ def _list(self, body):
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def setUp(self):
+ super(TestCaseUpdate, self).setUp()
+ self.create_d()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.update_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ self.create_e()
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.update_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_success(self):
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_with_dollar(self):
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ return update, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assert_update_body(self.req_d, body, request)
+ _, new_body = self.get(request.name)
+ self.assert_update_body(self.req_d, new_body, request)
+
+
+class TestCaseDelete(TestCaseBase):
+ def setUp(self):
+ super(TestCaseDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_success(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_token.py b/cvp/opnfv_testapi/tests/unit/resources/test_token.py
new file mode 100644
index 00000000..940e256c
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_token.py
@@ -0,0 +1,114 @@
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import httplib
+import unittest
+
+from tornado import web
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestToken(base.TestBase):
+ def get_app(self):
+ from opnfv_testapi.router import url_mappings
+ return web.Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ auth=True
+ )
+
+
+class TestTokenCreateProject(TestToken):
+ def setUp(self):
+ super(TestTokenCreateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+
+ @executor.create(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectCreateTokenInvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d
+
+ @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectCreateTokenUnauthorized(self):
+ if 'X-Auth-Token' in self.headers:
+ self.headers.pop('X-Auth-Token')
+ return self.req_d
+
+ @executor.create(httplib.OK, '_create_success')
+ def test_projectCreateTokenSuccess(self):
+ self.headers['X-Auth-Token'] = '12345'
+ return self.req_d
+
+ def _create_success(self, body):
+ self.assertIn('CreateResponse', str(type(body)))
+
+
+class TestTokenDeleteProject(TestToken):
+ def setUp(self):
+ super(TestTokenDeleteProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.delete(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectDeleteTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d.name
+
+ @executor.delete(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectDeleteTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ return self.req_d.name
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_projectDeleteTokenSuccess(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual('', body)
+
+
+class TestTokenUpdateProject(TestToken):
+ def setUp(self):
+ super(TestTokenUpdateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.update(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectUpdateTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectUpdateTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_projectUpdateTokenSuccess(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assertIn(request.name, body)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_version.py b/cvp/opnfv_testapi/tests/unit/resources/test_version.py
new file mode 100644
index 00000000..51fed11e
--- /dev/null
+++ b/cvp/opnfv_testapi/tests/unit/resources/test_version.py
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestVersionBase(base.TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = models.Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ @executor.get(httplib.OK, '_get_success')
+ def test_success(self):
+ return None
+
+ def _get_success(self, body):
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+
+if __name__ == '__main__':
+ unittest.main()