summaryrefslogtreecommitdiffstats
path: root/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
diff options
context:
space:
mode:
Diffstat (limited to 'cvp/opnfv_testapi/tests/unit/fake_pymongo.py')
-rw-r--r--cvp/opnfv_testapi/tests/unit/fake_pymongo.py284
1 files changed, 0 insertions, 284 deletions
diff --git a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
deleted file mode 100644
index 0ca83df6..00000000
--- a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
+++ /dev/null
@@ -1,284 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from operator import itemgetter
-
-from bson.objectid import ObjectId
-from concurrent.futures import ThreadPoolExecutor
-
-
-def thread_execute(method, *args, **kwargs):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(method, *args, **kwargs)
- return result
-
-
-class MemCursor(object):
- def __init__(self, collection):
- self.collection = collection
- self.length = len(self.collection)
- self.sorted = []
-
- def _is_next_exist(self):
- return self.length != 0
-
- @property
- def fetch_next(self):
- return thread_execute(self._is_next_exist)
-
- def next_object(self):
- self.length -= 1
- return self.collection.pop()
-
- def sort(self, key_or_list):
- for k, v in key_or_list.iteritems():
- if v == -1:
- reverse = True
- else:
- reverse = False
-
- self.collection = sorted(self.collection,
- key=itemgetter(k), reverse=reverse)
- return self
-
- def limit(self, limit):
- if limit != 0 and limit < len(self.collection):
- self.collection = self.collection[0: limit]
- self.length = limit
- return self
-
- def skip(self, skip):
- if skip < self.length and (skip > 0):
- self.collection = self.collection[self.length - skip: -1]
- self.length -= skip
- elif skip >= self.length:
- self.collection = []
- self.length = 0
- return self
-
- def _count(self):
- return self.length
-
- def count(self):
- return thread_execute(self._count)
-
-
-class MemDb(object):
-
- def __init__(self, name):
- self.name = name
- self.contents = []
- pass
-
- def _find_one(self, spec_or_id=None, *args):
- if spec_or_id is not None and not isinstance(spec_or_id, dict):
- spec_or_id = {"_id": spec_or_id}
- if '_id' in spec_or_id:
- spec_or_id['_id'] = str(spec_or_id['_id'])
- cursor = self._find(spec_or_id, *args)
- for result in cursor:
- return result
- return None
-
- def find_one(self, spec_or_id=None, *args):
- return thread_execute(self._find_one, spec_or_id, *args)
-
- def _insert(self, doc_or_docs, check_keys=True):
-
- docs = doc_or_docs
- return_one = False
- if isinstance(docs, dict):
- return_one = True
- docs = [docs]
-
- if check_keys:
- for doc in docs:
- self._check_keys(doc)
-
- ids = []
- for doc in docs:
- if '_id' not in doc:
- doc['_id'] = str(ObjectId())
- if not self._find_one(doc['_id']):
- ids.append(doc['_id'])
- self.contents.append(doc_or_docs)
-
- if len(ids) == 0:
- return None
- if return_one:
- return ids[0]
- else:
- return ids
-
- def insert(self, doc_or_docs, check_keys=True):
- return thread_execute(self._insert, doc_or_docs, check_keys)
-
- @staticmethod
- def _compare_date(spec, value):
- gte = True
- lt = False
- for k, v in spec.iteritems():
- if k == '$gte' and value < v:
- gte = False
- elif k == '$lt' and value < v:
- lt = True
- return gte and lt
-
- def _in(self, content, *args):
- if self.name == 'scenarios':
- return self._in_scenarios(content, *args)
- else:
- return self._in_others(content, *args)
-
- def _in_scenarios_installer(self, installer, content):
- hit = False
- for s_installer in content['installers']:
- if installer == s_installer['installer']:
- hit = True
-
- return hit
-
- def _in_scenarios_version(self, version, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- if version == s_version['version']:
- hit = True
- return hit
-
- def _in_scenarios_project(self, project, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- for s_project in s_version['projects']:
- if project == s_project['project']:
- hit = True
-
- return hit
-
- def _in_scenarios(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'installers':
- for inner in v.values():
- for i_k, i_v in inner.iteritems():
- if i_k == 'installer':
- return self._in_scenarios_installer(i_v,
- content)
- elif i_k == 'versions.version':
- return self._in_scenarios_version(i_v,
- content)
- elif i_k == 'versions.projects.project':
- return self._in_scenarios_project(i_v,
- content)
- elif content.get(k, None) != v:
- return False
-
- return True
-
- def _in_others(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'start_date':
- if not MemDb._compare_date(v, content.get(k)):
- return False
- elif k == 'trust_indicator.current':
- if content.get('trust_indicator').get('current') != v:
- return False
- elif not isinstance(v, dict) and content.get(k, None) != v:
- return False
- return True
-
- def _find(self, *args):
- res = []
- for content in self.contents:
- if self._in(content, *args):
- res.append(content)
-
- return res
-
- def find(self, *args):
- return MemCursor(self._find(*args))
-
- def _aggregate(self, *args, **kwargs):
- res = self.contents
- print args
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$match':
- res = self._find(v)
- cursor = MemCursor(res)
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$sort':
- cursor = cursor.sort(v)
- elif k == '$skip':
- cursor = cursor.skip(v)
- elif k == '$limit':
- cursor = cursor.limit(v)
- return cursor
-
- def aggregate(self, *args, **kwargs):
- return self._aggregate(*args, **kwargs)
-
- def _update(self, spec, document, check_keys=True):
- updated = False
-
- if check_keys:
- self._check_keys(document)
-
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec):
- for k, v in document.iteritems():
- updated = True
- content[k] = v
- self.contents[index] = content
- return updated
-
- def update(self, spec, document, check_keys=True):
- return thread_execute(self._update, spec, document, check_keys)
-
- def _remove(self, spec_or_id=None):
- if spec_or_id is None:
- self.contents = []
- if not isinstance(spec_or_id, dict):
- spec_or_id = {'_id': spec_or_id}
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec_or_id):
- del self.contents[index]
- return True
- return False
-
- def remove(self, spec_or_id=None):
- return thread_execute(self._remove, spec_or_id)
-
- def clear(self):
- self._remove()
-
- def _check_keys(self, doc):
- for key in doc.keys():
- if '.' in key:
- raise NameError('key {} must not contain .'.format(key))
- if key.startswith('$'):
- raise NameError('key {} must not start with $'.format(key))
- if isinstance(doc.get(key), dict):
- self._check_keys(doc.get(key))
-
-
-def __getattr__(name):
- return globals()[name]
-
-
-pods = MemDb('pods')
-projects = MemDb('projects')
-testcases = MemDb('testcases')
-results = MemDb('results')
-scenarios = MemDb('scenarios')
-tokens = MemDb('tokens')