summaryrefslogtreecommitdiffstats
path: root/utils/test/testapi/opnfv_testapi/tests/unit/resources
diff options
context:
space:
mode:
authorMorgan Richomme <morgan.richomme@orange.com>2017-07-12 09:36:50 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-07-12 09:36:50 +0000
commit1bfef523c2abd5024e2ec95a2fec3262178d1949 (patch)
treec7597c81a53cb05c96e2fad509a6641a2ae514a0 /utils/test/testapi/opnfv_testapi/tests/unit/resources
parent3bd556d74c6e75da6c54781d410303dbb75fb9a4 (diff)
parentcc8766086b4580ed37e7b6fc246e055e683afadc (diff)
Merge "move resources unit tests to tests/unit/resources"
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit/resources')
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py0
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json38
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json73
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py164
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py123
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py89
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py136
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py352
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py360
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py201
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py113
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py36
12 files changed, 1685 insertions, 0 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/__init__.py
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json
new file mode 100644
index 000000000..187802215
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json
@@ -0,0 +1,38 @@
+{
+ "name": "nosdn-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "apex",
+ "versions":
+ [
+ {
+ "owner": "Luke",
+ "version": "master",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "12/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json
new file mode 100644
index 000000000..b6a3b83ab
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json
@@ -0,0 +1,73 @@
+{
+ "name": "odl_2-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "fuel",
+ "versions":
+ [
+ {
+ "owner": "Lucky",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores": [],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "silver"
+ }
+
+ ]
+ },
+ {
+ "project": "yardstick",
+ "customs": ["suite-a"],
+ "scores": [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "0"
+ }
+ ],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "gold"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "owner": "Luke",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-09 22:46:44",
+ "score": "11/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py
new file mode 100644
index 000000000..6e4d454de
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_base.py
@@ -0,0 +1,164 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+from os import path
+
+import mock
+from tornado import testing
+
+from opnfv_testapi.common import config
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import fake_pymongo
+
+config.Config.CONFIG = path.join(path.dirname(__file__),
+ '../../../../etc/config.ini')
+
+
+class TestBase(testing.AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self._patch_server()
+ self.basePath = ''
+ self.create_res = models.CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+
+ def tearDown(self):
+ self.db_patcher.stop()
+
+ def _patch_server(self):
+ from opnfv_testapi.cmd import server
+ server.parse_config([
+ '--config-file',
+ path.join(path.dirname(__file__), path.pardir, 'common/normal.ini')
+ ])
+ self.db_patcher = mock.patch('opnfv_testapi.cmd.server.get_db',
+ self._fake_pymongo)
+ self.db_patcher.start()
+
+ @staticmethod
+ def _fake_pymongo():
+ return fake_pymongo
+
+ def get_app(self):
+ from opnfv_testapi.cmd import server
+ return server.make_app()
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self._update_uri(uri, *args),
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update(self, new=None, *args):
+ if new:
+ new = new.format()
+ res = self.fetch(self._get_uri(*args),
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def delete(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='DELETE',
+ headers=self.headers)
+ return res.code, res.body
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query if query else self.basePath
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ return code, self._get_return_body(code, body, cls)
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ import inspect
+ if not req:
+ req = self.req_d
+ resource_name = ''
+ if inspect.isclass(req):
+ resource_name = req.name
+ elif isinstance(req, dict):
+ resource_name = req['name']
+ elif isinstance(req, str):
+ resource_name = json.loads(req)['name']
+ new_args = args + tuple([resource_name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
+ fake_pymongo.scenarios.clear()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
new file mode 100644
index 000000000..1ebc96f3b
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
@@ -0,0 +1,123 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from tornado import gen
+from tornado import testing
+from tornado import web
+
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class MyTest(testing.AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.addCleanup(self._clear)
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return web.Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pods.insert({'name': 'test2'})
+
+ @testing.gen_test
+ def test_find_one(self):
+ user = yield self.db.pods.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+ self.db.pods.remove()
+
+ @testing.gen_test
+ def test_find(self):
+ cursor = self.db.pods.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @testing.gen_test
+ def test_update(self):
+ yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ def test_update_dot_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ 'key 1. name must not contain .')
+
+ def test_update_dot_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ None,
+ check_keys=False)
+
+ def test_update_dollar_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ 'key $name must not start with $')
+
+ def test_update_dollar_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ None,
+ check_keys=False)
+
+ @testing.gen_test
+ def test_remove(self):
+ yield self.db.pods.remove({'_id': '1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+ def test_insert_dot_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ 'key 2. name must not contain .')
+
+ def test_insert_dot_no_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ None,
+ check_keys=False)
+
+ def test_insert_dollar_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ 'key $name must not start with $')
+
+ def test_insert_dollar_no_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ None,
+ check_keys=False)
+
+ def _clear(self):
+ self.db.pods.clear()
+
+ def _update_assert(self, docs, error=None, **kwargs):
+ self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
+
+ def _insert_assert(self, docs, error=None, **kwargs):
+ self._db_assert('insert', error, docs, **kwargs)
+
+ @testing.gen_test
+ def _db_assert(self, method, error, *args, **kwargs):
+ name_error = None
+ try:
+ yield self._eval_pods_db(method, *args, **kwargs)
+ except NameError as err:
+ name_error = err.args[0]
+ finally:
+ self.assertEqual(name_error, error)
+
+ def _eval_pods_db(self, method, *args, **kwargs):
+ table_obj = vars(self.db)['pods']
+ return table_obj.__getattribute__(method)(*args, **kwargs)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py
new file mode 100644
index 000000000..8e0ae407d
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_pod.py
@@ -0,0 +1,89 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestPodBase(base.TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual',
+ 'zte pod 1', 'ci-pod')
+ self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2')
+ self.get_res = pod_models.Pod
+ self.list_res = pod_models.Pods
+ self.basePath = '/api/v1/pods'
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return pod_models.PodCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return pod_models.PodCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestPodGet(TestPodBase):
+ def setUp(self):
+ super(TestPodGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_get_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.req_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.req_e)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py
new file mode 100644
index 000000000..5a2ce7522
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_project.py
@@ -0,0 +1,136 @@
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestProjectBase(base.TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping',
+ 'vping-ssh test')
+ self.req_e = project_models.ProjectCreateRequest('doctor',
+ 'doctor test')
+ self.get_res = project_models.Project
+ self.list_res = project_models.Projects
+ self.update_res = project_models.Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return project_models.ProjectCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return project_models.ProjectCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestProjectGet(TestProjectBase):
+ def setUp(self):
+ super(TestProjectGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def setUp(self):
+ super(TestProjectUpdate, self).setUp()
+ _, d_body = self.create_d()
+ _, get_res = self.get(self.req_d.name)
+ self.index_d = get_res._id
+ self.create_e()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.req_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ return self.req_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.req_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_assert_update')
+ def test_success(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _assert_update(self, req, body):
+ self.assertEqual(self.index_d, body._id)
+ self.assert_body(body, req)
+ _, new_body = self.get(req.name)
+ self.assertEqual(self.index_d, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def setUp(self):
+ super(TestProjectDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_assert_delete')
+ def test_success(self):
+ return self.req_d.name
+
+ def _assert_delete(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py
new file mode 100644
index 000000000..c8463cb02
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_result.py
@@ -0,0 +1,352 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+from datetime import datetime, timedelta
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import pod_models
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import result_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+ self.items = [{'item1': 1}, {'item2': 2}]
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status,
+ 'items': [{'item1': 1}, {'item2': 2}]
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ t.items = a_dict.get('items')
+ return t
+
+
+class TestResultBase(base.TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'passed'
+ self.trust_indicator = result_models.TI(0.7)
+ self.start_date = "2016-05-23 07:16:09.477097"
+ self.stop_date = "2016-05-23 07:16:19.477097"
+ self.update_date = "2016-05-24 07:16:19.477097"
+ self.update_step = -0.05
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = result_models.ResultCreateRequest(
+ pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = result_models.TestResult
+ self.list_res = result_models.TestResults
+ self.update_res = result_models.TestResult
+ self.basePath = '/api/v1/results'
+ self.req_pod = pod_models.PodCreateRequest(
+ self.pod,
+ 'metal',
+ 'zte pod 1')
+ self.req_project = project_models.ProjectCreateRequest(
+ self.project,
+ 'vping test')
+ self.req_testcase = testcase_models.TestcaseCreateRequest(
+ self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ self.create_help('/api/v1/pods', self.req_pod)
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, result, req=None):
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(details_res.items, details_req.items)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+
+class TestResultCreate(TestResultBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_nobody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ return req
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ return req
+
+ @executor.create(httplib.OK, '_assert_no_ti')
+ def test_no_ti(self):
+ req = result_models.ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ self.actual_req = req
+ return req
+
+ def _assert_no_ti(self, body):
+ _id = body.href.split('/')[-1]
+ code, body = self.get(_id)
+ self.assert_res(body, self.actual_req)
+
+
+class TestResultGet(TestResultBase):
+ def setUp(self):
+ super(TestResultGet, self).setUp()
+ self.req_d_id = self._create_d()
+ self.req_10d_later = self._create_changed_date(days=10)
+ self.req_10d_before = self._create_changed_date(days=-10)
+
+ @executor.get(httplib.OK, 'assert_res')
+ def test_getOne(self):
+ return self.req_d_id
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryPod(self):
+ return self._set_query('pod')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryProject(self):
+ return self._set_query('project')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTestcase(self):
+ return self._set_query('case')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryVersion(self):
+ return self._set_query('version')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryInstaller(self):
+ return self._set_query('installer')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryBuildTag(self):
+ return self._set_query('build_tag')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryScenario(self):
+ return self._set_query('scenario')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTrustIndicator(self):
+ return self._set_query('trust_indicator')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryCriteria(self):
+ return self._set_query('criteria')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
+ def test_queryPeriodNotInt(self):
+ return self._set_query('period=a')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_queryPeriodSuccess(self):
+ return self._set_query('period=1')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
+ def test_queryLastNotInt(self):
+ return self._set_query('last=a')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_queryLast(self):
+ return self._set_query('last=1')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_combination(self):
+ return self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ @executor.query(httplib.OK, '_query_success', 0)
+ def test_notFound(self):
+ return self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ def _query_success(self, body, number):
+ self.assertEqual(number, len(body.results))
+
+ def _query_last_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_10d_later)
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _set_query(self, *args):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ uri = ''
+ for arg in args:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, get_value(arg))
+ return uri[0: -1]
+
+
+class TestResultUpdate(TestResultBase):
+ def setUp(self):
+ super(TestResultUpdate, self).setUp()
+ self.req_d_id = self._create_d()
+
+ @executor.update(httplib.OK, '_assert_update_ti')
+ def test_success(self):
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(
+ result_models.TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
+ self.update_req = new_data
+ return update, self.req_d_id
+
+ def _assert_update_ti(self, request, body):
+ ti = body.trust_indicator
+ self.assertEqual(ti.current, request.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
new file mode 100644
index 000000000..bd720671b
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py
@@ -0,0 +1,360 @@
+import functools
+import httplib
+import json
+import os
+from copy import deepcopy
+from datetime import datetime
+
+import opnfv_testapi.resources.scenario_models as models
+from opnfv_testapi.common import message
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestScenarioBase(base.TestBase):
+ def setUp(self):
+ super(TestScenarioBase, self).setUp()
+ self.get_res = models.Scenario
+ self.list_res = models.Scenarios
+ self.basePath = '/api/v1/scenarios'
+ self.req_d = self._load_request('scenario-c1.json')
+ self.req_2 = self._load_request('scenario-c2.json')
+
+ def tearDown(self):
+ pass
+
+ def assert_body(self, project, req=None):
+ pass
+
+ @staticmethod
+ def _load_request(f_req):
+ abs_file = os.path.join(os.path.dirname(__file__), f_req)
+ with open(abs_file, 'r') as f:
+ loader = json.load(f)
+ f.close()
+ return loader
+
+ def create_return_name(self, req):
+ _, res = self.create(req)
+ return res.href.split('/')[-1]
+
+ def assert_res(self, code, scenario, req=None):
+ self.assertEqual(code, httplib.OK)
+ if req is None:
+ req = self.req_d
+ self.assertIsNotNone(scenario._id)
+ self.assertIsNotNone(scenario.creation_date)
+
+ scenario == models.Scenario.from_dict(req)
+
+ @staticmethod
+ def _set_query(*args):
+ uri = ''
+ for arg in args:
+ uri += arg + '&'
+ return uri[0: -1]
+
+ def _get_and_assert(self, name, req=None):
+ code, body = self.get(name)
+ self.assert_res(code, body, req)
+
+
+class TestScenarioCreate(TestScenarioBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, httplib.BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = models.ScenarioCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_noneName(self):
+ req_none = models.ScenarioCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.FORBIDDEN)
+ self.assertIn(message.exist_base, body)
+
+
+class TestScenarioGet(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioGet, self).setUp()
+ self.scenario_1 = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def test_getByName(self):
+ self._get_and_assert(self.scenario_1, self.req_d)
+
+ def test_getAll(self):
+ self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
+
+ def test_queryName(self):
+ query = self._set_query('name=nosdn-nofeature-ha')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryInstaller(self):
+ query = self._set_query('installer=apex')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryVersion(self):
+ query = self._set_query('version=master')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryProject(self):
+ query = self._set_query('project=functest')
+ self._query_and_assert(query, reqs=[self.req_d, self.req_2])
+
+ def test_queryCombination(self):
+ query = self._set_query('name=nosdn-nofeature-ha',
+ 'installer=apex',
+ 'version=master',
+ 'project=functest')
+
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def _query_and_assert(self, query, found=True, reqs=None):
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, httplib.OK)
+ self.assertEqual(0, len(body.scenarios))
+ else:
+ self.assertEqual(len(reqs), len(body.scenarios))
+ for req in reqs:
+ for scenario in body.scenarios:
+ if req['name'] == scenario.name:
+ self.assert_res(code, scenario, req)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioUpdate, self).setUp()
+ self.scenario = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def _execute(set_update):
+ @functools.wraps(set_update)
+ def magic(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ self._update_and_assert(update, scenario)
+ return magic
+
+ def _update(expected):
+ def _update(set_update):
+ @functools.wraps(set_update)
+ def wrap(self):
+ update, scenario = set_update(self, deepcopy(self.req_d))
+ code, body = self.update(update, self.scenario)
+ getattr(self, expected)(code, scenario)
+ return wrap
+ return _update
+
+ @_update('_success')
+ def test_renameScenario(self, scenario):
+ new_name = 'nosdn-nofeature-noha'
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_forbidden')
+ def test_renameScenario_exist(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={'name': new_name})
+ return update_req, scenario
+
+ @_update('_bad_request')
+ def test_renameScenario_noName(self, scenario):
+ new_name = self.scenario_2
+ scenario['name'] = new_name
+ update_req = models.ScenarioUpdateRequest(field='name',
+ op='update',
+ locate={},
+ term={})
+ return update_req, scenario
+
+ @_execute
+ def test_addInstaller(self, scenario):
+ add = models.ScenarioInstaller(installer='daisy', versions=list())
+ scenario['installers'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='add',
+ locate={},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteInstaller(self, scenario):
+ scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
+ scenario['installers'])
+
+ update = models.ScenarioUpdateRequest(field='installer',
+ op='delete',
+ locate={'installer': 'apex'})
+ return update, scenario
+
+ @_execute
+ def test_addVersion(self, scenario):
+ add = models.ScenarioVersion(version='danube', projects=list())
+ scenario['installers'][0]['versions'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='version',
+ op='add',
+ locate={'installer': 'apex'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_deleteVersion(self, scenario):
+ scenario['installers'][0]['versions'] = filter(
+ lambda f: f['version'] != 'master',
+ scenario['installers'][0]['versions'])
+
+ update = models.ScenarioUpdateRequest(field='version',
+ op='delete',
+ locate={'installer': 'apex',
+ 'version': 'master'})
+ return update, scenario
+
+ @_execute
+ def test_changeOwner(self, scenario):
+ scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
+
+ update = models.ScenarioUpdateRequest(field='owner',
+ op='update',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term={'owner': 'lucy'})
+ return update, scenario
+
+ @_execute
+ def test_addProject(self, scenario):
+ add = models.ScenarioProject(project='qtip').format()
+ scenario['installers'][0]['versions'][0]['projects'].append(add)
+ update = models.ScenarioUpdateRequest(field='project',
+ op='add',
+ locate={'installer': 'apex',
+ 'version': 'master'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteProject(self, scenario):
+ scenario['installers'][0]['versions'][0]['projects'] = filter(
+ lambda f: f['project'] != 'functest',
+ scenario['installers'][0]['versions'][0]['projects'])
+
+ update = models.ScenarioUpdateRequest(field='project',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'})
+ return update, scenario
+
+ @_execute
+ def test_addCustoms(self, scenario):
+ add = ['odl', 'parser', 'vping_ssh']
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add)
+ return update, scenario
+
+ @_execute
+ def test_deleteCustoms(self, scenario):
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck']
+ update = models.ScenarioUpdateRequest(field='customs',
+ op='delete',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=['vping_ssh'])
+ return update, scenario
+
+ @_execute
+ def test_addScore(self, scenario):
+ add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['scores'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='score',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ @_execute
+ def test_addTi(self, scenario):
+ add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+ projects = scenario['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['trust_indicators'].append(add.format())
+ update = models.ScenarioUpdateRequest(field='trust_indicator',
+ op='add',
+ locate={
+ 'installer': 'apex',
+ 'version': 'master',
+ 'project': 'functest'},
+ term=add.format())
+ return update, scenario
+
+ def _update_and_assert(self, update_req, new_scenario, name=None):
+ code, _ = self.update(update_req, self.scenario)
+ self.assertEqual(code, httplib.OK)
+ self._get_and_assert(_none_default(name, self.scenario),
+ new_scenario)
+
+ def _success(self, status, new_scenario):
+ self.assertEqual(status, httplib.OK)
+ self._get_and_assert(new_scenario.get('name'), new_scenario)
+
+ def _forbidden(self, status, new_scenario):
+ self.assertEqual(status, httplib.FORBIDDEN)
+
+ def _bad_request(self, status, new_scenario):
+ self.assertEqual(status, httplib.BAD_REQUEST)
+
+
+class TestScenarioDelete(TestScenarioBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+ def test_success(self):
+ scenario = self.create_return_name(self.req_d)
+ code, _ = self.delete(scenario)
+ self.assertEqual(code, httplib.OK)
+ code, _ = self.get(scenario)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+def _none_default(check, default):
+ return check if check else default
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py
new file mode 100644
index 000000000..4f2bc2ad1
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py
@@ -0,0 +1,201 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.resources import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestCaseBase(base.TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = testcase_models.Testcase
+ self.list_res = testcase_models.Testcases
+ self.update_res = testcase_models.Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = project_models.ProjectCreateRequest('functest',
+ 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def create(self, req=None, *args):
+ return super(TestCaseBase, self).create(req, self.project)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ self.project = 'noProject'
+ return self.req_d
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ req_empty = testcase_models.TestcaseCreateRequest('')
+ return req_empty
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ req_none = testcase_models.TestcaseCreateRequest(None)
+ return req_none
+
+ @executor.create(httplib.OK, '_assert_success')
+ def test_success(self):
+ return self.req_d
+
+ def _assert_success(self, body):
+ self.assert_create_body(body, self.req_d, self.project)
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestCaseGet(TestCaseBase):
+ def setUp(self):
+ super(TestCaseGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_list')
+ def test_list(self):
+ return None
+
+ def _list(self, body):
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def setUp(self):
+ super(TestCaseUpdate, self).setUp()
+ self.create_d()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.update_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ self.create_e()
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.update_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_success(self):
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_with_dollar(self):
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ return update, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assert_update_body(self.req_d, body, request)
+ _, new_body = self.get(request.name)
+ self.assert_update_body(self.req_d, new_body, request)
+
+
+class TestCaseDelete(TestCaseBase):
+ def setUp(self):
+ super(TestCaseDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_success(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py
new file mode 100644
index 000000000..c9d4b72c4
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_token.py
@@ -0,0 +1,113 @@
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import httplib
+import unittest
+
+from tornado import web
+
+from opnfv_testapi.common import message
+from opnfv_testapi.resources import project_models
+from opnfv_testapi.router import url_mappings
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestToken(base.TestBase):
+ def get_app(self):
+ return web.Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ auth=True
+ )
+
+
+class TestTokenCreateProject(TestToken):
+ def setUp(self):
+ super(TestTokenCreateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+
+ @executor.create(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectCreateTokenInvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d
+
+ @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectCreateTokenUnauthorized(self):
+ if 'X-Auth-Token' in self.headers:
+ self.headers.pop('X-Auth-Token')
+ return self.req_d
+
+ @executor.create(httplib.OK, '_create_success')
+ def test_projectCreateTokenSuccess(self):
+ self.headers['X-Auth-Token'] = '12345'
+ return self.req_d
+
+ def _create_success(self, body):
+ self.assertIn('CreateResponse', str(type(body)))
+
+
+class TestTokenDeleteProject(TestToken):
+ def setUp(self):
+ super(TestTokenDeleteProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.delete(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectDeleteTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d.name
+
+ @executor.delete(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectDeleteTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ return self.req_d.name
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_projectDeleteTokenSuccess(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual('', body)
+
+
+class TestTokenUpdateProject(TestToken):
+ def setUp(self):
+ super(TestTokenUpdateProject, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping')
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+ self.basePath = '/api/v1/projects'
+ self.headers['X-Auth-Token'] = '12345'
+ self.create_d()
+
+ @executor.update(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectUpdateTokenIvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_projectUpdateTokenUnauthorized(self):
+ self.headers.pop('X-Auth-Token')
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_projectUpdateTokenSuccess(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assertIn(request.name, body)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py
new file mode 100644
index 000000000..51fed11ea
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/resources/test_version.py
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.resources import test_base as base
+
+
+class TestVersionBase(base.TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = models.Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ @executor.get(httplib.OK, '_get_success')
+ def test_success(self):
+ return None
+
+ def _get_success(self, body):
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+
+if __name__ == '__main__':
+ unittest.main()