summaryrefslogtreecommitdiffstats
path: root/utils/test/result_collection_api/tests/unit
diff options
context:
space:
mode:
authorSerenaFeng <feng.xiaowei@zte.com.cn>2016-05-17 21:05:27 +0800
committerSerenaFeng <feng.xiaowei@zte.com.cn>2016-05-17 21:12:20 +0800
commit892a693978fa78c8091e861e246e859d20fe99c4 (patch)
treef907cedb07ce80287cb0e251ea315a65eaa5050f /utils/test/result_collection_api/tests/unit
parent69fbbb4564539bf2aecbfd91a48825a8d8beea16 (diff)
add unittest framework for supporting unittest in testAPI
usage is shown in utils/test/result_collection_api/README.md JIRA: FUNCTEST-251 Change-Id: I788417e296c153cc485f4a4064697bdafc394e5b Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
Diffstat (limited to 'utils/test/result_collection_api/tests/unit')
-rw-r--r--utils/test/result_collection_api/tests/unit/__init__.py1
-rw-r--r--utils/test/result_collection_api/tests/unit/fake_pymongo.py129
-rw-r--r--utils/test/result_collection_api/tests/unit/test_base.py36
-rw-r--r--utils/test/result_collection_api/tests/unit/test_fake_pymongo.py52
-rw-r--r--utils/test/result_collection_api/tests/unit/test_version.py14
5 files changed, 232 insertions, 0 deletions
diff --git a/utils/test/result_collection_api/tests/unit/__init__.py b/utils/test/result_collection_api/tests/unit/__init__.py
new file mode 100644
index 000000000..3ed9fd0f3
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/__init__.py
@@ -0,0 +1 @@
+__author__ = 'root'
diff --git a/utils/test/result_collection_api/tests/unit/fake_pymongo.py b/utils/test/result_collection_api/tests/unit/fake_pymongo.py
new file mode 100644
index 000000000..e2db46038
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/fake_pymongo.py
@@ -0,0 +1,129 @@
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+
+__author__ = 'serena'
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.count = len(self.collection)
+
+ def _is_next_exist(self):
+ return self.count != 0
+
+ @property
+ def fetch_next(self):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._is_next_exist)
+ return result
+
+ def next_object(self):
+ self.count -= 1
+ return self.collection.pop()
+
+
+class MemDb(object):
+
+ def __init__(self):
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._find_one, spec_or_id, *args)
+ return result
+
+ def _insert(self, doc_or_docs):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = ObjectId()
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._insert, doc_or_docs)
+ return result
+
+ @staticmethod
+ def _in(content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _update(self, spec, document):
+ updated = False
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._update, spec, document)
+ return result
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._remove, spec_or_id)
+ return result
+
+pod = MemDb()
+test_projects = MemDb()
+test_cases = MemDb()
+test_results = MemDb()
diff --git a/utils/test/result_collection_api/tests/unit/test_base.py b/utils/test/result_collection_api/tests/unit/test_base.py
new file mode 100644
index 000000000..b72436eb0
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/test_base.py
@@ -0,0 +1,36 @@
+from tornado.web import Application
+from tornado.testing import AsyncHTTPTestCase
+
+from resources.handlers import VersionHandler, PodHandler, \
+ TestProjectHandler, TestCasesHandler, TestResultsHandler, DashboardHandler
+import fake_pymongo
+
+
+class TestBase(AsyncHTTPTestCase):
+ def get_app(self):
+ return Application(
+ [
+ (r"/version", VersionHandler),
+ (r"/pods", PodHandler),
+ (r"/pods/([^/]+)", PodHandler),
+ (r"/test_projects", TestProjectHandler),
+ (r"/test_projects/([^/]+)", TestProjectHandler),
+ (r"/test_projects/([^/]+)/cases", TestCasesHandler),
+ (r"/test_projects/([^/]+)/cases/([^/]+)", TestCasesHandler),
+ (r"/results", TestResultsHandler),
+ (r"/results([^/]*)", TestResultsHandler),
+ (r"/results/([^/]*)", TestResultsHandler),
+ (r"/dashboard", DashboardHandler),
+ (r"/dashboard([^/]*)", DashboardHandler),
+ (r"/dashboard/([^/]*)", DashboardHandler),
+ ],
+ db=fake_pymongo,
+ debug=True,
+ )
+
+ def tearDown(self):
+ yield fake_pymongo.pod.remove()
+ yield fake_pymongo.test_projects.remove()
+ yield fake_pymongo.test_cases.remove()
+ yield fake_pymongo.test_results.remove()
+ super(TestBase, self).tearDown()
diff --git a/utils/test/result_collection_api/tests/unit/test_fake_pymongo.py b/utils/test/result_collection_api/tests/unit/test_fake_pymongo.py
new file mode 100644
index 000000000..5ddbf28d9
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/test_fake_pymongo.py
@@ -0,0 +1,52 @@
+import unittest
+from tornado.web import Application
+from tornado import gen
+from tornado.testing import AsyncHTTPTestCase, gen_test
+
+import fake_pymongo
+
+
+class MyTest(AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pod.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pod.insert({'name': 'test2'})
+
+ @gen_test
+ def test_find_one(self):
+ user = yield self.db.pod.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+
+ @gen_test
+ def test_find(self):
+ cursor = self.db.pod.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @gen_test
+ def test_update(self):
+ yield self.db.pod.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pod.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ @gen_test
+ def test_remove(self):
+ yield self.db.pod.remove({'_id': '1'})
+ user = yield self.db.pod.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/utils/test/result_collection_api/tests/unit/test_version.py b/utils/test/result_collection_api/tests/unit/test_version.py
new file mode 100644
index 000000000..918f2f052
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/test_version.py
@@ -0,0 +1,14 @@
+import unittest
+
+from test_base import TestBase
+
+__author__ = 'serena'
+
+
+class TestVersion(TestBase):
+ def test_get_version(self):
+ response = self.fetch('/version')
+ self.assertEqual(response.code, 200)
+
+if __name__ == '__main__':
+ unittest.main()