summaryrefslogtreecommitdiffstats
path: root/utils/test/result_collection_api/tests/unit/fake_pymongo.py
diff options
context:
space:
mode:
authorSerenaFeng <feng.xiaowei@zte.com.cn>2016-05-17 21:05:27 +0800
committerSerenaFeng <feng.xiaowei@zte.com.cn>2016-05-17 21:12:20 +0800
commit892a693978fa78c8091e861e246e859d20fe99c4 (patch)
treef907cedb07ce80287cb0e251ea315a65eaa5050f /utils/test/result_collection_api/tests/unit/fake_pymongo.py
parent69fbbb4564539bf2aecbfd91a48825a8d8beea16 (diff)
add unittest framework for supporting unittest in testAPI
usage is shown in utils/test/result_collection_api/README.md JIRA: FUNCTEST-251 Change-Id: I788417e296c153cc485f4a4064697bdafc394e5b Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
Diffstat (limited to 'utils/test/result_collection_api/tests/unit/fake_pymongo.py')
-rw-r--r--utils/test/result_collection_api/tests/unit/fake_pymongo.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/utils/test/result_collection_api/tests/unit/fake_pymongo.py b/utils/test/result_collection_api/tests/unit/fake_pymongo.py
new file mode 100644
index 000000000..e2db46038
--- /dev/null
+++ b/utils/test/result_collection_api/tests/unit/fake_pymongo.py
@@ -0,0 +1,129 @@
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+
+__author__ = 'serena'
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.count = len(self.collection)
+
+ def _is_next_exist(self):
+ return self.count != 0
+
+ @property
+ def fetch_next(self):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._is_next_exist)
+ return result
+
+ def next_object(self):
+ self.count -= 1
+ return self.collection.pop()
+
+
+class MemDb(object):
+
+ def __init__(self):
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._find_one, spec_or_id, *args)
+ return result
+
+ def _insert(self, doc_or_docs):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = ObjectId()
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._insert, doc_or_docs)
+ return result
+
+ @staticmethod
+ def _in(content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _update(self, spec, document):
+ updated = False
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._update, spec, document)
+ return result
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(self._remove, spec_or_id)
+ return result
+
+pod = MemDb()
+test_projects = MemDb()
+test_cases = MemDb()
+test_results = MemDb()