summaryrefslogtreecommitdiffstats
path: root/utils/test/testapi/opnfv_testapi/tests/unit
diff options
context:
space:
mode:
authorSerenaFeng <feng.xiaowei@zte.com.cn>2016-10-18 17:30:31 +0800
committerSerenaFeng <feng.xiaowei@zte.com.cn>2016-10-18 17:30:31 +0800
commitd30f7afdd2d39302174a788e9e008feb61b25596 (patch)
treeeed10dd17021d0f4ee87fb90089dab6eed5b5071 /utils/test/testapi/opnfv_testapi/tests/unit
parent307a8f07e8c17800ed1214394a3aacf02045a03a (diff)
rename result_collection_api to testapi
Change-Id: Iec4e3db23cd44f30831e17c127eda74e9d9b5d14 Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/tests/unit')
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py191
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_base.py136
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py123
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py89
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_project.py141
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_result.py339
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py196
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_version.py31
9 files changed, 1255 insertions, 0 deletions
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py
new file mode 100644
index 000000000..3fc79f1d5
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/__init__.py
@@ -0,0 +1,9 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+__author__ = 'serena'
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
new file mode 100644
index 000000000..3dd87e603
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
@@ -0,0 +1,191 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+from operator import itemgetter
+
+
+def thread_execute(method, *args, **kwargs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(method, *args, **kwargs)
+ return result
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.count = len(self.collection)
+ self.sorted = []
+
+ def _is_next_exist(self):
+ return self.count != 0
+
+ @property
+ def fetch_next(self):
+ return thread_execute(self._is_next_exist)
+
+ def next_object(self):
+ self.count -= 1
+ return self.collection.pop()
+
+ def sort(self, key_or_list):
+ key = key_or_list[0][0]
+ if key_or_list[0][1] == -1:
+ reverse = True
+ else:
+ reverse = False
+
+ if key_or_list is not None:
+ self.collection = sorted(self.collection,
+ key=itemgetter(key), reverse=reverse)
+ return self
+
+ def limit(self, limit):
+ if limit != 0 and limit < len(self.collection):
+ self.collection = self.collection[0:limit]
+ self.count = limit
+ return self
+
+
+class MemDb(object):
+
+ def __init__(self):
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ if '_id' in spec_or_id:
+ spec_or_id['_id'] = str(spec_or_id['_id'])
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ return thread_execute(self._find_one, spec_or_id, *args)
+
+ def _insert(self, doc_or_docs, check_keys=True):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ if check_keys:
+ for doc in docs:
+ self._check_keys(doc)
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = str(ObjectId())
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs, check_keys=True):
+ return thread_execute(self._insert, doc_or_docs, check_keys)
+
+ @staticmethod
+ def _compare_date(spec, value):
+ for k, v in spec.iteritems():
+ if k == '$gte' and value >= v:
+ return True
+ return False
+
+ @staticmethod
+ def _in(content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'start_date':
+ if not MemDb._compare_date(v, content.get(k)):
+ return False
+ elif k == 'trust_indicator.current':
+ if content.get('trust_indicator').get('current') != v:
+ return False
+ elif content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _update(self, spec, document, check_keys=True):
+ updated = False
+
+ if check_keys:
+ self._check_keys(document)
+
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document, check_keys=True):
+ return thread_execute(self._update, spec, document, check_keys)
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ return thread_execute(self._remove, spec_or_id)
+
+ def clear(self):
+ self._remove()
+
+ def _check_keys(self, doc):
+ for key in doc.keys():
+ if '.' in key:
+ raise NameError('key {} must not contain .'.format(key))
+ if key.startswith('$'):
+ raise NameError('key {} must not start with $'.format(key))
+ if isinstance(doc.get(key), dict):
+ self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+ return globals()[name]
+
+
+pods = MemDb()
+projects = MemDb()
+testcases = MemDb()
+results = MemDb()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py
new file mode 100644
index 000000000..ff1a1932c
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py
@@ -0,0 +1,136 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+
+from tornado.web import Application
+from tornado.testing import AsyncHTTPTestCase
+
+from opnfv_testapi.router import url_mappings
+from opnfv_testapi.resources.models import CreateResponse
+import fake_pymongo
+
+
+class TestBase(AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self.basePath = ''
+ self.create_res = CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+
+ def get_app(self):
+ return Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ )
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ if req:
+ req = req.format()
+ res = self.fetch(self._update_uri(uri, *args),
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update(self, new=None, *args):
+ if new:
+ new = new.format()
+ res = self.fetch(self._get_uri(*args),
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def delete(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='DELETE',
+ headers=self.headers)
+ return res.code, res.body
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ return code, self._get_return_body(code, body, cls)
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ if not req:
+ req = self.req_d
+ new_args = args + tuple([req.name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
new file mode 100644
index 000000000..5f50ba867
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
@@ -0,0 +1,123 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from tornado import gen
+from tornado.testing import AsyncHTTPTestCase, gen_test
+from tornado.web import Application
+
+import fake_pymongo
+
+
+class MyTest(AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.addCleanup(self._clear)
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pods.insert({'name': 'test2'})
+
+ @gen_test
+ def test_find_one(self):
+ user = yield self.db.pods.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+ self.db.pods.remove()
+
+ @gen_test
+ def test_find(self):
+ cursor = self.db.pods.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @gen_test
+ def test_update(self):
+ yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ def test_update_dot_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ 'key 1. name must not contain .')
+
+ def test_update_dot_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ None,
+ check_keys=False)
+
+ def test_update_dollar_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ 'key $name must not start with $')
+
+ def test_update_dollar_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ None,
+ check_keys=False)
+
+ @gen_test
+ def test_remove(self):
+ yield self.db.pods.remove({'_id': '1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+ def test_insert_dot_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ 'key 2. name must not contain .')
+
+ def test_insert_dot_no_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ None,
+ check_keys=False)
+
+ def test_insert_dollar_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ 'key $name must not start with $')
+
+ def test_insert_dollar_no_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ None,
+ check_keys=False)
+
+ def _clear(self):
+ self.db.pods.clear()
+
+ def _update_assert(self, docs, error=None, **kwargs):
+ self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
+
+ def _insert_assert(self, docs, error=None, **kwargs):
+ self._db_assert('insert', error, docs, **kwargs)
+
+ @gen_test
+ def _db_assert(self, method, error, *args, **kwargs):
+ name_error = None
+ try:
+ yield self._eval_pods_db(method, *args, **kwargs)
+ except NameError as err:
+ name_error = err.args[0]
+ finally:
+ self.assertEqual(name_error, error)
+
+ def _eval_pods_db(self, method, *args, **kwargs):
+ table_obj = vars(self.db)['pods']
+ return table_obj.__getattribute__(method)(*args, **kwargs)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py
new file mode 100644
index 000000000..a1184d554
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py
@@ -0,0 +1,89 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.pod_models import PodCreateRequest, Pod, Pods
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestPodBase(TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.req_d = PodCreateRequest('zte-1', 'virtual',
+ 'zte pod 1', 'ci-pod')
+ self.req_e = PodCreateRequest('zte-2', 'metal', 'zte pod 2')
+ self.get_res = Pod
+ self.list_res = Pods
+ self.basePath = '/api/v1/pods'
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = PodCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = PodCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestPodGet(TestPodBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assert_get_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.req_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.req_e)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py
new file mode 100644
index 000000000..327ddf7b2
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py
@@ -0,0 +1,141 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.project_models import ProjectCreateRequest, \
+ Project, Projects, ProjectUpdateRequest
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestProjectBase(TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = ProjectCreateRequest('vping', 'vping-ssh test')
+ self.req_e = ProjectCreateRequest('doctor', 'doctor test')
+ self.get_res = Project
+ self.list_res = Projects
+ self.update_res = Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = ProjectCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = ProjectCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestProjectGet(TestProjectBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def test_withoutBody(self):
+ code, _ = self.update(None, 'noBody')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_notFound(self):
+ code, _ = self.update(self.req_e, 'notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_newNameExist(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.update(self.req_e, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("already exists", body)
+
+ def test_noUpdate(self):
+ self.create_d()
+ code, body = self.update(self.req_d, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("Nothing to update", body)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ _id = body._id
+
+ req = ProjectUpdateRequest('newName', 'new description')
+ code, body = self.update(req, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(_id, body._id)
+ self.assert_body(body, req)
+
+ _, new_body = self.get(req.name)
+ self.assertEqual(_id, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.delete(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(body, '')
+
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py
new file mode 100644
index 000000000..8479b35cd
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py
@@ -0,0 +1,339 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import unittest
+from datetime import datetime, timedelta
+
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_NOT_FOUND
+from opnfv_testapi.resources.pod_models import PodCreateRequest
+from opnfv_testapi.resources.project_models import ProjectCreateRequest
+from opnfv_testapi.resources.result_models import ResultCreateRequest, \
+ TestResult, TestResults, ResultUpdateRequest, TI, TIHistory
+from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest
+from test_base import TestBase
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ return t
+
+
+class TestResultBase(TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'passed'
+ self.trust_indicator = TI(0.7)
+ self.start_date = "2016-05-23 07:16:09.477097"
+ self.stop_date = "2016-05-23 07:16:19.477097"
+ self.update_date = "2016-05-24 07:16:19.477097"
+ self.update_step = -0.05
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = TestResult
+ self.list_res = TestResults
+ self.update_res = TestResult
+ self.basePath = '/api/v1/results'
+ self.req_pod = PodCreateRequest(self.pod, 'metal', 'zte pod 1')
+ self.req_project = ProjectCreateRequest(self.project, 'vping test')
+ self.req_testcase = TestcaseCreateRequest(self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ self.create_help('/api/v1/pods', self.req_pod)
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, code, result, req=None):
+ self.assertEqual(code, HTTP_OK)
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+
+class TestResultCreate(TestResultBase):
+ def test_nobody(self):
+ (code, body) = self.create(None)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('no body', body)
+
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('pod_name missing', body)
+
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('project_name missing', body)
+
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('case_name missing', body)
+
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find pod', body)
+
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find project', body)
+
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find testcase', body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_href(body)
+
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_href(body)
+
+ def test_no_ti(self):
+ req = ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ (code, res) = self.create(req)
+ _id = res.href.split('/')[-1]
+ self.assertEqual(code, HTTP_OK)
+ code, body = self.get(_id)
+ self.assert_res(code, body, req)
+
+
+class TestResultGet(TestResultBase):
+ def test_getOne(self):
+ _id = self._create_d()
+ code, body = self.get(_id)
+ self.assert_res(code, body)
+
+ def test_queryPod(self):
+ self._query_and_assert(self._set_query('pod'))
+
+ def test_queryProject(self):
+ self._query_and_assert(self._set_query('project'))
+
+ def test_queryTestcase(self):
+ self._query_and_assert(self._set_query('case'))
+
+ def test_queryVersion(self):
+ self._query_and_assert(self._set_query('version'))
+
+ def test_queryInstaller(self):
+ self._query_and_assert(self._set_query('installer'))
+
+ def test_queryBuildTag(self):
+ self._query_and_assert(self._set_query('build_tag'))
+
+ def test_queryScenario(self):
+ self._query_and_assert(self._set_query('scenario'))
+
+ def test_queryTrustIndicator(self):
+ self._query_and_assert(self._set_query('trust_indicator'))
+
+ def test_queryCriteria(self):
+ self._query_and_assert(self._set_query('criteria'))
+
+ def test_queryPeriodNotInt(self):
+ code, body = self.query(self._set_query('period=a'))
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('period must be int', body)
+
+ def test_queryPeriodFail(self):
+ self._query_and_assert(self._set_query('period=1'),
+ found=False, days=-10)
+
+ def test_queryPeriodSuccess(self):
+ self._query_and_assert(self._set_query('period=1'),
+ found=True)
+
+ def test_queryLastNotInt(self):
+ code, body = self.query(self._set_query('last=a'))
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('last must be int', body)
+
+ def test_queryLast(self):
+ self._create_changed_date()
+ req = self._create_changed_date(minutes=20)
+ self._create_changed_date(minutes=-20)
+ self._query_and_assert(self._set_query('last=1'), req=req)
+
+ def test_combination(self):
+ self._query_and_assert(self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'))
+
+ def test_notFound(self):
+ self._query_and_assert(self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'),
+ found=False)
+
+ def _query_and_assert(self, query, found=True, req=None, **kwargs):
+ if req is None:
+ req = self._create_changed_date(**kwargs)
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(0, len(body.results))
+ else:
+ self.assertEqual(1, len(body.results))
+ for result in body.results:
+ self.assert_res(code, result, req)
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _set_query(self, *args):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ uri = ''
+ for arg in args:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, get_value(arg))
+ return uri[0: -1]
+
+
+class TestResultUpdate(TestResultBase):
+ def test_success(self):
+ _id = self._create_d()
+
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = ResultUpdateRequest(trust_indicator=new_ti)
+ code, body = self.update(update, _id)
+ self.assertEqual(_id, body._id)
+ self.assert_res(code, body, new_data)
+
+ code, new_body = self.get(_id)
+ self.assertEqual(_id, new_body._id)
+ self.assert_res(code, new_body, new_data)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py
new file mode 100644
index 000000000..cb767844a
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py
@@ -0,0 +1,196 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+import copy
+
+from test_base import TestBase
+from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest, \
+ Testcase, Testcases, TestcaseUpdateRequest
+from opnfv_testapi.resources.project_models import ProjectCreateRequest
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestCaseBase(TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = Testcase
+ self.list_res = Testcases
+ self.update_res = Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = ProjectCreateRequest('functest', 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ def test_noBody(self):
+ (code, body) = self.create(None, 'vping')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_noProject(self):
+ code, body = self.create(self.req_d, 'noProject')
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('Could not find project', body)
+
+ def test_emptyName(self):
+ req_empty = TestcaseCreateRequest('')
+ (code, body) = self.create(req_empty, self.project)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = TestcaseCreateRequest(None)
+ (code, body) = self.create(req_none, self.project)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body, None, self.project)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestCaseGet(TestCaseBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def test_noBody(self):
+ code, _ = self.update(case='noBody')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_notFound(self):
+ code, _ = self.update(self.update_e, 'notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_newNameExist(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.update(self.update_e, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("already exists", body)
+
+ def test_noUpdate(self):
+ self.create_d()
+ code, body = self.update(self.update_d, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("Nothing to update", body)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ _id = body._id
+
+ code, body = self.update(self.update_e, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(_id, body._id)
+ self.assert_update_body(self.req_d, body, self.update_e)
+
+ _, new_body = self.get(self.req_e.name)
+ self.assertEqual(_id, new_body._id)
+ self.assert_update_body(self.req_d, new_body, self.update_e)
+
+ def test_with_dollar(self):
+ self.create_d()
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ code, body = self.update(update, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+
+
+class TestCaseDelete(TestCaseBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.delete(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(body, '')
+
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py
new file mode 100644
index 000000000..b6fbf45dc
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py
@@ -0,0 +1,31 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.models import Versions
+
+
+class TestVersionBase(TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ def test_success(self):
+ code, body = self.get()
+ self.assertEqual(200, code)
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+if __name__ == '__main__':
+ unittest.main()