summaryrefslogtreecommitdiffstats
path: root/testapi/opnfv_testapi/tests/unit/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'testapi/opnfv_testapi/tests/unit/handlers')
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/__init__.py0
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json38
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json73
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_base.py204
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_pod.py118
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_project.py137
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_result.py413
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py449
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py201
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_token.py52
-rw-r--r--testapi/opnfv_testapi/tests/unit/handlers/test_version.py36
11 files changed, 1721 insertions, 0 deletions
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/__init__.py b/testapi/opnfv_testapi/tests/unit/handlers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/__init__.py
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json
new file mode 100644
index 0000000..1878022
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json
@@ -0,0 +1,38 @@
+{
+ "name": "nosdn-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "apex",
+ "versions":
+ [
+ {
+ "owner": "Luke",
+ "version": "master",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "12/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json
new file mode 100644
index 0000000..980051c
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json
@@ -0,0 +1,73 @@
+{
+ "name": "odl_2-nofeature-ha",
+ "installers":
+ [
+ {
+ "installer": "fuel",
+ "versions":
+ [
+ {
+ "owner": "Lucky",
+ "version": "danube",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores": [],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "silver"
+ }
+
+ ]
+ },
+ {
+ "project": "yardstick",
+ "customs": ["suite-a"],
+ "scores": [
+ {
+ "date": "2017-01-08 22:46:44",
+ "score": "0/1"
+ }
+ ],
+ "trust_indicators": [
+ {
+ "date": "2017-01-18 22:46:44",
+ "status": "gold"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "owner": "Luke",
+ "version": "colorado",
+ "projects":
+ [
+ {
+ "project": "functest",
+ "customs": [ "healthcheck", "vping_ssh"],
+ "scores":
+ [
+ {
+ "date": "2017-01-09 22:46:44",
+ "score": "11/14"
+ }
+
+ ],
+ "trust_indicators": []
+ },
+ {
+ "project": "yardstick",
+ "customs": [],
+ "scores": [],
+ "trust_indicators": []
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_base.py b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py
new file mode 100644
index 0000000..b7fabb9
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py
@@ -0,0 +1,204 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from datetime import datetime
+import json
+from os import path
+
+from bson.objectid import ObjectId
+import mock
+from tornado import testing
+
+from opnfv_testapi.models import base_models
+from opnfv_testapi.models import pod_models
+from opnfv_testapi.tests.unit import fake_pymongo
+
+
+class TestBase(testing.AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self._patch_server()
+ self.basePath = ''
+ self.create_res = base_models.CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.pod_d = pod_models.Pod(name='zte-pod1',
+ mode='virtual',
+ details='zte pod 1',
+ role='community-ci',
+ _id=str(ObjectId()),
+ owner='ValidUser',
+ create_date=str(datetime.now()))
+ self.pod_e = pod_models.Pod(name='zte-pod2',
+ mode='metal',
+ details='zte pod 2',
+ role='production-ci',
+ _id=str(ObjectId()),
+ owner='ValidUser',
+ create_date=str(datetime.now()))
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+ fake_pymongo.users.insert({"user": "ValidUser",
+ 'email': 'validuser@lf.com',
+ 'fullname': 'Valid User',
+ 'groups': [
+ 'opnfv-testapi-users',
+ 'opnfv-gerrit-functest-submitters',
+ 'opnfv-gerrit-qtip-contributors']
+ })
+
+ def tearDown(self):
+ self.db_patcher.stop()
+ self.config_patcher.stop()
+
+ def _patch_server(self):
+ import argparse
+ config = path.join(path.dirname(__file__),
+ '../../../../etc/config.ini')
+ self.config_patcher = mock.patch(
+ 'argparse.ArgumentParser.parse_known_args',
+ return_value=(argparse.Namespace(config_file=config), None))
+ self.db_patcher = mock.patch('opnfv_testapi.db.api.DB',
+ fake_pymongo)
+ self.config_patcher.start()
+ self.db_patcher.start()
+
+ def get_app(self):
+ from opnfv_testapi.cmd import server
+ return server.make_app()
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ return self.post_direct_url(self._update_uri(uri, *args), req)
+
+ def post_direct_url(self, url, req):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(url,
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update_direct_url(self, url, new=None):
+ if new and hasattr(new, 'format'):
+ new = new.format()
+ res = self.fetch(url,
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def update(self, new=None, *args):
+ return self.update_direct_url(self._get_uri(*args), new)
+
+ def delete_direct_url(self, url, body):
+ if body:
+ res = self.fetch(url,
+ method='DELETE',
+ body=json.dumps(body),
+ headers=self.headers,
+ allow_nonstandard_methods=True)
+ else:
+ res = self.fetch(url,
+ method='DELETE',
+ headers=self.headers)
+
+ return res.code, res.body
+
+ def delete(self, *args):
+ return self.delete_direct_url(self._get_uri(*args), None)
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query if query else self.basePath
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ if body:
+ return code, self._get_return_body(code, body, cls)
+ else:
+ return code, None
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ import inspect
+ if not req:
+ req = self.req_d
+ resource_name = ''
+ if inspect.isclass(req):
+ resource_name = req.name
+ elif isinstance(req, dict):
+ resource_name = req['name']
+ elif isinstance(req, str):
+ resource_name = json.loads(req)['name']
+ new_args = args + tuple([resource_name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
+ fake_pymongo.scenarios.clear()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py
new file mode 100644
index 0000000..95ed8ba
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py
@@ -0,0 +1,118 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.models import pod_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+class TestPodBase(base.TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.get_res = pod_models.Pod
+ self.list_res = pod_models.Pods
+ self.basePath = '/api/v1/pods'
+ self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name,
+ mode=self.pod_d.mode,
+ details=self.pod_d.details,
+ role=self.pod_d.role)
+ self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name,
+ mode=self.pod_e.mode,
+ details=self.pod_e.details,
+ role=self.pod_e.role)
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.owner, 'ValidUser')
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ @executor.create(httplib.BAD_REQUEST, message.not_login())
+ def test_notlogin(self):
+ return self.req_d
+
+ @executor.mock_invalid_lfid()
+ @executor.create(httplib.BAD_REQUEST, message.not_lfid())
+ def test_invalidLfid(self):
+ return self.req_d
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return pod_models.PodCreateRequest('')
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return pod_models.PodCreateRequest(None)
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ fake_pymongo.pods.insert(self.pod_d.format())
+ return self.req_d
+
+ @executor.mock_valid_lfid()
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExistCaseInsensitive(self):
+ fake_pymongo.pods.insert(self.pod_d.format())
+ self.req_d.name = self.req_d.name.upper()
+ return self.req_d
+
+
+class TestPodGet(TestPodBase):
+ def setUp(self):
+ super(TestPodGet, self).setUp()
+ fake_pymongo.pods.insert(self.pod_d.format())
+ fake_pymongo.pods.insert(self.pod_e.format())
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_get_body')
+ def test_getOne(self):
+ return self.pod_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.pod_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.pod_e)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_project.py b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py
new file mode 100644
index 0000000..939cc0d
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py
@@ -0,0 +1,137 @@
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.models import project_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+class TestProjectBase(base.TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = project_models.ProjectCreateRequest('vping',
+ 'vping-ssh test')
+ self.req_e = project_models.ProjectCreateRequest('doctor',
+ 'doctor test')
+ self.get_res = project_models.Project
+ self.list_res = project_models.Projects
+ self.update_res = project_models.Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ return project_models.ProjectCreateRequest('')
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ return project_models.ProjectCreateRequest(None)
+
+ @executor.create(httplib.OK, 'assert_create_body')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestProjectGet(TestProjectBase):
+ def setUp(self):
+ super(TestProjectGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_assert_list')
+ def test_list(self):
+ return None
+
+ def _assert_list(self, body):
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def setUp(self):
+ super(TestProjectUpdate, self).setUp()
+ _, d_body = self.create_d()
+ _, get_res = self.get(self.req_d.name)
+ self.index_d = get_res._id
+ self.create_e()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_withoutBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.req_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ return self.req_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.req_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_assert_update')
+ def test_success(self):
+ req = project_models.ProjectUpdateRequest('newName', 'new description')
+ return req, self.req_d.name
+
+ def _assert_update(self, req, body):
+ self.assertEqual(self.index_d, body._id)
+ self.assert_body(body, req)
+ _, new_body = self.get(req.name)
+ self.assertEqual(self.index_d, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def setUp(self):
+ super(TestProjectDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_assert_delete')
+ def test_success(self):
+ return self.req_d.name
+
+ def _assert_delete(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_result.py b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py
new file mode 100644
index 0000000..b9f9ede
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py
@@ -0,0 +1,413 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+from datetime import datetime
+from datetime import timedelta
+import httplib
+import json
+import urllib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.models import project_models
+from opnfv_testapi.models import result_models
+from opnfv_testapi.models import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+ self.items = [{'item1': 1}, {'item2': 2}]
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status,
+ 'items': [{'item1': 1}, {'item2': 2}]
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ t.items = a_dict.get('items')
+ return t
+
+
+class TestResultBase(base.TestBase):
+ def setUp(self):
+ super(TestResultBase, self).setUp()
+ self.pod = self.pod_d.name
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'PASS'
+ self.trust_indicator = result_models.TI(0.7)
+ self.start_date = str(datetime.now())
+ self.stop_date = str(datetime.now() + timedelta(minutes=1))
+ self.update_date = str(datetime.now() + timedelta(days=1))
+ self.update_step = -0.05
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = result_models.ResultCreateRequest(
+ pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = result_models.TestResult
+ self.list_res = result_models.TestResults
+ self.update_res = result_models.TestResult
+ self.basePath = '/api/v1/results'
+ self.req_project = project_models.ProjectCreateRequest(
+ self.project,
+ 'vping test')
+ self.req_testcase = testcase_models.TestcaseCreateRequest(
+ self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ fake_pymongo.pods.insert(self.pod_d.format())
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, result, req=None):
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(details_res.items, details_req.items)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+ def upload(self, req):
+ if req and not isinstance(req, str) and hasattr(req, 'format'):
+ req = req.format()
+ res = self.fetch(self.basePath + '/upload',
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+
+class TestResultUpload(TestResultBase):
+ @executor.upload(httplib.BAD_REQUEST, message.key_error('file'))
+ def test_filenotfind(self):
+ return None
+
+
+class TestResultCreate(TestResultBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_nobody(self):
+ return None
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ return req
+
+ @executor.create(httplib.BAD_REQUEST,
+ message.invalid_value('criteria', ['PASS', 'FAIL']))
+ def test_invalid_criteria(self):
+ req = self.req_d
+ req.criteria = 'invalid'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ return req
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ return req
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_success(self):
+ return self.req_d
+
+ @executor.create(httplib.OK, 'assert_href')
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ return req
+
+ @executor.create(httplib.OK, '_assert_no_ti')
+ def test_no_ti(self):
+ req = result_models.ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ self.actual_req = req
+ return req
+
+ def _assert_no_ti(self, body):
+ _id = body.href.split('/')[-1]
+ code, body = self.get(_id)
+ self.assert_res(body, self.actual_req)
+
+
+class TestResultGet(TestResultBase):
+ def setUp(self):
+ super(TestResultGet, self).setUp()
+ self.req_10d_before = self._create_changed_date(days=-10)
+ self.req_d_id = self._create_d()
+ self.req_10d_later = self._create_changed_date(days=10)
+
+ @executor.get(httplib.OK, 'assert_res')
+ def test_getOne(self):
+ return self.req_d_id
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryPod(self):
+ return self._set_query('pod')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryProject(self):
+ return self._set_query('project')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTestcase(self):
+ return self._set_query('case')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryVersion(self):
+ return self._set_query('version')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryInstaller(self):
+ return self._set_query('installer')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryBuildTag(self):
+ return self._set_query('build_tag')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryScenario(self):
+ return self._set_query('scenario')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryTrustIndicator(self):
+ return self._set_query('trust_indicator')
+
+ @executor.query(httplib.OK, '_query_success', 3)
+ def test_queryCriteria(self):
+ return self._set_query('criteria')
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
+ def test_queryPeriodNotInt(self):
+ return self._set_query(period='a')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_queryPeriodSuccess(self):
+ return self._set_query(period=5)
+
+ @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
+ def test_queryLastNotInt(self):
+ return self._set_query(last='a')
+
+ @executor.query(httplib.OK, '_query_last_one', 1)
+ def test_queryLast(self):
+ return self._set_query(last=1)
+
+ @executor.query(httplib.OK, '_query_success', 4)
+ def test_queryPublic(self):
+ self._create_public_data()
+ return self._set_query()
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_queryPrivate(self):
+ self._create_private_data()
+ return self._set_query(public='false')
+
+ @executor.query(httplib.OK, '_query_period_one', 1)
+ def test_combination(self):
+ return self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ period=5)
+
+ @executor.query(httplib.OK, '_query_success', 0)
+ def test_notFound(self):
+ return self._set_query('project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ pod='notExistPod',
+ period=1)
+
+ @executor.query(httplib.OK, '_query_success', 1)
+ def test_filterErrorStartdate(self):
+ self._create_error_start_date(None)
+ self._create_error_start_date('None')
+ self._create_error_start_date('null')
+ self._create_error_start_date('')
+ return self._set_query(period=5)
+
+ def _query_success(self, body, number):
+ self.assertEqual(number, len(body.results))
+
+ def _query_last_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_10d_later)
+
+ def _query_period_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_d)
+
+ def _create_error_start_date(self, start_date):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = start_date
+ self.create(req)
+ return req
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _create_public_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'true'
+ self.create(req)
+ return req
+
+ def _create_private_data(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.public = 'false'
+ self.create(req)
+ return req
+
+ def _set_query(self, *args, **kwargs):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ query = []
+ for arg in args:
+ query.append((arg, get_value(arg)))
+ for k, v in kwargs.iteritems():
+ query.append((k, v))
+ return urllib.urlencode(query)
+
+
+class TestResultUpdate(TestResultBase):
+ def setUp(self):
+ super(TestResultUpdate, self).setUp()
+ self.req_d_id = self._create_d()
+
+ @executor.update(httplib.OK, '_assert_update_ti')
+ def test_success(self):
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(
+ result_models.TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
+ self.update_req = new_data
+ return update, self.req_d_id
+
+ def _assert_update_ti(self, request, body):
+ ti = body.trust_indicator
+ self.assertEqual(ti.current, request.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py
new file mode 100644
index 0000000..de7777a
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py
@@ -0,0 +1,449 @@
+import functools
+import httplib
+import json
+import os
+
+from datetime import datetime
+
+from opnfv_testapi.common import message
+import opnfv_testapi.models.scenario_models as models
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+def _none_default(check, default):
+ return check if check else default
+
+
+class TestScenarioBase(base.TestBase):
+ def setUp(self):
+ super(TestScenarioBase, self).setUp()
+ self.get_res = models.Scenario
+ self.list_res = models.Scenarios
+ self.basePath = '/api/v1/scenarios'
+ self.req_d = self._load_request('scenario-c1.json')
+ self.req_2 = self._load_request('scenario-c2.json')
+
+ def tearDown(self):
+ pass
+
+ def assert_body(self, project, req=None):
+ pass
+
+ @staticmethod
+ def _load_request(f_req):
+ abs_file = os.path.join(os.path.dirname(__file__), f_req)
+ with open(abs_file, 'r') as f:
+ loader = json.load(f)
+ f.close()
+ return loader
+
+ def create_return_name(self, req):
+ _, res = self.create(req)
+ return res.href.split('/')[-1]
+
+ def assert_res(self, code, scenario, req=None):
+ self.assertEqual(code, httplib.OK)
+ if req is None:
+ req = self.req_d
+ self.assertIsNotNone(scenario._id)
+ self.assertIsNotNone(scenario.creation_date)
+ self.assertEqual(scenario, models.Scenario.from_dict(req))
+
+ @staticmethod
+ def set_query(*args):
+ uri = ''
+ for arg in args:
+ uri += arg + '&'
+ return uri[0: -1]
+
+ def get_and_assert(self, name):
+ code, body = self.get(name)
+ self.assert_res(code, body, self.req_d)
+
+
+class TestScenarioCreate(TestScenarioBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, httplib.BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = models.ScenarioCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_noneName(self):
+ req_none = models.ScenarioCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, httplib.BAD_REQUEST)
+ self.assertIn(message.missing('name'), body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, httplib.FORBIDDEN)
+ self.assertIn(message.exist_base, body)
+
+
+class TestScenarioGet(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioGet, self).setUp()
+ self.scenario_1 = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+
+ def test_getByName(self):
+ self.get_and_assert(self.scenario_1)
+
+ def test_getAll(self):
+ self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
+
+ def test_queryName(self):
+ query = self.set_query('name=nosdn-nofeature-ha')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryInstaller(self):
+ query = self.set_query('installer=apex')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryVersion(self):
+ query = self.set_query('version=master')
+ self._query_and_assert(query, reqs=[self.req_d])
+
+ def test_queryProject(self):
+ query = self.set_query('project=functest')
+ self._query_and_assert(query, reqs=[self.req_d, self.req_2])
+
+ # close due to random fail, open again after solve it in another patch
+ # def test_queryCombination(self):
+ # query = self._set_query('name=nosdn-nofeature-ha',
+ # 'installer=apex',
+ # 'version=master',
+ # 'project=functest')
+ #
+ # self._query_and_assert(query, reqs=[self.req_d])
+
+ def _query_and_assert(self, query, found=True, reqs=None):
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, httplib.OK)
+ self.assertEqual(0, len(body.scenarios))
+ else:
+ self.assertEqual(len(reqs), len(body.scenarios))
+ for req in reqs:
+ for scenario in body.scenarios:
+ if req['name'] == scenario.name:
+ self.assert_res(code, scenario, req)
+
+
+class TestScenarioDelete(TestScenarioBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+ def test_success(self):
+ scenario = self.create_return_name(self.req_d)
+ code, _ = self.delete(scenario)
+ self.assertEqual(code, httplib.OK)
+ code, _ = self.get(scenario)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+class TestScenarioUpdate(TestScenarioBase):
+ def setUp(self):
+ super(TestScenarioUpdate, self).setUp()
+ self.scenario = self.create_return_name(self.req_d)
+ self.scenario_2 = self.create_return_name(self.req_2)
+ self.update_url = ''
+ self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario)
+ self.installer = self.req_d['installers'][0]['installer']
+ self.version = self.req_d['installers'][0]['versions'][0]['version']
+ self.locate_project = 'installer={}&version={}&project={}'.format(
+ self.installer,
+ self.version,
+ 'functest')
+
+ def update_url_fixture(item):
+ def _update_url_fixture(xstep):
+ def wrapper(self, *args, **kwargs):
+ self.update_url = '{}/{}'.format(self.scenario_url, item)
+ locator = None
+ if item in ['projects', 'owner']:
+ locator = 'installer={}&version={}'.format(
+ self.installer,
+ self.version)
+ elif item in ['versions']:
+ locator = 'installer={}'.format(
+ self.installer)
+ elif item in ['rename']:
+ self.update_url = self.scenario_url
+
+ if locator:
+ self.update_url = '{}?{}'.format(self.update_url, locator)
+
+ xstep(self, *args, **kwargs)
+ return wrapper
+ return _update_url_fixture
+
+ def update_partial(operate, expected):
+ def _update_partial(set_update):
+ @functools.wraps(set_update)
+ def wrapper(self):
+ update = set_update(self)
+ code, body = getattr(self, operate)(update)
+ getattr(self, expected)(code)
+ return wrapper
+ return _update_partial
+
+ @update_partial('_add', '_success')
+ def test_addScore(self):
+ add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['scores'].append(add.format())
+ self.update_url = '{}/scores?{}'.format(self.scenario_url,
+ self.locate_project)
+
+ return add
+
+ @update_partial('_add', '_success')
+ def test_addTrustIndicator(self):
+ add = models.ScenarioTI(date=str(datetime.now()), status='gold')
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['trust_indicators'].append(add.format())
+ self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url,
+ self.locate_project)
+
+ return add
+
+ @update_partial('_add', '_success')
+ def test_addCustoms(self):
+ adds = ['odl', 'parser', 'vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = list(set(functest['customs'] + adds))
+ self.update_url = '{}/customs?{}'.format(self.scenario_url,
+ self.locate_project)
+ return adds
+
+ @update_partial('_update', '_success')
+ def test_updateCustoms(self):
+ updates = ['odl', 'parser', 'vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = updates
+ self.update_url = '{}/customs?{}'.format(self.scenario_url,
+ self.locate_project)
+
+ return updates
+
+ @update_partial('_delete', '_success')
+ def test_deleteCustoms(self):
+ deletes = ['vping_ssh']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ functest = filter(lambda f: f['project'] == 'functest', projects)[0]
+ functest['customs'] = ['healthcheck']
+ self.update_url = '{}/customs?{}'.format(self.scenario_url,
+ self.locate_project)
+
+ return deletes
+
+ @update_url_fixture('projects')
+ @update_partial('_add', '_success')
+ def test_addProjects_succ(self):
+ add = models.ScenarioProject(project='qtip').format()
+ self.req_d['installers'][0]['versions'][0]['projects'].append(add)
+ return [add]
+
+ @update_url_fixture('projects')
+ @update_partial('_add', '_conflict')
+ def test_addProjects_already_exist(self):
+ add = models.ScenarioProject(project='functest').format()
+ return [add]
+
+ @update_url_fixture('projects')
+ @update_partial('_add', '_bad_request')
+ def test_addProjects_bad_schema(self):
+ add = models.ScenarioProject(project='functest').format()
+ add['score'] = None
+ return [add]
+
+ @update_url_fixture('projects')
+ @update_partial('_update', '_success')
+ def test_updateProjects_succ(self):
+ update = models.ScenarioProject(project='qtip').format()
+ self.req_d['installers'][0]['versions'][0]['projects'] = [update]
+ return [update]
+
+ @update_url_fixture('projects')
+ @update_partial('_update', '_conflict')
+ def test_updateProjects_duplicated(self):
+ update = models.ScenarioProject(project='qtip').format()
+ return [update, update]
+
+ @update_url_fixture('projects')
+ @update_partial('_update', '_bad_request')
+ def test_updateProjects_bad_schema(self):
+ update = models.ScenarioProject(project='functest').format()
+ update['score'] = None
+ return [update]
+
+ @update_url_fixture('projects')
+ @update_partial('_delete', '_success')
+ def test_deleteProjects(self):
+ deletes = ['functest']
+ projects = self.req_d['installers'][0]['versions'][0]['projects']
+ self.req_d['installers'][0]['versions'][0]['projects'] = filter(
+ lambda f: f['project'] != 'functest',
+ projects)
+ return deletes
+
+ @update_url_fixture('owner')
+ @update_partial('_update', '_success')
+ def test_changeOwner(self):
+ new_owner = 'new_owner'
+ update = models.ScenarioChangeOwnerRequest(new_owner).format()
+ self.req_d['installers'][0]['versions'][0]['owner'] = new_owner
+ return update
+
+ @update_url_fixture('versions')
+ @update_partial('_add', '_success')
+ def test_addVersions_succ(self):
+ add = models.ScenarioVersion(version='Euphrates').format()
+ self.req_d['installers'][0]['versions'].append(add)
+ return [add]
+
+ @update_url_fixture('versions')
+ @update_partial('_add', '_conflict')
+ def test_addVersions_already_exist(self):
+ add = models.ScenarioVersion(version='master').format()
+ return [add]
+
+ @update_url_fixture('versions')
+ @update_partial('_add', '_bad_request')
+ def test_addVersions_bad_schema(self):
+ add = models.ScenarioVersion(version='euphrates').format()
+ add['notexist'] = None
+ return [add]
+
+ @update_url_fixture('versions')
+ @update_partial('_update', '_success')
+ def test_updateVersions_succ(self):
+ update = models.ScenarioVersion(version='euphrates').format()
+ self.req_d['installers'][0]['versions'] = [update]
+ return [update]
+
+ @update_url_fixture('versions')
+ @update_partial('_update', '_conflict')
+ def test_updateVersions_duplicated(self):
+ update = models.ScenarioVersion(version='euphrates').format()
+ return [update, update]
+
+ @update_url_fixture('versions')
+ @update_partial('_update', '_bad_request')
+ def test_updateVersions_bad_schema(self):
+ update = models.ScenarioVersion(version='euphrates').format()
+ update['not_owner'] = 'Iam'
+ return [update]
+
+ @update_url_fixture('versions')
+ @update_partial('_delete', '_success')
+ def test_deleteVersions(self):
+ deletes = ['master']
+ versions = self.req_d['installers'][0]['versions']
+ self.req_d['installers'][0]['versions'] = filter(
+ lambda f: f['version'] != 'master',
+ versions)
+ return deletes
+
+ @update_url_fixture('installers')
+ @update_partial('_add', '_success')
+ def test_addInstallers_succ(self):
+ add = models.ScenarioInstaller(installer='daisy').format()
+ self.req_d['installers'].append(add)
+ return [add]
+
+ @update_url_fixture('installers')
+ @update_partial('_add', '_conflict')
+ def test_addInstallers_already_exist(self):
+ add = models.ScenarioInstaller(installer='apex').format()
+ return [add]
+
+ @update_url_fixture('installers')
+ @update_partial('_add', '_bad_request')
+ def test_addInstallers_bad_schema(self):
+ add = models.ScenarioInstaller(installer='daisy').format()
+ add['not_exist'] = 'not_exist'
+ return [add]
+
+ @update_url_fixture('installers')
+ @update_partial('_update', '_success')
+ def test_updateInstallers_succ(self):
+ update = models.ScenarioInstaller(installer='daisy').format()
+ self.req_d['installers'] = [update]
+ return [update]
+
+ @update_url_fixture('installers')
+ @update_partial('_update', '_conflict')
+ def test_updateInstallers_duplicated(self):
+ update = models.ScenarioInstaller(installer='daisy').format()
+ return [update, update]
+
+ @update_url_fixture('installers')
+ @update_partial('_update', '_bad_request')
+ def test_updateInstallers_bad_schema(self):
+ update = models.ScenarioInstaller(installer='daisy').format()
+ update['not_exist'] = 'not_exist'
+ return [update]
+
+ @update_url_fixture('installers')
+ @update_partial('_delete', '_success')
+ def test_deleteInstallers(self):
+ deletes = ['apex']
+ installers = self.req_d['installers']
+ self.req_d['installers'] = filter(
+ lambda f: f['installer'] != 'apex',
+ installers)
+ return deletes
+
+ @update_url_fixture('rename')
+ @update_partial('_update', '_success')
+ def test_renameScenario(self):
+ new_name = 'new_scenario_name'
+ update = models.ScenarioUpdateRequest(name=new_name)
+ self.req_d['name'] = new_name
+ return update
+
+ @update_url_fixture('rename')
+ @update_partial('_update', '_forbidden')
+ def test_renameScenario_exist(self):
+ new_name = self.req_d['name']
+ update = models.ScenarioUpdateRequest(name=new_name)
+ return update
+
+ def _add(self, update_req):
+ return self.post_direct_url(self.update_url, update_req)
+
+ def _update(self, update_req):
+ return self.update_direct_url(self.update_url, update_req)
+
+ def _delete(self, update_req):
+ return self.delete_direct_url(self.update_url, update_req)
+
+ def _success(self, status):
+ self.assertEqual(status, httplib.OK)
+ self.get_and_assert(self.req_d['name'])
+
+ def _forbidden(self, status):
+ self.assertEqual(status, httplib.FORBIDDEN)
+
+ def _bad_request(self, status):
+ self.assertEqual(status, httplib.BAD_REQUEST)
+
+ def _conflict(self, status):
+ self.assertEqual(status, httplib.CONFLICT)
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py
new file mode 100644
index 0000000..e4c668e
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py
@@ -0,0 +1,201 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import httplib
+import unittest
+
+from opnfv_testapi.common import message
+from opnfv_testapi.models import project_models
+from opnfv_testapi.models import testcase_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+class TestCaseBase(base.TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = testcase_models.Testcase
+ self.list_res = testcase_models.Testcases
+ self.update_res = testcase_models.Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = project_models.ProjectCreateRequest('functest',
+ 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def create(self, req=None, *args):
+ return super(TestCaseBase, self).create(req, self.project)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None
+
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
+ def test_noProject(self):
+ self.project = 'noProject'
+ return self.req_d
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_emptyName(self):
+ req_empty = testcase_models.TestcaseCreateRequest('')
+ return req_empty
+
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
+ def test_noneName(self):
+ req_none = testcase_models.TestcaseCreateRequest(None)
+ return req_none
+
+ @executor.create(httplib.OK, '_assert_success')
+ def test_success(self):
+ return self.req_d
+
+ def _assert_success(self, body):
+ self.assert_create_body(body, self.req_d, self.project)
+
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
+ def test_alreadyExist(self):
+ self.create_d()
+ return self.req_d
+
+
+class TestCaseGet(TestCaseBase):
+ def setUp(self):
+ super(TestCaseGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
+ def test_notExist(self):
+ return 'notExist'
+
+ @executor.get(httplib.OK, 'assert_body')
+ def test_getOne(self):
+ return self.req_d.name
+
+ @executor.get(httplib.OK, '_list')
+ def test_list(self):
+ return None
+
+ def _list(self, body):
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def setUp(self):
+ super(TestCaseUpdate, self).setUp()
+ self.create_d()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
+ def test_noBody(self):
+ return None, 'noBody'
+
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return self.update_e, 'notFound'
+
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
+ def test_newNameExist(self):
+ self.create_e()
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.FORBIDDEN, message.no_update())
+ def test_noUpdate(self):
+ return self.update_d, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_success(self):
+ return self.update_e, self.req_d.name
+
+ @executor.update(httplib.OK, '_update_success')
+ def test_with_dollar(self):
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ return update, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assert_update_body(self.req_d, body, request)
+ _, new_body = self.get(request.name)
+ self.assert_update_body(self.req_d, new_body, request)
+
+
+class TestCaseDelete(TestCaseBase):
+ def setUp(self):
+ super(TestCaseDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
+ def test_notFound(self):
+ return 'notFound'
+
+ @executor.delete(httplib.OK, '_delete_success')
+ def test_success(self):
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual(body, '')
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, httplib.NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_token.py b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py
new file mode 100644
index 0000000..e8b746d
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py
@@ -0,0 +1,52 @@
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import httplib
+import unittest
+
+from tornado import web
+
+from opnfv_testapi.common import message
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit.handlers import test_result
+
+
+class TestTokenCreateResult(test_result.TestResultBase):
+ def get_app(self):
+ from opnfv_testapi.router import url_mappings
+ return web.Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ auth=True
+ )
+
+ def setUp(self):
+ super(TestTokenCreateResult, self).setUp()
+ fake_pymongo.tokens.insert({"access_token": "12345"})
+
+ @executor.create(httplib.FORBIDDEN, message.invalid_token())
+ def test_resultCreateTokenInvalid(self):
+ self.headers['X-Auth-Token'] = '1234'
+ return self.req_d
+
+ @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
+ def test_resultCreateTokenUnauthorized(self):
+ if 'X-Auth-Token' in self.headers:
+ self.headers.pop('X-Auth-Token')
+ return self.req_d
+
+ @executor.create(httplib.OK, '_create_success')
+ def test_resultCreateTokenSuccess(self):
+ self.headers['X-Auth-Token'] = '12345'
+ return self.req_d
+
+ def _create_success(self, body):
+ self.assertIn('CreateResponse', str(type(body)))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_version.py b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py
new file mode 100644
index 0000000..6ef810f
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import httplib
+import unittest
+
+from opnfv_testapi.models import base_models
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit.handlers import test_base as base
+
+
+class TestVersionBase(base.TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = base_models.Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ @executor.get(httplib.OK, '_get_success')
+ def test_success(self):
+ return None
+
+ def _get_success(self, body):
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+
+if __name__ == '__main__':
+ unittest.main()