summaryrefslogtreecommitdiffstats
path: root/testapi/opnfv_testapi
diff options
context:
space:
mode:
Diffstat (limited to 'testapi/opnfv_testapi')
-rw-r--r--testapi/opnfv_testapi/__init__.py8
-rw-r--r--testapi/opnfv_testapi/cmd/__init__.py8
-rw-r--r--testapi/opnfv_testapi/cmd/server.py70
-rw-r--r--testapi/opnfv_testapi/common/__init__.py8
-rw-r--r--testapi/opnfv_testapi/common/config.py95
-rw-r--r--testapi/opnfv_testapi/common/constants.py15
-rw-r--r--testapi/opnfv_testapi/resources/__init__.py8
-rw-r--r--testapi/opnfv_testapi/resources/handlers.py237
-rw-r--r--testapi/opnfv_testapi/resources/models.py71
-rw-r--r--testapi/opnfv_testapi/resources/pod_handlers.py87
-rw-r--r--testapi/opnfv_testapi/resources/pod_models.py85
-rw-r--r--testapi/opnfv_testapi/resources/project_handlers.py92
-rw-r--r--testapi/opnfv_testapi/resources/project_models.py94
-rw-r--r--testapi/opnfv_testapi/resources/result_handlers.py198
-rw-r--r--testapi/opnfv_testapi/resources/result_models.py231
-rw-r--r--testapi/opnfv_testapi/resources/testcase_handlers.py115
-rw-r--r--testapi/opnfv_testapi/resources/testcase_models.py105
-rw-r--r--testapi/opnfv_testapi/router/__init__.py9
-rw-r--r--testapi/opnfv_testapi/router/url_mappings.py48
-rw-r--r--testapi/opnfv_testapi/tests/__init__.py1
-rw-r--r--testapi/opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--testapi/opnfv_testapi/tests/unit/fake_pymongo.py191
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_base.py136
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py123
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_pod.py89
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_project.py141
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_result.py339
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_testcase.py196
-rw-r--r--testapi/opnfv_testapi/tests/unit/test_version.py31
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/README.md273
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/__init__.py8
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/handlers.py43
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/settings.py32
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/swagger.py290
-rw-r--r--testapi/opnfv_testapi/tornado_swagger/views.py128
35 files changed, 3614 insertions, 0 deletions
diff --git a/testapi/opnfv_testapi/__init__.py b/testapi/opnfv_testapi/__init__.py
new file mode 100644
index 0000000..363bc38
--- /dev/null
+++ b/testapi/opnfv_testapi/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testapi/opnfv_testapi/cmd/__init__.py b/testapi/opnfv_testapi/cmd/__init__.py
new file mode 100644
index 0000000..363bc38
--- /dev/null
+++ b/testapi/opnfv_testapi/cmd/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testapi/opnfv_testapi/cmd/server.py b/testapi/opnfv_testapi/cmd/server.py
new file mode 100644
index 0000000..c3d7346
--- /dev/null
+++ b/testapi/opnfv_testapi/cmd/server.py
@@ -0,0 +1,70 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+"""
+Pre-requisites:
+ pip install motor
+ pip install tornado
+
+We can launch the API with this file
+
+TODOs :
+ - logging
+ - json args validation with schemes
+ - POST/PUT/DELETE for PODs
+ - POST/PUT/GET/DELETE for installers, platforms (enrich results info)
+ - count cases for GET on projects
+ - count results for GET on cases
+ - include objects
+ - swagger documentation
+ - setup file
+ - results pagination
+ - unit tests
+
+"""
+
+import argparse
+
+import tornado.ioloop
+import motor
+
+from opnfv_testapi.common.config import APIConfig
+from opnfv_testapi.tornado_swagger import swagger
+from opnfv_testapi.router import url_mappings
+
+# optionally get config file from command line
+parser = argparse.ArgumentParser()
+parser.add_argument("-c", "--config-file", dest='config_file',
+ help="Config file location")
+args = parser.parse_args()
+CONF = APIConfig().parse(args.config_file)
+
+# connecting to MongoDB server, and choosing database
+client = motor.MotorClient(CONF.mongo_url)
+db = client[CONF.mongo_dbname]
+
+swagger.docs(base_url=CONF.swagger_base_url)
+
+
+def make_app():
+ return swagger.Application(
+ url_mappings.mappings,
+ db=db,
+ debug=CONF.api_debug_on,
+ )
+
+
+def main():
+ application = make_app()
+ application.listen(CONF.api_port)
+ tornado.ioloop.IOLoop.current().start()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/testapi/opnfv_testapi/common/__init__.py b/testapi/opnfv_testapi/common/__init__.py
new file mode 100644
index 0000000..05c0c93
--- /dev/null
+++ b/testapi/opnfv_testapi/common/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testapi/opnfv_testapi/common/config.py b/testapi/opnfv_testapi/common/config.py
new file mode 100644
index 0000000..ecab88a
--- /dev/null
+++ b/testapi/opnfv_testapi/common/config.py
@@ -0,0 +1,95 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+# feng.xiaowei@zte.com.cn remove prepare_put_request 5-30-2016
+##############################################################################
+
+
+from ConfigParser import SafeConfigParser, NoOptionError
+
+
+class ParseError(Exception):
+ """
+ Custom exception class for config file
+ """
+
+ def __init__(self, message):
+ self.msg = message
+
+ def __str__(self):
+ return 'error parsing config file : %s' % self.msg
+
+
+class APIConfig:
+ """
+ The purpose of this class is to load values correctly from the config file.
+ Each key is declared as an attribute in __init__() and linked in parse()
+ """
+
+ def __init__(self):
+ self._default_config_location = "/etc/opnfv_testapi/config.ini"
+ self.mongo_url = None
+ self.mongo_dbname = None
+ self.api_port = None
+ self.api_debug_on = None
+ self._parser = None
+ self.swagger_base_url = None
+
+ def _get_parameter(self, section, param):
+ try:
+ return self._parser.get(section, param)
+ except NoOptionError:
+ raise ParseError("[%s.%s] parameter not found" % (section, param))
+
+ def _get_int_parameter(self, section, param):
+ try:
+ return int(self._get_parameter(section, param))
+ except ValueError:
+ raise ParseError("[%s.%s] not an int" % (section, param))
+
+ def _get_bool_parameter(self, section, param):
+ result = self._get_parameter(section, param)
+ if str(result).lower() == 'true':
+ return True
+ if str(result).lower() == 'false':
+ return False
+
+ raise ParseError(
+ "[%s.%s : %s] not a boolean" % (section, param, result))
+
+ @staticmethod
+ def parse(config_location=None):
+ obj = APIConfig()
+
+ if config_location is None:
+ config_location = obj._default_config_location
+
+ obj._parser = SafeConfigParser()
+ obj._parser.read(config_location)
+ if not obj._parser:
+ raise ParseError("%s not found" % config_location)
+
+ # Linking attributes to keys from file with their sections
+ obj.mongo_url = obj._get_parameter("mongo", "url")
+ obj.mongo_dbname = obj._get_parameter("mongo", "dbname")
+
+ obj.api_port = obj._get_int_parameter("api", "port")
+ obj.api_debug_on = obj._get_bool_parameter("api", "debug")
+ obj.swagger_base_url = obj._get_parameter("swagger", "base_url")
+
+ return obj
+
+ def __str__(self):
+ return "mongo_url = %s \n" \
+ "mongo_dbname = %s \n" \
+ "api_port = %s \n" \
+ "api_debug_on = %s \n" \
+ "swagger_base_url = %s \n" % (self.mongo_url,
+ self.mongo_dbname,
+ self.api_port,
+ self.api_debug_on,
+ self.swagger_base_url)
diff --git a/testapi/opnfv_testapi/common/constants.py b/testapi/opnfv_testapi/common/constants.py
new file mode 100644
index 0000000..4d39a14
--- /dev/null
+++ b/testapi/opnfv_testapi/common/constants.py
@@ -0,0 +1,15 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+DEFAULT_REPRESENTATION = "application/json"
+HTTP_BAD_REQUEST = 400
+HTTP_FORBIDDEN = 403
+HTTP_NOT_FOUND = 404
+HTTP_OK = 200
diff --git a/testapi/opnfv_testapi/resources/__init__.py b/testapi/opnfv_testapi/resources/__init__.py
new file mode 100644
index 0000000..05c0c93
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testapi/opnfv_testapi/resources/handlers.py b/testapi/opnfv_testapi/resources/handlers.py
new file mode 100644
index 0000000..5059f5d
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/handlers.py
@@ -0,0 +1,237 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+# feng.xiaowei@zte.com.cn refactor db.pod to db.pods 5-19-2016
+# feng.xiaowei@zte.com.cn refactor test_project to project 5-19-2016
+# feng.xiaowei@zte.com.cn refactor response body 5-19-2016
+# feng.xiaowei@zte.com.cn refactor pod/project response info 5-19-2016
+# feng.xiaowei@zte.com.cn refactor testcase related handler 5-20-2016
+# feng.xiaowei@zte.com.cn refactor result related handler 5-23-2016
+# feng.xiaowei@zte.com.cn refactor dashboard related handler 5-24-2016
+# feng.xiaowei@zte.com.cn add methods to GenericApiHandler 5-26-2016
+# feng.xiaowei@zte.com.cn remove PodHandler 5-26-2016
+# feng.xiaowei@zte.com.cn remove ProjectHandler 5-26-2016
+# feng.xiaowei@zte.com.cn remove TestcaseHandler 5-27-2016
+# feng.xiaowei@zte.com.cn remove ResultHandler 5-29-2016
+# feng.xiaowei@zte.com.cn remove DashboardHandler 5-30-2016
+##############################################################################
+
+import json
+from datetime import datetime
+
+from tornado import gen
+from tornado.web import RequestHandler, asynchronous, HTTPError
+
+from models import CreateResponse
+from opnfv_testapi.common.constants import DEFAULT_REPRESENTATION, \
+ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_FORBIDDEN
+from opnfv_testapi.tornado_swagger import swagger
+
+
+class GenericApiHandler(RequestHandler):
+ def __init__(self, application, request, **kwargs):
+ super(GenericApiHandler, self).__init__(application, request, **kwargs)
+ self.db = self.settings["db"]
+ self.json_args = None
+ self.table = None
+ self.table_cls = None
+ self.db_projects = 'projects'
+ self.db_pods = 'pods'
+ self.db_testcases = 'testcases'
+ self.db_results = 'results'
+
+ def prepare(self):
+ if self.request.method != "GET" and self.request.method != "DELETE":
+ if self.request.headers.get("Content-Type") is not None:
+ if self.request.headers["Content-Type"].startswith(
+ DEFAULT_REPRESENTATION):
+ try:
+ self.json_args = json.loads(self.request.body)
+ except (ValueError, KeyError, TypeError) as error:
+ raise HTTPError(HTTP_BAD_REQUEST,
+ "Bad Json format [{}]".
+ format(error))
+
+ def finish_request(self, json_object=None):
+ if json_object:
+ self.write(json.dumps(json_object))
+ self.set_header("Content-Type", DEFAULT_REPRESENTATION)
+ self.finish()
+
+ def _create_response(self, resource):
+ href = self.request.full_url() + '/' + str(resource)
+ return CreateResponse(href=href).format()
+
+ def format_data(self, data):
+ cls_data = self.table_cls.from_dict(data)
+ return cls_data.format_http()
+
+ @asynchronous
+ @gen.coroutine
+ def _create(self, miss_checks, db_checks, **kwargs):
+ """
+ :param miss_checks: [miss1, miss2]
+ :param db_checks: [(table, exist, query, error)]
+ """
+ if self.json_args is None:
+ raise HTTPError(HTTP_BAD_REQUEST, "no body")
+
+ data = self.table_cls.from_dict(self.json_args)
+ for miss in miss_checks:
+ miss_data = data.__getattribute__(miss)
+ if miss_data is None or miss_data == '':
+ raise HTTPError(HTTP_BAD_REQUEST,
+ '{} missing'.format(miss))
+
+ for k, v in kwargs.iteritems():
+ data.__setattr__(k, v)
+
+ for table, exist, query, error in db_checks:
+ check = yield self._eval_db_find_one(query(data), table)
+ if (exist and not check) or (not exist and check):
+ code, message = error(data)
+ raise HTTPError(code, message)
+
+ if self.table != 'results':
+ data.creation_date = datetime.now()
+ _id = yield self._eval_db(self.table, 'insert', data.format(),
+ check_keys=False)
+ if 'name' in self.json_args:
+ resource = data.name
+ else:
+ resource = _id
+ self.finish_request(self._create_response(resource))
+
+ @asynchronous
+ @gen.coroutine
+ def _list(self, query=None, res_op=None, *args, **kwargs):
+ if query is None:
+ query = {}
+ data = []
+ cursor = self._eval_db(self.table, 'find', query)
+ if 'sort' in kwargs:
+ cursor = cursor.sort(kwargs.get('sort'))
+ if 'last' in kwargs:
+ cursor = cursor.limit(kwargs.get('last'))
+ while (yield cursor.fetch_next):
+ data.append(self.format_data(cursor.next_object()))
+ if res_op is None:
+ res = {self.table: data}
+ else:
+ res = res_op(data, *args)
+ self.finish_request(res)
+
+ @asynchronous
+ @gen.coroutine
+ def _get_one(self, query):
+ data = yield self._eval_db_find_one(query)
+ if data is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "[{}] not exist in table [{}]"
+ .format(query, self.table))
+ self.finish_request(self.format_data(data))
+
+ @asynchronous
+ @gen.coroutine
+ def _delete(self, query):
+ data = yield self._eval_db_find_one(query)
+ if data is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "[{}] not exit in table [{}]"
+ .format(query, self.table))
+
+ yield self._eval_db(self.table, 'remove', query)
+ self.finish_request()
+
+ @asynchronous
+ @gen.coroutine
+ def _update(self, query, db_keys):
+ if self.json_args is None:
+ raise HTTPError(HTTP_BAD_REQUEST, "No payload")
+
+ # check old data exist
+ from_data = yield self._eval_db_find_one(query)
+ if from_data is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "{} could not be found in table [{}]"
+ .format(query, self.table))
+
+ data = self.table_cls.from_dict(from_data)
+ # check new data exist
+ equal, new_query = self._update_query(db_keys, data)
+ if not equal:
+ to_data = yield self._eval_db_find_one(new_query)
+ if to_data is not None:
+ raise HTTPError(HTTP_FORBIDDEN,
+ "{} already exists in table [{}]"
+ .format(new_query, self.table))
+
+ # we merge the whole document """
+ edit_request = data.format()
+ edit_request.update(self._update_requests(data))
+
+ """ Updating the DB """
+ yield self._eval_db(self.table, 'update', query, edit_request,
+ check_keys=False)
+ edit_request['_id'] = str(data._id)
+ self.finish_request(edit_request)
+
+ def _update_requests(self, data):
+ request = dict()
+ for k, v in self.json_args.iteritems():
+ request = self._update_request(request, k, v,
+ data.__getattribute__(k))
+ if not request:
+ raise HTTPError(HTTP_FORBIDDEN, "Nothing to update")
+ return request
+
+ @staticmethod
+ def _update_request(edit_request, key, new_value, old_value):
+ """
+ This function serves to prepare the elements in the update request.
+ We try to avoid replace the exact values in the db
+ edit_request should be a dict in which we add an entry (key) after
+ comparing values
+ """
+ if not (new_value is None):
+ if new_value != old_value:
+ edit_request[key] = new_value
+
+ return edit_request
+
+ def _update_query(self, keys, data):
+ query = dict()
+ equal = True
+ for key in keys:
+ new = self.json_args.get(key)
+ old = data.__getattribute__(key)
+ if new is None:
+ new = old
+ elif new != old:
+ equal = False
+ query[key] = new
+ return equal, query
+
+ def _eval_db(self, table, method, *args, **kwargs):
+ exec_collection = self.db.__getattr__(table)
+ return exec_collection.__getattribute__(method)(*args, **kwargs)
+
+ def _eval_db_find_one(self, query, table=None):
+ if table is None:
+ table = self.table
+ return self._eval_db(table, 'find_one', query)
+
+
+class VersionHandler(GenericApiHandler):
+ @swagger.operation(nickname='list')
+ def get(self):
+ """
+ @description: list all supported versions
+ @rtype: L{Versions}
+ """
+ versions = [{'version': 'v1.0', 'description': 'basics'}]
+ self.finish_request({'versions': versions})
diff --git a/testapi/opnfv_testapi/resources/models.py b/testapi/opnfv_testapi/resources/models.py
new file mode 100644
index 0000000..e79308b
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/models.py
@@ -0,0 +1,71 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+# feng.xiaowei@zte.com.cn mv Pod to pod_models.py 5-18-2016
+# feng.xiaowei@zte.com.cn add MetaCreateResponse/MetaGetResponse 5-18-2016
+# feng.xiaowei@zte.com.cn mv TestProject to project_models.py 5-19-2016
+# feng.xiaowei@zte.com.cn delete meta class 5-19-2016
+# feng.xiaowei@zte.com.cn add CreateResponse 5-19-2016
+# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016
+# feng.xiaowei@zte.com.cn mv TestResut to result_models.py 5-23-2016
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+
+
+@swagger.model()
+class CreateResponse(object):
+ def __init__(self, href=''):
+ self.href = href
+
+ @staticmethod
+ def from_dict(res_dict):
+ if res_dict is None:
+ return None
+
+ res = CreateResponse()
+ res.href = res_dict.get('href')
+ return res
+
+ def format(self):
+ return {'href': self.href}
+
+
+@swagger.model()
+class Versions(object):
+ """
+ @property versions:
+ @ptype versions: C{list} of L{Version}
+ """
+ def __init__(self):
+ self.versions = list()
+
+ @staticmethod
+ def from_dict(res_dict):
+ if res_dict is None:
+ return None
+
+ res = Versions()
+ for version in res_dict.get('versions'):
+ res.versions.append(Version.from_dict(version))
+ return res
+
+
+@swagger.model()
+class Version(object):
+ def __init__(self, version=None, description=None):
+ self.version = version
+ self.description = description
+
+ @staticmethod
+ def from_dict(a_dict):
+ if a_dict is None:
+ return None
+
+ ver = Version()
+ ver.version = a_dict.get('version')
+ ver.description = str(a_dict.get('description'))
+ return ver
diff --git a/testapi/opnfv_testapi/resources/pod_handlers.py b/testapi/opnfv_testapi/resources/pod_handlers.py
new file mode 100644
index 0000000..8f44439
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/pod_handlers.py
@@ -0,0 +1,87 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+from handlers import GenericApiHandler
+from pod_models import Pod
+from opnfv_testapi.common.constants import HTTP_FORBIDDEN
+
+
+class GenericPodHandler(GenericApiHandler):
+ def __init__(self, application, request, **kwargs):
+ super(GenericPodHandler, self).__init__(application, request, **kwargs)
+ self.table = 'pods'
+ self.table_cls = Pod
+
+
+class PodCLHandler(GenericPodHandler):
+ @swagger.operation(nickname='list-all')
+ def get(self):
+ """
+ @description: list all pods
+ @return 200: list all pods, empty list is no pod exist
+ @rtype: L{Pods}
+ """
+ self._list()
+
+ @swagger.operation(nickname='create')
+ def post(self):
+ """
+ @description: create a pod
+ @param body: pod to be created
+ @type body: L{PodCreateRequest}
+ @in body: body
+ @rtype: L{CreateResponse}
+ @return 200: pod is created.
+ @raise 403: pod already exists
+ @raise 400: body or name not provided
+ """
+ def query(data):
+ return {'name': data.name}
+
+ def error(data):
+ message = '{} already exists as a pod'.format(data.name)
+ return HTTP_FORBIDDEN, message
+
+ miss_checks = ['name']
+ db_checks = [(self.table, False, query, error)]
+ self._create(miss_checks, db_checks)
+
+
+class PodGURHandler(GenericPodHandler):
+ @swagger.operation(nickname='get-one')
+ def get(self, pod_name):
+ """
+ @description: get a single pod by pod_name
+ @rtype: L{Pod}
+ @return 200: pod exist
+ @raise 404: pod not exist
+ """
+ query = dict()
+ query['name'] = pod_name
+ self._get_one(query)
+
+ def delete(self, pod_name):
+ """ Remove a POD
+
+ # check for an existing pod to be deleted
+ mongo_dict = yield self.db.pods.find_one(
+ {'name': pod_name})
+ pod = TestProject.pod(mongo_dict)
+ if pod is None:
+ raise HTTPError(HTTP_NOT_FOUND,
+ "{} could not be found as a pod to be deleted"
+ .format(pod_name))
+
+ # just delete it, or maybe save it elsewhere in a future
+ res = yield self.db.projects.remove(
+ {'name': pod_name})
+
+ self.finish_request(answer)
+ """
+ pass
diff --git a/testapi/opnfv_testapi/resources/pod_models.py b/testapi/opnfv_testapi/resources/pod_models.py
new file mode 100644
index 0000000..7231806
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/pod_models.py
@@ -0,0 +1,85 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+
+# name: name of the POD e.g. zte-1
+# mode: metal or virtual
+# details: any detail
+# role: ci-pod or community-pod or single-node
+
+
+@swagger.model()
+class PodCreateRequest(object):
+ def __init__(self, name, mode='', details='', role=""):
+ self.name = name
+ self.mode = mode
+ self.details = details
+ self.role = role
+
+ def format(self):
+ return {
+ "name": self.name,
+ "mode": self.mode,
+ "details": self.details,
+ "role": self.role,
+ }
+
+
+@swagger.model()
+class Pod(PodCreateRequest):
+ def __init__(self,
+ name='', mode='', details='',
+ role="", _id='', create_date=''):
+ super(Pod, self).__init__(name, mode, details, role)
+ self._id = _id
+ self.creation_date = create_date
+
+ @staticmethod
+ def from_dict(pod_dict):
+ if pod_dict is None:
+ return None
+
+ p = Pod()
+ p._id = pod_dict.get('_id')
+ p.creation_date = str(pod_dict.get('creation_date'))
+ p.name = pod_dict.get('name')
+ p.mode = pod_dict.get('mode')
+ p.details = pod_dict.get('details')
+ p.role = pod_dict.get('role')
+ return p
+
+ def format(self):
+ f = super(Pod, self).format()
+ f['creation_date'] = str(self.creation_date)
+ return f
+
+ def format_http(self):
+ f = self.format()
+ f['_id'] = str(self._id)
+ return f
+
+
+@swagger.model()
+class Pods(object):
+ """
+ @property pods:
+ @ptype pods: C{list} of L{Pod}
+ """
+ def __init__(self):
+ self.pods = list()
+
+ @staticmethod
+ def from_dict(res_dict):
+ if res_dict is None:
+ return None
+
+ res = Pods()
+ for pod in res_dict.get('pods'):
+ res.pods.append(Pod.from_dict(pod))
+ return res
diff --git a/testapi/opnfv_testapi/resources/project_handlers.py b/testapi/opnfv_testapi/resources/project_handlers.py
new file mode 100644
index 0000000..1e9a972
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/project_handlers.py
@@ -0,0 +1,92 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+from handlers import GenericApiHandler
+from opnfv_testapi.common.constants import HTTP_FORBIDDEN
+from project_models import Project
+
+
+class GenericProjectHandler(GenericApiHandler):
+ def __init__(self, application, request, **kwargs):
+ super(GenericProjectHandler, self).__init__(application,
+ request,
+ **kwargs)
+ self.table = 'projects'
+ self.table_cls = Project
+
+
+class ProjectCLHandler(GenericProjectHandler):
+ @swagger.operation(nickname="list-all")
+ def get(self):
+ """
+ @description: list all projects
+ @return 200: return all projects, empty list is no project exist
+ @rtype: L{Projects}
+ """
+ self._list()
+
+ @swagger.operation(nickname="create")
+ def post(self):
+ """
+ @description: create a project
+ @param body: project to be created
+ @type body: L{ProjectCreateRequest}
+ @in body: body
+ @rtype: L{CreateResponse}
+ @return 200: project is created.
+ @raise 403: project already exists
+ @raise 400: body or name not provided
+ """
+ def query(data):
+ return {'name': data.name}
+
+ def error(data):
+ message = '{} already exists as a project'.format(data.name)
+ return HTTP_FORBIDDEN, message
+
+ miss_checks = ['name']
+ db_checks = [(self.table, False, query, error)]
+ self._create(miss_checks, db_checks)
+
+
+class ProjectGURHandler(GenericProjectHandler):
+ @swagger.operation(nickname='get-one')
+ def get(self, project_name):
+ """
+ @description: get a single project by project_name
+ @rtype: L{Project}
+ @return 200: project exist
+ @raise 404: project not exist
+ """
+ self._get_one({'name': project_name})
+
+ @swagger.operation(nickname="update")
+ def put(self, project_name):
+ """
+ @description: update a single project by project_name
+ @param body: project to be updated
+ @type body: L{ProjectUpdateRequest}
+ @in body: body
+ @rtype: L{Project}
+ @return 200: update success
+ @raise 404: project not exist
+ @raise 403: new project name already exist or nothing to update
+ """
+ query = {'name': project_name}
+ db_keys = ['name']
+ self._update(query, db_keys)
+
+ @swagger.operation(nickname='delete')
+ def delete(self, project_name):
+ """
+ @description: delete a project by project_name
+ @return 200: delete success
+ @raise 404: project not exist
+ """
+ self._delete({'name': project_name})
diff --git a/testapi/opnfv_testapi/resources/project_models.py b/testapi/opnfv_testapi/resources/project_models.py
new file mode 100644
index 0000000..f70630c
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/project_models.py
@@ -0,0 +1,94 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+
+
+@swagger.model()
+class ProjectCreateRequest(object):
+ def __init__(self, name, description=''):
+ self.name = name
+ self.description = description
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ }
+
+
+@swagger.model()
+class ProjectUpdateRequest(object):
+ def __init__(self, name='', description=''):
+ self.name = name
+ self.description = description
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ }
+
+
+@swagger.model()
+class Project(object):
+ def __init__(self,
+ name=None, _id=None, description=None, create_date=None):
+ self._id = _id
+ self.name = name
+ self.description = description
+ self.creation_date = create_date
+
+ @staticmethod
+ def from_dict(res_dict):
+
+ if res_dict is None:
+ return None
+
+ t = Project()
+ t._id = res_dict.get('_id')
+ t.creation_date = res_dict.get('creation_date')
+ t.name = res_dict.get('name')
+ t.description = res_dict.get('description')
+
+ return t
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ "creation_date": str(self.creation_date)
+ }
+
+ def format_http(self):
+ return {
+ "_id": str(self._id),
+ "name": self.name,
+ "description": self.description,
+ "creation_date": str(self.creation_date),
+ }
+
+
+@swagger.model()
+class Projects(object):
+ """
+ @property projects:
+ @ptype projects: C{list} of L{Project}
+ """
+ def __init__(self):
+ self.projects = list()
+
+ @staticmethod
+ def from_dict(res_dict):
+ if res_dict is None:
+ return None
+
+ res = Projects()
+ for project in res_dict.get('projects'):
+ res.projects.append(Project.from_dict(project))
+ return res
diff --git a/testapi/opnfv_testapi/resources/result_handlers.py b/testapi/opnfv_testapi/resources/result_handlers.py
new file mode 100644
index 0000000..400b84a
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/result_handlers.py
@@ -0,0 +1,198 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from datetime import datetime, timedelta
+
+from bson.objectid import ObjectId
+from tornado.web import HTTPError
+
+from opnfv_testapi.common.constants import HTTP_BAD_REQUEST, HTTP_NOT_FOUND
+from opnfv_testapi.resources.handlers import GenericApiHandler
+from opnfv_testapi.resources.result_models import TestResult
+from opnfv_testapi.tornado_swagger import swagger
+
+
+class GenericResultHandler(GenericApiHandler):
+ def __init__(self, application, request, **kwargs):
+ super(GenericResultHandler, self).__init__(application,
+ request,
+ **kwargs)
+ self.table = self.db_results
+ self.table_cls = TestResult
+
+ def get_int(self, key, value):
+ try:
+ value = int(value)
+ except:
+ raise HTTPError(HTTP_BAD_REQUEST, '{} must be int'.format(key))
+ return value
+
+ def set_query(self):
+ query = dict()
+ for k in self.request.query_arguments.keys():
+ v = self.get_query_argument(k)
+ if k == 'project' or k == 'pod' or k == 'case':
+ query[k + '_name'] = v
+ elif k == 'period':
+ v = self.get_int(k, v)
+ if v > 0:
+ period = datetime.now() - timedelta(days=v)
+ obj = {"$gte": str(period)}
+ query['start_date'] = obj
+ elif k == 'trust_indicator':
+ query[k + '.current'] = float(v)
+ elif k != 'last':
+ query[k] = v
+ return query
+
+
+class ResultsCLHandler(GenericResultHandler):
+ @swagger.operation(nickname="list-all")
+ def get(self):
+ """
+ @description: Retrieve result(s) for a test project
+ on a specific pod.
+ @notes: Retrieve result(s) for a test project on a specific pod.
+ Available filters for this request are :
+ - project : project name
+ - case : case name
+ - pod : pod name
+ - version : platform version (Arno-R1, ...)
+ - installer (fuel, ...)
+ - build_tag : Jenkins build tag name
+ - period : x (x last days)
+ - scenario : the test scenario (previously version)
+ - criteria : the global criteria status passed or failed
+ - trust_indicator : evaluate the stability of the test case
+ to avoid running systematically long and stable test case
+
+ GET /results/project=functest&case=vPing&version=Arno-R1 \
+ &pod=pod_name&period=15
+ @return 200: all test results consist with query,
+ empty list if no result is found
+ @rtype: L{TestResults}
+ @param pod: pod name
+ @type pod: L{string}
+ @in pod: query
+ @required pod: False
+ @param project: project name
+ @type project: L{string}
+ @in project: query
+ @required project: False
+ @param case: case name
+ @type case: L{string}
+ @in case: query
+ @required case: False
+ @param version: i.e. Colorado
+ @type version: L{string}
+ @in version: query
+ @required version: False
+ @param installer: fuel/apex/joid/compass
+ @type installer: L{string}
+ @in installer: query
+ @required installer: False
+ @param build_tag: i.e. v3.0
+ @type build_tag: L{string}
+ @in build_tag: query
+ @required build_tag: False
+ @param scenario: i.e. odl
+ @type scenario: L{string}
+ @in scenario: query
+ @required scenario: False
+ @param criteria: i.e. passed
+ @type criteria: L{string}
+ @in criteria: query
+ @required criteria: False
+ @param period: last days
+ @type period: L{string}
+ @in period: query
+ @required period: False
+ @param last: last records stored until now
+ @type last: L{string}
+ @in last: query
+ @required last: False
+ @param trust_indicator: must be float
+ @type trust_indicator: L{float}
+ @in trust_indicator: query
+ @required trust_indicator: False
+ """
+ last = self.get_query_argument('last', 0)
+ if last is not None:
+ last = self.get_int('last', last)
+
+ self._list(self.set_query(), sort=[('start_date', -1)], last=last)
+
+ @swagger.operation(nickname="create")
+ def post(self):
+ """
+ @description: create a test result
+ @param body: result to be created
+ @type body: L{ResultCreateRequest}
+ @in body: body
+ @rtype: L{CreateResponse}
+ @return 200: result is created.
+ @raise 404: pod/project/testcase not exist
+ @raise 400: body/pod_name/project_name/case_name not provided
+ """
+ def pod_query(data):
+ return {'name': data.pod_name}
+
+ def pod_error(data):
+ message = 'Could not find pod [{}]'.format(data.pod_name)
+ return HTTP_NOT_FOUND, message
+
+ def project_query(data):
+ return {'name': data.project_name}
+
+ def project_error(data):
+ message = 'Could not find project [{}]'.format(data.project_name)
+ return HTTP_NOT_FOUND, message
+
+ def testcase_query(data):
+ return {'project_name': data.project_name, 'name': data.case_name}
+
+ def testcase_error(data):
+ message = 'Could not find testcase [{}] in project [{}]'\
+ .format(data.case_name, data.project_name)
+ return HTTP_NOT_FOUND, message
+
+ miss_checks = ['pod_name', 'project_name', 'case_name']
+ db_checks = [('pods', True, pod_query, pod_error),
+ ('projects', True, project_query, project_error),
+ ('testcases', True, testcase_query, testcase_error)]
+ self._create(miss_checks, db_checks)
+
+
+class ResultsGURHandler(GenericResultHandler):
+ @swagger.operation(nickname='get-one')
+ def get(self, result_id):
+ """
+ @description: get a single result by result_id
+ @rtype: L{TestResult}
+ @return 200: test result exist
+ @raise 404: test result not exist
+ """
+ query = dict()
+ query["_id"] = ObjectId(result_id)
+ self._get_one(query)
+
+ @swagger.operation(nickname="update")
+ def put(self, result_id):
+ """
+ @description: update a single result by _id
+ @param body: fields to be updated
+ @type body: L{ResultUpdateRequest}
+ @in body: body
+ @rtype: L{Result}
+ @return 200: update success
+ @raise 404: result not exist
+ @raise 403: nothing to update
+ """
+ query = {'_id': ObjectId(result_id)}
+ db_keys = []
+ self._update(query, db_keys)
diff --git a/testapi/opnfv_testapi/resources/result_models.py b/testapi/opnfv_testapi/resources/result_models.py
new file mode 100644
index 0000000..f73f5c6
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/result_models.py
@@ -0,0 +1,231 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+
+
+@swagger.model()
+class TIHistory(object):
+ """
+ @ptype step: L{float}
+ """
+ def __init__(self, date=None, step=0):
+ self.date = date
+ self.step = step
+
+ def format(self):
+ return {
+ "date": self.date,
+ "step": self.step
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+ if a_dict is None:
+ return None
+
+ return TIHistory(a_dict.get('date'), a_dict.get('step'))
+
+
+@swagger.model()
+class TI(object):
+ """
+ @property histories: trust_indicator update histories
+ @ptype histories: C{list} of L{TIHistory}
+ @ptype current: L{float}
+ """
+ def __init__(self, current=0):
+ self.current = current
+ self.histories = list()
+
+ def format(self):
+ hs = []
+ for h in self.histories:
+ hs.append(h.format())
+
+ return {
+ "current": self.current,
+ "histories": hs
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+ t = TI()
+ if a_dict:
+ t.current = a_dict.get('current')
+ if 'histories' in a_dict.keys():
+ for history in a_dict.get('histories', None):
+ t.histories.append(TIHistory.from_dict(history))
+ else:
+ t.histories = []
+ return t
+
+
+@swagger.model()
+class ResultCreateRequest(object):
+ """
+ @property trust_indicator:
+ @ptype trust_indicator: L{TI}
+ """
+ def __init__(self,
+ pod_name=None,
+ project_name=None,
+ case_name=None,
+ installer=None,
+ version=None,
+ start_date=None,
+ stop_date=None,
+ details=None,
+ build_tag=None,
+ scenario=None,
+ criteria=None,
+ trust_indicator=None):
+ self.pod_name = pod_name
+ self.project_name = project_name
+ self.case_name = case_name
+ self.installer = installer
+ self.version = version
+ self.start_date = start_date
+ self.stop_date = stop_date
+ self.details = details
+ self.build_tag = build_tag
+ self.scenario = scenario
+ self.criteria = criteria
+ self.trust_indicator = trust_indicator if trust_indicator else TI(0)
+
+ def format(self):
+ return {
+ "pod_name": self.pod_name,
+ "project_name": self.project_name,
+ "case_name": self.case_name,
+ "installer": self.installer,
+ "version": self.version,
+ "start_date": self.start_date,
+ "stop_date": self.stop_date,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator.format()
+ }
+
+
+@swagger.model()
+class ResultUpdateRequest(object):
+ """
+ @property trust_indicator:
+ @ptype trust_indicator: L{TI}
+ """
+ def __init__(self, trust_indicator=None):
+ self.trust_indicator = trust_indicator
+
+ def format(self):
+ return {
+ "trust_indicator": self.trust_indicator.format(),
+ }
+
+
+@swagger.model()
+class TestResult(object):
+ """
+ @property trust_indicator: used for long duration test case
+ @ptype trust_indicator: L{TI}
+ """
+ def __init__(self, _id=None, case_name=None, project_name=None,
+ pod_name=None, installer=None, version=None,
+ start_date=None, stop_date=None, details=None,
+ build_tag=None, scenario=None, criteria=None,
+ trust_indicator=None):
+ self._id = _id
+ self.case_name = case_name
+ self.project_name = project_name
+ self.pod_name = pod_name
+ self.installer = installer
+ self.version = version
+ self.start_date = start_date
+ self.stop_date = stop_date
+ self.details = details
+ self.build_tag = build_tag
+ self.scenario = scenario
+ self.criteria = criteria
+ self.trust_indicator = trust_indicator
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = TestResult()
+ t._id = a_dict.get('_id')
+ t.case_name = a_dict.get('case_name')
+ t.pod_name = a_dict.get('pod_name')
+ t.project_name = a_dict.get('project_name')
+ t.start_date = str(a_dict.get('start_date'))
+ t.stop_date = str(a_dict.get('stop_date'))
+ t.details = a_dict.get('details')
+ t.version = a_dict.get('version')
+ t.installer = a_dict.get('installer')
+ t.build_tag = a_dict.get('build_tag')
+ t.scenario = a_dict.get('scenario')
+ t.criteria = a_dict.get('criteria')
+ t.trust_indicator = TI.from_dict(a_dict.get('trust_indicator'))
+ return t
+
+ def format(self):
+ return {
+ "case_name": self.case_name,
+ "project_name": self.project_name,
+ "pod_name": self.pod_name,
+ "start_date": str(self.start_date),
+ "stop_date": str(self.stop_date),
+ "version": self.version,
+ "installer": self.installer,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator.format()
+ }
+
+ def format_http(self):
+ return {
+ "_id": str(self._id),
+ "case_name": self.case_name,
+ "project_name": self.project_name,
+ "pod_name": self.pod_name,
+ "start_date": str(self.start_date),
+ "stop_date": str(self.stop_date),
+ "version": self.version,
+ "installer": self.installer,
+ "details": self.details,
+ "build_tag": self.build_tag,
+ "scenario": self.scenario,
+ "criteria": self.criteria,
+ "trust_indicator": self.trust_indicator.format()
+ }
+
+
+@swagger.model()
+class TestResults(object):
+ """
+ @property results:
+ @ptype results: C{list} of L{TestResult}
+ """
+ def __init__(self):
+ self.results = list()
+
+ @staticmethod
+ def from_dict(a_dict):
+ if a_dict is None:
+ return None
+
+ res = TestResults()
+ for result in a_dict.get('results'):
+ res.results.append(TestResult.from_dict(result))
+ return res
diff --git a/testapi/opnfv_testapi/resources/testcase_handlers.py b/testapi/opnfv_testapi/resources/testcase_handlers.py
new file mode 100644
index 0000000..253aa66
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/testcase_handlers.py
@@ -0,0 +1,115 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.common.constants import HTTP_FORBIDDEN
+from opnfv_testapi.resources.handlers import GenericApiHandler
+from opnfv_testapi.resources.testcase_models import Testcase
+from opnfv_testapi.tornado_swagger import swagger
+
+
+class GenericTestcaseHandler(GenericApiHandler):
+ def __init__(self, application, request, **kwargs):
+ super(GenericTestcaseHandler, self).__init__(application,
+ request,
+ **kwargs)
+ self.table = self.db_testcases
+ self.table_cls = Testcase
+
+
+class TestcaseCLHandler(GenericTestcaseHandler):
+ @swagger.operation(nickname="list-all")
+ def get(self, project_name):
+ """
+ @description: list all testcases of a project by project_name
+ @return 200: return all testcases of this project,
+ empty list is no testcase exist in this project
+ @rtype: L{TestCases}
+ """
+ query = dict()
+ query['project_name'] = project_name
+ self._list(query)
+
+ @swagger.operation(nickname="create")
+ def post(self, project_name):
+ """
+ @description: create a testcase of a project by project_name
+ @param body: testcase to be created
+ @type body: L{TestcaseCreateRequest}
+ @in body: body
+ @rtype: L{CreateResponse}
+ @return 200: testcase is created in this project.
+ @raise 403: project not exist
+ or testcase already exists in this project
+ @raise 400: body or name not provided
+ """
+ def p_query(data):
+ return {'name': data.project_name}
+
+ def tc_query(data):
+ return {
+ 'project_name': data.project_name,
+ 'name': data.name
+ }
+
+ def p_error(data):
+ message = 'Could not find project [{}]'.format(data.project_name)
+ return HTTP_FORBIDDEN, message
+
+ def tc_error(data):
+ message = '{} already exists as a testcase in project {}'\
+ .format(data.name, data.project_name)
+ return HTTP_FORBIDDEN, message
+
+ miss_checks = ['name']
+ db_checks = [(self.db_projects, True, p_query, p_error),
+ (self.db_testcases, False, tc_query, tc_error)]
+ self._create(miss_checks, db_checks, project_name=project_name)
+
+
+class TestcaseGURHandler(GenericTestcaseHandler):
+ @swagger.operation(nickname='get-one')
+ def get(self, project_name, case_name):
+ """
+ @description: get a single testcase
+ by case_name and project_name
+ @rtype: L{Testcase}
+ @return 200: testcase exist
+ @raise 404: testcase not exist
+ """
+ query = dict()
+ query['project_name'] = project_name
+ query["name"] = case_name
+ self._get_one(query)
+
+ @swagger.operation(nickname="update")
+ def put(self, project_name, case_name):
+ """
+ @description: update a single testcase
+ by project_name and case_name
+ @param body: testcase to be updated
+ @type body: L{TestcaseUpdateRequest}
+ @in body: body
+ @rtype: L{Project}
+ @return 200: update success
+ @raise 404: testcase or project not exist
+ @raise 403: new testcase name already exist in project
+ or nothing to update
+ """
+ query = {'project_name': project_name, 'name': case_name}
+ db_keys = ['name', 'project_name']
+ self._update(query, db_keys)
+
+ @swagger.operation(nickname='delete')
+ def delete(self, project_name, case_name):
+ """
+ @description: delete a testcase by project_name and case_name
+ @return 200: delete success
+ @raise 404: testcase not exist
+ """
+ query = {'project_name': project_name, 'name': case_name}
+ self._delete(query)
diff --git a/testapi/opnfv_testapi/resources/testcase_models.py b/testapi/opnfv_testapi/resources/testcase_models.py
new file mode 100644
index 0000000..c9dce60
--- /dev/null
+++ b/testapi/opnfv_testapi/resources/testcase_models.py
@@ -0,0 +1,105 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.tornado_swagger import swagger
+
+
+@swagger.model()
+class TestcaseCreateRequest(object):
+ def __init__(self, name, url=None, description=None):
+ self.name = name
+ self.url = url
+ self.description = description
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ "url": self.url,
+ }
+
+
+@swagger.model()
+class TestcaseUpdateRequest(object):
+ def __init__(self, name=None, description=None, project_name=None):
+ self.name = name
+ self.description = description
+ self.project_name = project_name
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ "project_name": self.project_name,
+ }
+
+
+@swagger.model()
+class Testcase(object):
+ def __init__(self):
+ self._id = None
+ self.name = None
+ self.project_name = None
+ self.description = None
+ self.url = None
+ self.creation_date = None
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Testcase()
+ t._id = a_dict.get('_id')
+ t.project_name = a_dict.get('project_name')
+ t.creation_date = a_dict.get('creation_date')
+ t.name = a_dict.get('name')
+ t.description = a_dict.get('description')
+ t.url = a_dict.get('url')
+
+ return t
+
+ def format(self):
+ return {
+ "name": self.name,
+ "description": self.description,
+ "project_name": self.project_name,
+ "creation_date": str(self.creation_date),
+ "url": self.url
+ }
+
+ def format_http(self):
+ return {
+ "_id": str(self._id),
+ "name": self.name,
+ "project_name": self.project_name,
+ "description": self.description,
+ "creation_date": str(self.creation_date),
+ "url": self.url,
+ }
+
+
+@swagger.model()
+class Testcases(object):
+ """
+ @property testcases:
+ @ptype testcases: C{list} of L{Testcase}
+ """
+ def __init__(self):
+ self.testcases = list()
+
+ @staticmethod
+ def from_dict(res_dict):
+ if res_dict is None:
+ return None
+
+ res = Testcases()
+ for testcase in res_dict.get('testcases'):
+ res.testcases.append(Testcase.from_dict(testcase))
+ return res
diff --git a/testapi/opnfv_testapi/router/__init__.py b/testapi/opnfv_testapi/router/__init__.py
new file mode 100644
index 0000000..3fc79f1
--- /dev/null
+++ b/testapi/opnfv_testapi/router/__init__.py
@@ -0,0 +1,9 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+__author__ = 'serena'
diff --git a/testapi/opnfv_testapi/router/url_mappings.py b/testapi/opnfv_testapi/router/url_mappings.py
new file mode 100644
index 0000000..eb648ec
--- /dev/null
+++ b/testapi/opnfv_testapi/router/url_mappings.py
@@ -0,0 +1,48 @@
+##############################################################################
+# Copyright (c) 2015 Orange
+# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from opnfv_testapi.resources.handlers import VersionHandler
+from opnfv_testapi.resources.testcase_handlers import TestcaseCLHandler, \
+ TestcaseGURHandler
+from opnfv_testapi.resources.pod_handlers import PodCLHandler, PodGURHandler
+from opnfv_testapi.resources.project_handlers import ProjectCLHandler, \
+ ProjectGURHandler
+from opnfv_testapi.resources.result_handlers import ResultsCLHandler, \
+ ResultsGURHandler
+
+
+mappings = [
+ # GET /versions => GET API version
+ (r"/versions", VersionHandler),
+
+ # few examples:
+ # GET /api/v1/pods => Get all pods
+ # GET /api/v1/pods/1 => Get details on POD 1
+ (r"/api/v1/pods", PodCLHandler),
+ (r"/api/v1/pods/([^/]+)", PodGURHandler),
+
+ # few examples:
+ # GET /projects
+ # GET /projects/yardstick
+ (r"/api/v1/projects", ProjectCLHandler),
+ (r"/api/v1/projects/([^/]+)", ProjectGURHandler),
+
+ # few examples
+ # GET /projects/qtip/cases => Get cases for qtip
+ (r"/api/v1/projects/([^/]+)/cases", TestcaseCLHandler),
+ (r"/api/v1/projects/([^/]+)/cases/([^/]+)", TestcaseGURHandler),
+
+ # new path to avoid a long depth
+ # GET /results?project=functest&case=keystone.catalog&pod=1
+ # => get results with optional filters
+ # POST /results =>
+ # Push results with mandatory request payload parameters
+ # (project, case, and pod)
+ (r"/api/v1/results", ResultsCLHandler),
+ (r"/api/v1/results/([^/]+)", ResultsGURHandler),
+]
diff --git a/testapi/opnfv_testapi/tests/__init__.py b/testapi/opnfv_testapi/tests/__init__.py
new file mode 100644
index 0000000..9f28b0b
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/__init__.py
@@ -0,0 +1 @@
+__author__ = 'serena'
diff --git a/testapi/opnfv_testapi/tests/unit/__init__.py b/testapi/opnfv_testapi/tests/unit/__init__.py
new file mode 100644
index 0000000..3fc79f1
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/__init__.py
@@ -0,0 +1,9 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+__author__ = 'serena'
diff --git a/testapi/opnfv_testapi/tests/unit/fake_pymongo.py b/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
new file mode 100644
index 0000000..3dd87e6
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/fake_pymongo.py
@@ -0,0 +1,191 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+from operator import itemgetter
+
+
+def thread_execute(method, *args, **kwargs):
+ with ThreadPoolExecutor(max_workers=2) as executor:
+ result = executor.submit(method, *args, **kwargs)
+ return result
+
+
+class MemCursor(object):
+ def __init__(self, collection):
+ self.collection = collection
+ self.count = len(self.collection)
+ self.sorted = []
+
+ def _is_next_exist(self):
+ return self.count != 0
+
+ @property
+ def fetch_next(self):
+ return thread_execute(self._is_next_exist)
+
+ def next_object(self):
+ self.count -= 1
+ return self.collection.pop()
+
+ def sort(self, key_or_list):
+ key = key_or_list[0][0]
+ if key_or_list[0][1] == -1:
+ reverse = True
+ else:
+ reverse = False
+
+ if key_or_list is not None:
+ self.collection = sorted(self.collection,
+ key=itemgetter(key), reverse=reverse)
+ return self
+
+ def limit(self, limit):
+ if limit != 0 and limit < len(self.collection):
+ self.collection = self.collection[0:limit]
+ self.count = limit
+ return self
+
+
+class MemDb(object):
+
+ def __init__(self):
+ self.contents = []
+ pass
+
+ def _find_one(self, spec_or_id=None, *args):
+ if spec_or_id is not None and not isinstance(spec_or_id, dict):
+ spec_or_id = {"_id": spec_or_id}
+ if '_id' in spec_or_id:
+ spec_or_id['_id'] = str(spec_or_id['_id'])
+ cursor = self._find(spec_or_id, *args)
+ for result in cursor:
+ return result
+ return None
+
+ def find_one(self, spec_or_id=None, *args):
+ return thread_execute(self._find_one, spec_or_id, *args)
+
+ def _insert(self, doc_or_docs, check_keys=True):
+
+ docs = doc_or_docs
+ return_one = False
+ if isinstance(docs, dict):
+ return_one = True
+ docs = [docs]
+
+ if check_keys:
+ for doc in docs:
+ self._check_keys(doc)
+
+ ids = []
+ for doc in docs:
+ if '_id' not in doc:
+ doc['_id'] = str(ObjectId())
+ if not self._find_one(doc['_id']):
+ ids.append(doc['_id'])
+ self.contents.append(doc_or_docs)
+
+ if len(ids) == 0:
+ return None
+ if return_one:
+ return ids[0]
+ else:
+ return ids
+
+ def insert(self, doc_or_docs, check_keys=True):
+ return thread_execute(self._insert, doc_or_docs, check_keys)
+
+ @staticmethod
+ def _compare_date(spec, value):
+ for k, v in spec.iteritems():
+ if k == '$gte' and value >= v:
+ return True
+ return False
+
+ @staticmethod
+ def _in(content, *args):
+ for arg in args:
+ for k, v in arg.iteritems():
+ if k == 'start_date':
+ if not MemDb._compare_date(v, content.get(k)):
+ return False
+ elif k == 'trust_indicator.current':
+ if content.get('trust_indicator').get('current') != v:
+ return False
+ elif content.get(k, None) != v:
+ return False
+
+ return True
+
+ def _find(self, *args):
+ res = []
+ for content in self.contents:
+ if self._in(content, *args):
+ res.append(content)
+
+ return res
+
+ def find(self, *args):
+ return MemCursor(self._find(*args))
+
+ def _update(self, spec, document, check_keys=True):
+ updated = False
+
+ if check_keys:
+ self._check_keys(document)
+
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec):
+ for k, v in document.iteritems():
+ updated = True
+ content[k] = v
+ self.contents[index] = content
+ return updated
+
+ def update(self, spec, document, check_keys=True):
+ return thread_execute(self._update, spec, document, check_keys)
+
+ def _remove(self, spec_or_id=None):
+ if spec_or_id is None:
+ self.contents = []
+ if not isinstance(spec_or_id, dict):
+ spec_or_id = {'_id': spec_or_id}
+ for index in range(len(self.contents)):
+ content = self.contents[index]
+ if self._in(content, spec_or_id):
+ del self.contents[index]
+ return True
+ return False
+
+ def remove(self, spec_or_id=None):
+ return thread_execute(self._remove, spec_or_id)
+
+ def clear(self):
+ self._remove()
+
+ def _check_keys(self, doc):
+ for key in doc.keys():
+ if '.' in key:
+ raise NameError('key {} must not contain .'.format(key))
+ if key.startswith('$'):
+ raise NameError('key {} must not start with $'.format(key))
+ if isinstance(doc.get(key), dict):
+ self._check_keys(doc.get(key))
+
+
+def __getattr__(name):
+ return globals()[name]
+
+
+pods = MemDb()
+projects = MemDb()
+testcases = MemDb()
+results = MemDb()
diff --git a/testapi/opnfv_testapi/tests/unit/test_base.py b/testapi/opnfv_testapi/tests/unit/test_base.py
new file mode 100644
index 0000000..ff1a193
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_base.py
@@ -0,0 +1,136 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import json
+
+from tornado.web import Application
+from tornado.testing import AsyncHTTPTestCase
+
+from opnfv_testapi.router import url_mappings
+from opnfv_testapi.resources.models import CreateResponse
+import fake_pymongo
+
+
+class TestBase(AsyncHTTPTestCase):
+ headers = {'Content-Type': 'application/json; charset=UTF-8'}
+
+ def setUp(self):
+ self.basePath = ''
+ self.create_res = CreateResponse
+ self.get_res = None
+ self.list_res = None
+ self.update_res = None
+ self.req_d = None
+ self.req_e = None
+ self.addCleanup(self._clear)
+ super(TestBase, self).setUp()
+
+ def get_app(self):
+ return Application(
+ url_mappings.mappings,
+ db=fake_pymongo,
+ debug=True,
+ )
+
+ def create_d(self, *args):
+ return self.create(self.req_d, *args)
+
+ def create_e(self, *args):
+ return self.create(self.req_e, *args)
+
+ def create(self, req=None, *args):
+ return self.create_help(self.basePath, req, *args)
+
+ def create_help(self, uri, req, *args):
+ if req:
+ req = req.format()
+ res = self.fetch(self._update_uri(uri, *args),
+ method='POST',
+ body=json.dumps(req),
+ headers=self.headers)
+
+ return self._get_return(res, self.create_res)
+
+ def get(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='GET',
+ headers=self.headers)
+
+ def inner():
+ new_args, num = self._get_valid_args(*args)
+ return self.get_res \
+ if num != self._need_arg_num(self.basePath) else self.list_res
+ return self._get_return(res, inner())
+
+ def query(self, query):
+ res = self.fetch(self._get_query_uri(query),
+ method='GET',
+ headers=self.headers)
+ return self._get_return(res, self.list_res)
+
+ def update(self, new=None, *args):
+ if new:
+ new = new.format()
+ res = self.fetch(self._get_uri(*args),
+ method='PUT',
+ body=json.dumps(new),
+ headers=self.headers)
+ return self._get_return(res, self.update_res)
+
+ def delete(self, *args):
+ res = self.fetch(self._get_uri(*args),
+ method='DELETE',
+ headers=self.headers)
+ return res.code, res.body
+
+ @staticmethod
+ def _get_valid_args(*args):
+ new_args = tuple(['%s' % arg for arg in args if arg is not None])
+ return new_args, len(new_args)
+
+ def _need_arg_num(self, uri):
+ return uri.count('%s')
+
+ def _get_query_uri(self, query):
+ return self.basePath + '?' + query
+
+ def _get_uri(self, *args):
+ return self._update_uri(self.basePath, *args)
+
+ def _update_uri(self, uri, *args):
+ r_uri = uri
+ new_args, num = self._get_valid_args(*args)
+ if num != self._need_arg_num(uri):
+ r_uri += '/%s'
+
+ return r_uri % tuple(['%s' % arg for arg in new_args])
+
+ def _get_return(self, res, cls):
+ code = res.code
+ body = res.body
+ return code, self._get_return_body(code, body, cls)
+
+ @staticmethod
+ def _get_return_body(code, body, cls):
+ return cls.from_dict(json.loads(body)) if code < 300 and cls else body
+
+ def assert_href(self, body):
+ self.assertIn(self.basePath, body.href)
+
+ def assert_create_body(self, body, req=None, *args):
+ if not req:
+ req = self.req_d
+ new_args = args + tuple([req.name])
+ self.assertIn(self._get_uri(*new_args), body.href)
+
+ @staticmethod
+ def _clear():
+ fake_pymongo.pods.clear()
+ fake_pymongo.projects.clear()
+ fake_pymongo.testcases.clear()
+ fake_pymongo.results.clear()
diff --git a/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
new file mode 100644
index 0000000..5f50ba8
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
@@ -0,0 +1,123 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from tornado import gen
+from tornado.testing import AsyncHTTPTestCase, gen_test
+from tornado.web import Application
+
+import fake_pymongo
+
+
+class MyTest(AsyncHTTPTestCase):
+ def setUp(self):
+ super(MyTest, self).setUp()
+ self.db = fake_pymongo
+ self.addCleanup(self._clear)
+ self.io_loop.run_sync(self.fixture_setup)
+
+ def get_app(self):
+ return Application()
+
+ @gen.coroutine
+ def fixture_setup(self):
+ self.test1 = {'_id': '1', 'name': 'test1'}
+ self.test2 = {'name': 'test2'}
+ yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
+ yield self.db.pods.insert({'name': 'test2'})
+
+ @gen_test
+ def test_find_one(self):
+ user = yield self.db.pods.find_one({'name': 'test1'})
+ self.assertEqual(user, self.test1)
+ self.db.pods.remove()
+
+ @gen_test
+ def test_find(self):
+ cursor = self.db.pods.find()
+ names = []
+ while (yield cursor.fetch_next):
+ ob = cursor.next_object()
+ names.append(ob.get('name'))
+ self.assertItemsEqual(names, ['test1', 'test2'])
+
+ @gen_test
+ def test_update(self):
+ yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertEqual(user.get('name', None), 'new_test1')
+
+ def test_update_dot_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ 'key 1. name must not contain .')
+
+ def test_update_dot_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
+ None,
+ check_keys=False)
+
+ def test_update_dollar_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ 'key $name must not start with $')
+
+ def test_update_dollar_no_error(self):
+ self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
+ None,
+ check_keys=False)
+
+ @gen_test
+ def test_remove(self):
+ yield self.db.pods.remove({'_id': '1'})
+ user = yield self.db.pods.find_one({'_id': '1'})
+ self.assertIsNone(user)
+
+ def test_insert_dot_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ 'key 2. name must not contain .')
+
+ def test_insert_dot_no_error(self):
+ self._insert_assert({'_id': '1', '2. name': 'test1'},
+ None,
+ check_keys=False)
+
+ def test_insert_dollar_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ 'key $name must not start with $')
+
+ def test_insert_dollar_no_error(self):
+ self._insert_assert({'_id': '1', '$name': 'test1'},
+ None,
+ check_keys=False)
+
+ def _clear(self):
+ self.db.pods.clear()
+
+ def _update_assert(self, docs, error=None, **kwargs):
+ self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
+
+ def _insert_assert(self, docs, error=None, **kwargs):
+ self._db_assert('insert', error, docs, **kwargs)
+
+ @gen_test
+ def _db_assert(self, method, error, *args, **kwargs):
+ name_error = None
+ try:
+ yield self._eval_pods_db(method, *args, **kwargs)
+ except NameError as err:
+ name_error = err.args[0]
+ finally:
+ self.assertEqual(name_error, error)
+
+ def _eval_pods_db(self, method, *args, **kwargs):
+ table_obj = vars(self.db)['pods']
+ return table_obj.__getattribute__(method)(*args, **kwargs)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/test_pod.py b/testapi/opnfv_testapi/tests/unit/test_pod.py
new file mode 100644
index 0000000..a1184d5
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_pod.py
@@ -0,0 +1,89 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.pod_models import PodCreateRequest, Pod, Pods
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestPodBase(TestBase):
+ def setUp(self):
+ super(TestPodBase, self).setUp()
+ self.req_d = PodCreateRequest('zte-1', 'virtual',
+ 'zte pod 1', 'ci-pod')
+ self.req_e = PodCreateRequest('zte-2', 'metal', 'zte pod 2')
+ self.get_res = Pod
+ self.list_res = Pods
+ self.basePath = '/api/v1/pods'
+
+ def assert_get_body(self, pod, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(pod.name, req.name)
+ self.assertEqual(pod.mode, req.mode)
+ self.assertEqual(pod.details, req.details)
+ self.assertEqual(pod.role, req.role)
+ self.assertIsNotNone(pod.creation_date)
+ self.assertIsNotNone(pod._id)
+
+
+class TestPodCreate(TestPodBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = PodCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = PodCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestPodGet(TestPodBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assert_get_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ self.assertEqual(len(body.pods), 2)
+ for pod in body.pods:
+ if self.req_d.name == pod.name:
+ self.assert_get_body(pod)
+ else:
+ self.assert_get_body(pod, self.req_e)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/test_project.py b/testapi/opnfv_testapi/tests/unit/test_project.py
new file mode 100644
index 0000000..327ddf7
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_project.py
@@ -0,0 +1,141 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.project_models import ProjectCreateRequest, \
+ Project, Projects, ProjectUpdateRequest
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestProjectBase(TestBase):
+ def setUp(self):
+ super(TestProjectBase, self).setUp()
+ self.req_d = ProjectCreateRequest('vping', 'vping-ssh test')
+ self.req_e = ProjectCreateRequest('doctor', 'doctor test')
+ self.get_res = Project
+ self.list_res = Projects
+ self.update_res = Project
+ self.basePath = '/api/v1/projects'
+
+ def assert_body(self, project, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(project.name, req.name)
+ self.assertEqual(project.description, req.description)
+ self.assertIsNotNone(project._id)
+ self.assertIsNotNone(project.creation_date)
+
+
+class TestProjectCreate(TestProjectBase):
+ def test_withoutBody(self):
+ (code, body) = self.create()
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_emptyName(self):
+ req_empty = ProjectCreateRequest('')
+ (code, body) = self.create(req_empty)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = ProjectCreateRequest(None)
+ (code, body) = self.create(req_none)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestProjectGet(TestProjectBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ for project in body.projects:
+ if self.req_d.name == project.name:
+ self.assert_body(project)
+ else:
+ self.assert_body(project, self.req_e)
+
+
+class TestProjectUpdate(TestProjectBase):
+ def test_withoutBody(self):
+ code, _ = self.update(None, 'noBody')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_notFound(self):
+ code, _ = self.update(self.req_e, 'notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_newNameExist(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.update(self.req_e, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("already exists", body)
+
+ def test_noUpdate(self):
+ self.create_d()
+ code, body = self.update(self.req_d, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("Nothing to update", body)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ _id = body._id
+
+ req = ProjectUpdateRequest('newName', 'new description')
+ code, body = self.update(req, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(_id, body._id)
+ self.assert_body(body, req)
+
+ _, new_body = self.get(req.name)
+ self.assertEqual(_id, new_body._id)
+ self.assert_body(new_body, req)
+
+
+class TestProjectDelete(TestProjectBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.delete(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(body, '')
+
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/test_result.py b/testapi/opnfv_testapi/tests/unit/test_result.py
new file mode 100644
index 0000000..8479b35
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_result.py
@@ -0,0 +1,339 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+import unittest
+from datetime import datetime, timedelta
+
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_NOT_FOUND
+from opnfv_testapi.resources.pod_models import PodCreateRequest
+from opnfv_testapi.resources.project_models import ProjectCreateRequest
+from opnfv_testapi.resources.result_models import ResultCreateRequest, \
+ TestResult, TestResults, ResultUpdateRequest, TI, TIHistory
+from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest
+from test_base import TestBase
+
+
+class Details(object):
+ def __init__(self, timestart=None, duration=None, status=None):
+ self.timestart = timestart
+ self.duration = duration
+ self.status = status
+
+ def format(self):
+ return {
+ "timestart": self.timestart,
+ "duration": self.duration,
+ "status": self.status
+ }
+
+ @staticmethod
+ def from_dict(a_dict):
+
+ if a_dict is None:
+ return None
+
+ t = Details()
+ t.timestart = a_dict.get('timestart')
+ t.duration = a_dict.get('duration')
+ t.status = a_dict.get('status')
+ return t
+
+
+class TestResultBase(TestBase):
+ def setUp(self):
+ self.pod = 'zte-pod1'
+ self.project = 'functest'
+ self.case = 'vPing'
+ self.installer = 'fuel'
+ self.version = 'C'
+ self.build_tag = 'v3.0'
+ self.scenario = 'odl-l2'
+ self.criteria = 'passed'
+ self.trust_indicator = TI(0.7)
+ self.start_date = "2016-05-23 07:16:09.477097"
+ self.stop_date = "2016-05-23 07:16:19.477097"
+ self.update_date = "2016-05-24 07:16:19.477097"
+ self.update_step = -0.05
+ super(TestResultBase, self).setUp()
+ self.details = Details(timestart='0', duration='9s', status='OK')
+ self.req_d = ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria,
+ trust_indicator=self.trust_indicator)
+ self.get_res = TestResult
+ self.list_res = TestResults
+ self.update_res = TestResult
+ self.basePath = '/api/v1/results'
+ self.req_pod = PodCreateRequest(self.pod, 'metal', 'zte pod 1')
+ self.req_project = ProjectCreateRequest(self.project, 'vping test')
+ self.req_testcase = TestcaseCreateRequest(self.case,
+ '/cases/vping',
+ 'vping-ssh test')
+ self.create_help('/api/v1/pods', self.req_pod)
+ self.create_help('/api/v1/projects', self.req_project)
+ self.create_help('/api/v1/projects/%s/cases',
+ self.req_testcase,
+ self.project)
+
+ def assert_res(self, code, result, req=None):
+ self.assertEqual(code, HTTP_OK)
+ if req is None:
+ req = self.req_d
+ self.assertEqual(result.pod_name, req.pod_name)
+ self.assertEqual(result.project_name, req.project_name)
+ self.assertEqual(result.case_name, req.case_name)
+ self.assertEqual(result.installer, req.installer)
+ self.assertEqual(result.version, req.version)
+ details_req = Details.from_dict(req.details)
+ details_res = Details.from_dict(result.details)
+ self.assertEqual(details_res.duration, details_req.duration)
+ self.assertEqual(details_res.timestart, details_req.timestart)
+ self.assertEqual(details_res.status, details_req.status)
+ self.assertEqual(result.build_tag, req.build_tag)
+ self.assertEqual(result.scenario, req.scenario)
+ self.assertEqual(result.criteria, req.criteria)
+ self.assertEqual(result.start_date, req.start_date)
+ self.assertEqual(result.stop_date, req.stop_date)
+ self.assertIsNotNone(result._id)
+ ti = result.trust_indicator
+ self.assertEqual(ti.current, req.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
+
+ def _create_d(self):
+ _, res = self.create_d()
+ return res.href.split('/')[-1]
+
+
+class TestResultCreate(TestResultBase):
+ def test_nobody(self):
+ (code, body) = self.create(None)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('no body', body)
+
+ def test_podNotProvided(self):
+ req = self.req_d
+ req.pod_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('pod_name missing', body)
+
+ def test_projectNotProvided(self):
+ req = self.req_d
+ req.project_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('project_name missing', body)
+
+ def test_testcaseNotProvided(self):
+ req = self.req_d
+ req.case_name = None
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('case_name missing', body)
+
+ def test_noPod(self):
+ req = self.req_d
+ req.pod_name = 'notExistPod'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find pod', body)
+
+ def test_noProject(self):
+ req = self.req_d
+ req.project_name = 'notExistProject'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find project', body)
+
+ def test_noTestcase(self):
+ req = self.req_d
+ req.case_name = 'notExistTestcase'
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+ self.assertIn('Could not find testcase', body)
+
+ def test_success(self):
+ (code, body) = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_href(body)
+
+ def test_key_with_doc(self):
+ req = copy.deepcopy(self.req_d)
+ req.details = {'1.name': 'dot_name'}
+ (code, body) = self.create(req)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_href(body)
+
+ def test_no_ti(self):
+ req = ResultCreateRequest(pod_name=self.pod,
+ project_name=self.project,
+ case_name=self.case,
+ installer=self.installer,
+ version=self.version,
+ start_date=self.start_date,
+ stop_date=self.stop_date,
+ details=self.details.format(),
+ build_tag=self.build_tag,
+ scenario=self.scenario,
+ criteria=self.criteria)
+ (code, res) = self.create(req)
+ _id = res.href.split('/')[-1]
+ self.assertEqual(code, HTTP_OK)
+ code, body = self.get(_id)
+ self.assert_res(code, body, req)
+
+
+class TestResultGet(TestResultBase):
+ def test_getOne(self):
+ _id = self._create_d()
+ code, body = self.get(_id)
+ self.assert_res(code, body)
+
+ def test_queryPod(self):
+ self._query_and_assert(self._set_query('pod'))
+
+ def test_queryProject(self):
+ self._query_and_assert(self._set_query('project'))
+
+ def test_queryTestcase(self):
+ self._query_and_assert(self._set_query('case'))
+
+ def test_queryVersion(self):
+ self._query_and_assert(self._set_query('version'))
+
+ def test_queryInstaller(self):
+ self._query_and_assert(self._set_query('installer'))
+
+ def test_queryBuildTag(self):
+ self._query_and_assert(self._set_query('build_tag'))
+
+ def test_queryScenario(self):
+ self._query_and_assert(self._set_query('scenario'))
+
+ def test_queryTrustIndicator(self):
+ self._query_and_assert(self._set_query('trust_indicator'))
+
+ def test_queryCriteria(self):
+ self._query_and_assert(self._set_query('criteria'))
+
+ def test_queryPeriodNotInt(self):
+ code, body = self.query(self._set_query('period=a'))
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('period must be int', body)
+
+ def test_queryPeriodFail(self):
+ self._query_and_assert(self._set_query('period=1'),
+ found=False, days=-10)
+
+ def test_queryPeriodSuccess(self):
+ self._query_and_assert(self._set_query('period=1'),
+ found=True)
+
+ def test_queryLastNotInt(self):
+ code, body = self.query(self._set_query('last=a'))
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('last must be int', body)
+
+ def test_queryLast(self):
+ self._create_changed_date()
+ req = self._create_changed_date(minutes=20)
+ self._create_changed_date(minutes=-20)
+ self._query_and_assert(self._set_query('last=1'), req=req)
+
+ def test_combination(self):
+ self._query_and_assert(self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'))
+
+ def test_notFound(self):
+ self._query_and_assert(self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1'),
+ found=False)
+
+ def _query_and_assert(self, query, found=True, req=None, **kwargs):
+ if req is None:
+ req = self._create_changed_date(**kwargs)
+ code, body = self.query(query)
+ if not found:
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(0, len(body.results))
+ else:
+ self.assertEqual(1, len(body.results))
+ for result in body.results:
+ self.assert_res(code, result, req)
+
+ def _create_changed_date(self, **kwargs):
+ req = copy.deepcopy(self.req_d)
+ req.start_date = datetime.now() + timedelta(**kwargs)
+ req.stop_date = str(req.start_date + timedelta(minutes=10))
+ req.start_date = str(req.start_date)
+ self.create(req)
+ return req
+
+ def _set_query(self, *args):
+ def get_value(arg):
+ return self.__getattribute__(arg) \
+ if arg != 'trust_indicator' else self.trust_indicator.current
+ uri = ''
+ for arg in args:
+ if '=' in arg:
+ uri += arg + '&'
+ else:
+ uri += '{}={}&'.format(arg, get_value(arg))
+ return uri[0: -1]
+
+
+class TestResultUpdate(TestResultBase):
+ def test_success(self):
+ _id = self._create_d()
+
+ new_ti = copy.deepcopy(self.trust_indicator)
+ new_ti.current += self.update_step
+ new_ti.histories.append(TIHistory(self.update_date, self.update_step))
+ new_data = copy.deepcopy(self.req_d)
+ new_data.trust_indicator = new_ti
+ update = ResultUpdateRequest(trust_indicator=new_ti)
+ code, body = self.update(update, _id)
+ self.assertEqual(_id, body._id)
+ self.assert_res(code, body, new_data)
+
+ code, new_body = self.get(_id)
+ self.assertEqual(_id, new_body._id)
+ self.assert_res(code, new_body, new_data)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/test_testcase.py b/testapi/opnfv_testapi/tests/unit/test_testcase.py
new file mode 100644
index 0000000..cb76784
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_testcase.py
@@ -0,0 +1,196 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+import copy
+
+from test_base import TestBase
+from opnfv_testapi.resources.testcase_models import TestcaseCreateRequest, \
+ Testcase, Testcases, TestcaseUpdateRequest
+from opnfv_testapi.resources.project_models import ProjectCreateRequest
+from opnfv_testapi.common.constants import HTTP_OK, HTTP_BAD_REQUEST, \
+ HTTP_FORBIDDEN, HTTP_NOT_FOUND
+
+
+class TestCaseBase(TestBase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+ self.req_d = TestcaseCreateRequest('vping_1',
+ '/cases/vping_1',
+ 'vping-ssh test')
+ self.req_e = TestcaseCreateRequest('doctor_1',
+ '/cases/doctor_1',
+ 'create doctor')
+ self.update_d = TestcaseUpdateRequest('vping_1',
+ 'vping-ssh test',
+ 'functest')
+ self.update_e = TestcaseUpdateRequest('doctor_1',
+ 'create doctor',
+ 'functest')
+ self.get_res = Testcase
+ self.list_res = Testcases
+ self.update_res = Testcase
+ self.basePath = '/api/v1/projects/%s/cases'
+ self.create_project()
+
+ def assert_body(self, case, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(case.name, req.name)
+ self.assertEqual(case.description, req.description)
+ self.assertEqual(case.url, req.url)
+ self.assertIsNotNone(case._id)
+ self.assertIsNotNone(case.creation_date)
+
+ def assert_update_body(self, old, new, req=None):
+ if not req:
+ req = self.req_d
+ self.assertEqual(new.name, req.name)
+ self.assertEqual(new.description, req.description)
+ self.assertEqual(new.url, old.url)
+ self.assertIsNotNone(new._id)
+ self.assertIsNotNone(new.creation_date)
+
+ def create_project(self):
+ req_p = ProjectCreateRequest('functest', 'vping-ssh test')
+ self.create_help('/api/v1/projects', req_p)
+ self.project = req_p.name
+
+ def create_d(self):
+ return super(TestCaseBase, self).create_d(self.project)
+
+ def create_e(self):
+ return super(TestCaseBase, self).create_e(self.project)
+
+ def get(self, case=None):
+ return super(TestCaseBase, self).get(self.project, case)
+
+ def update(self, new=None, case=None):
+ return super(TestCaseBase, self).update(new, self.project, case)
+
+ def delete(self, case):
+ return super(TestCaseBase, self).delete(self.project, case)
+
+
+class TestCaseCreate(TestCaseBase):
+ def test_noBody(self):
+ (code, body) = self.create(None, 'vping')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_noProject(self):
+ code, body = self.create(self.req_d, 'noProject')
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('Could not find project', body)
+
+ def test_emptyName(self):
+ req_empty = TestcaseCreateRequest('')
+ (code, body) = self.create(req_empty, self.project)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_noneName(self):
+ req_none = TestcaseCreateRequest(None)
+ (code, body) = self.create(req_none, self.project)
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+ self.assertIn('name missing', body)
+
+ def test_success(self):
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_OK)
+ self.assert_create_body(body, None, self.project)
+
+ def test_alreadyExist(self):
+ self.create_d()
+ code, body = self.create_d()
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn('already exists', body)
+
+
+class TestCaseGet(TestCaseBase):
+ def test_notExist(self):
+ code, body = self.get('notExist')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_getOne(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assert_body(body)
+
+ def test_list(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.get()
+ for case in body.testcases:
+ if self.req_d.name == case.name:
+ self.assert_body(case)
+ else:
+ self.assert_body(case, self.req_e)
+
+
+class TestCaseUpdate(TestCaseBase):
+ def test_noBody(self):
+ code, _ = self.update(case='noBody')
+ self.assertEqual(code, HTTP_BAD_REQUEST)
+
+ def test_notFound(self):
+ code, _ = self.update(self.update_e, 'notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_newNameExist(self):
+ self.create_d()
+ self.create_e()
+ code, body = self.update(self.update_e, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("already exists", body)
+
+ def test_noUpdate(self):
+ self.create_d()
+ code, body = self.update(self.update_d, self.req_d.name)
+ self.assertEqual(code, HTTP_FORBIDDEN)
+ self.assertIn("Nothing to update", body)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.get(self.req_d.name)
+ _id = body._id
+
+ code, body = self.update(self.update_e, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(_id, body._id)
+ self.assert_update_body(self.req_d, body, self.update_e)
+
+ _, new_body = self.get(self.req_e.name)
+ self.assertEqual(_id, new_body._id)
+ self.assert_update_body(self.req_d, new_body, self.update_e)
+
+ def test_with_dollar(self):
+ self.create_d()
+ update = copy.deepcopy(self.update_d)
+ update.description = {'2. change': 'dollar change'}
+ code, body = self.update(update, self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+
+
+class TestCaseDelete(TestCaseBase):
+ def test_notFound(self):
+ code, body = self.delete('notFound')
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+ def test_success(self):
+ self.create_d()
+ code, body = self.delete(self.req_d.name)
+ self.assertEqual(code, HTTP_OK)
+ self.assertEqual(body, '')
+
+ code, body = self.get(self.req_d.name)
+ self.assertEqual(code, HTTP_NOT_FOUND)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tests/unit/test_version.py b/testapi/opnfv_testapi/tests/unit/test_version.py
new file mode 100644
index 0000000..b6fbf45
--- /dev/null
+++ b/testapi/opnfv_testapi/tests/unit/test_version.py
@@ -0,0 +1,31 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import unittest
+
+from test_base import TestBase
+from opnfv_testapi.resources.models import Versions
+
+
+class TestVersionBase(TestBase):
+ def setUp(self):
+ super(TestVersionBase, self).setUp()
+ self.list_res = Versions
+ self.basePath = '/versions'
+
+
+class TestVersion(TestVersionBase):
+ def test_success(self):
+ code, body = self.get()
+ self.assertEqual(200, code)
+ self.assertEqual(len(body.versions), 1)
+ self.assertEqual(body.versions[0].version, 'v1.0')
+ self.assertEqual(body.versions[0].description, 'basics')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testapi/opnfv_testapi/tornado_swagger/README.md b/testapi/opnfv_testapi/tornado_swagger/README.md
new file mode 100644
index 0000000..d815f21
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/README.md
@@ -0,0 +1,273 @@
+# tornado-swagger
+
+## What is tornado-swagger?
+tornado is a wrapper for tornado which enables swagger-ui support.
+
+In essense, you just need to wrap the Api instance and add a few python decorators to
+get full swagger support.http://swagger.io/
+
+
+## How to use:
+
+
+```python
+from tornado.web import RequestHandler, HTTPError
+from tornado_swagger import swagger
+
+swagger.docs()
+
+# You may decorate your operation with @swagger.operation and use docs to inform information
+class ItemNoParamHandler(GenericApiHandler):
+ @swagger.operation(nickname='create')
+ def post(self):
+ """
+ @param body: create test results for a item.
+ @type body: L{Item}
+ @return 200: item is created.
+ @raise 400: invalid input
+ """
+
+# Operations not decorated with @swagger.operation do not get added to the swagger docs
+
+class ItemNoParamHandler(GenericApiHandler):
+ def options(self):
+ """
+ I'm not visible in the swagger docs
+ """
+ pass
+
+
+# Then you use swagger.Application instead of tornado.web.Application
+# and do other operations as usual
+
+def make_app():
+ return swagger.Application([
+ (r"/items", ItemNoParamHandler),
+ (r"/items/([^/]+)", ItemHandler),
+ (r"/items/([^/]+)/cases/([^/]+)", ItemOptionParamHandler),
+ ])
+
+# You define models like this:
+@swagger.model
+class Item:
+ """
+ @descriptin:
+ This is an example of a model class that has parameters in its constructor
+ and the fields in the swagger spec are derived from the parameters to __init__.
+ @notes:
+ In this case we would have property1, property2 as required parameters
+ and property3 as optional parameter.
+ @property property3: Item decription
+ @ptype property3: L{PropertySubclass}
+ """
+ def __init__(self, property1, property2=None):
+ self.property1 = property1
+ self.property2 = property2
+
+# Swagger json:
+ "models": {
+ "Item": {
+ "description": "A description...",
+ "id": "Item",
+ "required": [
+ "property1",
+ ],
+ "properties": [
+ "property1": {
+ "type": "string"
+ },
+ "property2": {
+ "type": "string"
+ "default": null
+ }
+ ]
+ }
+ }
+
+# If you declare an __init__ method with meaningful arguments
+# then those args could be used to deduce the swagger model fields.
+# just as shown above
+
+# if you declare an @property in docs, this property property2 will also be used
+# to deduce the swagger model fields
+class Item:
+ """
+ @property property3: Item description
+ """
+ def __init__(self, property1, property2):
+ self.property1 = property1
+ self.property2 = property2
+
+# Swagger json:
+ "models": {
+ "Item": {
+ "description": "A description...",
+ "id": "Item",
+ "required": [
+ "property1",
+ ],
+ "properties": [
+ "property1": {
+ "type": "string"
+ },
+ "property2": {
+ "type": "string"
+ }
+ "property3": {
+ "type": "string"
+ }
+ ]
+ }
+ }
+
+# if you declare an argument with @ptype, the type of this argument will be specified
+# rather than the default 'string'
+class Item:
+ """
+ @ptype property3: L{PropertySubclass}
+ """
+ def __init__(self, property1, property2, property3=None):
+ self.property1 = property1
+ self.property2 = property2
+ self.property3 = property3
+
+# Swagger json:
+ "models": {
+ "Item": {
+ "description": "A description...",
+ "id": "Item",
+ "required": [
+ "property1",
+ ],
+ "properties": [
+ "property1": {
+ "type": "string"
+ },
+ "property2": {
+ "type": "string"
+ },
+ "property3": {
+ "type": "PropertySubclass"
+ "default": null
+ }
+ ]
+ }
+ }
+
+# if you want to declare an list property, you can do it like this:
+class Item:
+ """
+ @ptype property3: L{PropertySubclass}
+ @ptype property4: C{list} of L{PropertySubclass}
+ """
+ def __init__(self, property1, property2, property3, property4=None):
+ self.property1 = property1
+ self.property2 = property2
+ self.property3 = property3
+ self.property4 = property4
+
+# Swagger json:
+ "models": {
+ "Item": {
+ "description": "A description...",
+ "id": "Item",
+ "required": [
+ "property1",
+ ],
+ "properties": [
+ "property1": {
+ "type": "string"
+ },
+ "property2": {
+ "type": "string"
+ },
+ "property3": {
+ "type": "PropertySubclass"
+ "default": null
+ },
+ "property4": {
+ "default": null,
+ "items": {
+ "type": "PropertySubclass"},
+ "type": "array"
+ }
+ }
+ ]
+ }
+ }
+
+# if it is a query:
+class ItemQueryHandler(GenericApiHandler):
+ @swagger.operation(nickname='query')
+ def get(self):
+ """
+ @param property1:
+ @type property1: L{string}
+ @in property1: query
+ @required property1: False
+
+ @param property2:
+ @type property2: L{string}
+ @in property2: query
+ @required property2: True
+ @rtype: L{Item}
+
+ @notes: GET /item?property1=1&property2=1
+ """
+
+# Swagger json:
+ "apis": [
+ {
+ "operations": [
+ {
+ "parameters": [
+ {
+ "name": "property1",
+ "dataType": "string",
+ "paramType": "query",
+ "description": ""
+ },
+ {
+ "name": "property2",
+ "dataType": "string",
+ "paramType": "query",
+ "required": true,
+ "description": ""
+ }
+ ],
+ "responseClass": "Item",
+ "notes": null,
+ "responseMessages": [],
+ "summary": null,
+ "httpMethod": "GET",
+ "nickname": "query"
+ }
+ ],
+ "path": "/item",
+ "description": null
+ },
+ ....
+ ]
+```
+
+# Running and testing
+
+Now run your tornado app
+
+```
+python main.py
+```
+
+And visit:
+
+```
+curl http://ip:port/swagger/spec
+```
+
+access to web
+```
+http://ip:port/swagger/spec.html
+```
+
+# Passing more metadata to swagger
+customized arguments used in creating the 'swagger.docs' object will be supported later
diff --git a/testapi/opnfv_testapi/tornado_swagger/__init__.py b/testapi/opnfv_testapi/tornado_swagger/__init__.py
new file mode 100644
index 0000000..363bc38
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testapi/opnfv_testapi/tornado_swagger/handlers.py b/testapi/opnfv_testapi/tornado_swagger/handlers.py
new file mode 100644
index 0000000..2154b46
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/handlers.py
@@ -0,0 +1,43 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from tornado.web import URLSpec, StaticFileHandler
+
+from settings import default_settings, \
+ SWAGGER_API_DOCS, SWAGGER_API_LIST, SWAGGER_API_SPEC
+from views import SwaggerUIHandler, SwaggerResourcesHandler, SwaggerApiHandler
+
+
+def swagger_handlers():
+ prefix = default_settings.get('swagger_prefix', '/swagger')
+ if prefix[-1] != '/':
+ prefix += '/'
+
+ def _path(suffix):
+ return prefix + suffix
+ return [
+ URLSpec(
+ _path(r'spec.html$'),
+ SwaggerUIHandler,
+ default_settings,
+ name=SWAGGER_API_DOCS),
+ URLSpec(
+ _path(r'spec.json$'),
+ SwaggerResourcesHandler,
+ default_settings,
+ name=SWAGGER_API_LIST),
+ URLSpec(
+ _path(r'spec$'),
+ SwaggerApiHandler,
+ default_settings,
+ name=SWAGGER_API_SPEC),
+ (
+ _path(r'(.*\.(css|png|gif|js))'),
+ StaticFileHandler,
+ {'path': default_settings.get('static_path')}),
+ ]
diff --git a/testapi/opnfv_testapi/tornado_swagger/settings.py b/testapi/opnfv_testapi/tornado_swagger/settings.py
new file mode 100644
index 0000000..88d0d0f
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/settings.py
@@ -0,0 +1,32 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os.path
+
+SWAGGER_VERSION = '1.2'
+SWAGGER_API_DOCS = 'swagger-api-docs'
+SWAGGER_API_LIST = 'swagger-api-list'
+SWAGGER_API_SPEC = 'swagger-api-spec'
+STATIC_PATH = os.path.join(os.path.dirname(os.path.normpath(__file__)),
+ 'static')
+
+default_settings = {
+ 'base_url': '',
+ 'static_path': STATIC_PATH,
+ 'swagger_prefix': '/swagger',
+ 'api_version': 'v1.0',
+ 'api_key': '',
+ 'enabled_methods': ['get', 'post', 'put', 'patch', 'delete'],
+ 'exclude_namespaces': [],
+}
+
+models = []
+
+
+def basePath():
+ return default_settings.get('base_url')
diff --git a/testapi/opnfv_testapi/tornado_swagger/swagger.py b/testapi/opnfv_testapi/tornado_swagger/swagger.py
new file mode 100644
index 0000000..3d21ede
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/swagger.py
@@ -0,0 +1,290 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import inspect
+from functools import wraps
+from HTMLParser import HTMLParser
+
+import epydoc.markup
+import tornado.web
+
+from settings import default_settings, models
+from handlers import swagger_handlers
+
+
+class EpytextParser(HTMLParser):
+ a_text = False
+
+ def __init__(self, tag):
+ HTMLParser.__init__(self)
+ self.tag = tag
+ self.data = None
+
+ def handle_starttag(self, tag, attr):
+ if tag == self.tag:
+ self.a_text = True
+
+ def handle_endtag(self, tag):
+ if tag == self.tag:
+ self.a_text = False
+
+ def handle_data(self, data):
+ if self.a_text:
+ self.data = data
+
+ def get_data(self):
+ return self.data
+
+
+class DocParser(object):
+ def __init__(self):
+ self.notes = None
+ self.summary = None
+ self.responseClass = None
+ self.responseMessages = []
+ self.params = {}
+ self.properties = {}
+
+ def parse_docstring(self, text):
+ if text is None:
+ return
+
+ errors = []
+ doc = epydoc.markup.parse(text, markup='epytext', errors=errors)
+ _, fields = doc.split_fields(errors)
+
+ for field in fields:
+ tag = field.tag()
+ arg = field.arg()
+ body = field.body()
+ self._get_parser(tag)(arg=arg, body=body)
+ return doc
+
+ def _get_parser(self, tag):
+ parser = {
+ 'param': self._parse_param,
+ 'type': self._parse_type,
+ 'in': self._parse_in,
+ 'required': self._parse_required,
+ 'rtype': self._parse_rtype,
+ 'property': self._parse_property,
+ 'ptype': self._parse_ptype,
+ 'return': self._parse_return,
+ 'raise': self._parse_return,
+ 'notes': self._parse_notes,
+ 'description': self._parse_description,
+ }
+ return parser.get(tag, self._not_supported)
+
+ def _parse_param(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ body = self._get_body(**kwargs)
+ self.params.setdefault(arg, {}).update({
+ 'name': arg,
+ 'description': body,
+ })
+
+ if 'paramType' not in self.params[arg]:
+ self.params[arg]['paramType'] = 'query'
+
+ def _parse_type(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ body = self._get_body(**kwargs)
+ self.params.setdefault(arg, {}).update({
+ 'name': arg,
+ 'dataType': body
+ })
+
+ def _parse_in(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ body = self._get_body(**kwargs)
+ self.params.setdefault(arg, {}).update({
+ 'name': arg,
+ 'paramType': body
+ })
+
+ def _parse_required(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ body = self._get_body(**kwargs)
+ self.params.setdefault(arg, {}).update({
+ 'name': arg,
+ 'required': False if body in ['False', 'false'] else True
+ })
+
+ def _parse_rtype(self, **kwargs):
+ body = self._get_body(**kwargs)
+ self.responseClass = body
+
+ def _parse_property(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ self.properties.setdefault(arg, {}).update({
+ 'type': 'string'
+ })
+
+ def _parse_ptype(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ code = self._parse_epytext_para('code', **kwargs)
+ link = self._parse_epytext_para('link', **kwargs)
+ if code is None:
+ self.properties.setdefault(arg, {}).update({
+ 'type': link
+ })
+ elif code == 'list':
+ self.properties.setdefault(arg, {}).update({
+ 'type': 'array',
+ 'items': {'type': link}
+ })
+
+ def _parse_return(self, **kwargs):
+ arg = kwargs.get('arg', None)
+ body = self._get_body(**kwargs)
+ self.responseMessages.append({
+ 'code': arg,
+ 'message': body
+ })
+
+ def _parse_notes(self, **kwargs):
+ body = self._get_body(**kwargs)
+ self.notes = self._sanitize_doc(body)
+
+ def _parse_description(self, **kwargs):
+ body = self._get_body(**kwargs)
+ self.summary = self._sanitize_doc(body)
+
+ def _not_supported(self, **kwargs):
+ pass
+
+ @staticmethod
+ def _sanitize_doc(comment):
+ return comment.replace('\n', '<br/>') if comment else comment
+
+ @staticmethod
+ def _get_body(**kwargs):
+ body = kwargs.get('body', None)
+ return body.to_plaintext(None).strip() if body else body
+
+ @staticmethod
+ def _parse_epytext_para(tag, **kwargs):
+ def _parse_epytext(tag, body):
+ epytextParser = EpytextParser(tag)
+ epytextParser.feed(str(body))
+ data = epytextParser.get_data()
+ epytextParser.close()
+ return data
+
+ body = kwargs.get('body', None)
+ return _parse_epytext(tag, body) if body else body
+
+
+class model(DocParser):
+ def __init__(self, *args, **kwargs):
+ super(model, self).__init__()
+ self.args = args
+ self.kwargs = kwargs
+ self.required = []
+ self.cls = None
+
+ def __call__(self, *args):
+ if self.cls:
+ return self.cls
+
+ cls = args[0]
+ self._parse_model(cls)
+
+ return cls
+
+ def _parse_model(self, cls):
+ self.id = cls.__name__
+ self.cls = cls
+ if '__init__' in dir(cls):
+ self._parse_args(cls.__init__)
+ self.parse_docstring(inspect.getdoc(cls))
+ models.append(self)
+
+ def _parse_args(self, func):
+ argspec = inspect.getargspec(func)
+ argspec.args.remove("self")
+ defaults = {}
+ if argspec.defaults:
+ defaults = list(zip(argspec.args[-len(argspec.defaults):],
+ argspec.defaults))
+ required_args_count = len(argspec.args) - len(defaults)
+ for arg in argspec.args[:required_args_count]:
+ self.required.append(arg)
+ self.properties.setdefault(arg, {'type': 'string'})
+ for arg, default in defaults:
+ self.properties.setdefault(arg, {
+ 'type': 'string',
+ "default": default
+ })
+
+
+class operation(DocParser):
+ def __init__(self, nickname='apis', **kwds):
+ super(operation, self).__init__()
+ self.nickname = nickname
+ self.func = None
+ self.func_args = []
+ self.kwds = kwds
+
+ def __call__(self, *args, **kwds):
+ if self.func:
+ return self.func(*args, **kwds)
+
+ func = args[0]
+ self._parse_operation(func)
+
+ @wraps(func)
+ def __wrapper__(*in_args, **in_kwds):
+ return self.func(*in_args, **in_kwds)
+
+ __wrapper__.rest_api = self
+ return __wrapper__
+
+ def _parse_operation(self, func):
+ self.func = func
+
+ self.__name__ = func.__name__
+ self._parse_args(func)
+ self.parse_docstring(inspect.getdoc(self.func))
+
+ def _parse_args(self, func):
+ argspec = inspect.getargspec(func)
+ argspec.args.remove("self")
+
+ defaults = []
+ if argspec.defaults:
+ defaults = argspec.args[-len(argspec.defaults):]
+
+ for arg in argspec.args:
+ if arg in defaults:
+ required = False
+ else:
+ required = True
+ self.params.setdefault(arg, {
+ 'name': arg,
+ 'required': required,
+ 'paramType': 'path',
+ 'dataType': 'string'
+ })
+ self.func_args = argspec.args
+
+
+def docs(**opts):
+ default_settings.update(opts)
+
+
+class Application(tornado.web.Application):
+ def __init__(self, handlers=None,
+ default_host="",
+ transforms=None,
+ **settings):
+ super(Application, self).__init__(swagger_handlers() + handlers,
+ default_host,
+ transforms,
+ **settings)
diff --git a/testapi/opnfv_testapi/tornado_swagger/views.py b/testapi/opnfv_testapi/tornado_swagger/views.py
new file mode 100644
index 0000000..2508319
--- /dev/null
+++ b/testapi/opnfv_testapi/tornado_swagger/views.py
@@ -0,0 +1,128 @@
+##############################################################################
+# Copyright (c) 2016 ZTE Corporation
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import inspect
+import json
+
+import tornado.template
+import tornado.web
+
+from settings import SWAGGER_VERSION, SWAGGER_API_LIST, SWAGGER_API_SPEC
+from settings import models, basePath
+
+
+def json_dumps(obj, pretty=False):
+ return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': ')) \
+ if pretty else json.dumps(obj)
+
+
+class SwaggerUIHandler(tornado.web.RequestHandler):
+ def initialize(self, static_path, **kwds):
+ self.static_path = static_path
+
+ def get_template_path(self):
+ return self.static_path
+
+ def get(self):
+ discovery_url = basePath() + self.reverse_url(SWAGGER_API_LIST)
+ self.render('index.html', discovery_url=discovery_url)
+
+
+class SwaggerResourcesHandler(tornado.web.RequestHandler):
+ def initialize(self, api_version, exclude_namespaces, **kwds):
+ self.api_version = api_version
+ self.exclude_namespaces = exclude_namespaces
+
+ def get(self):
+ self.set_header('content-type', 'application/json')
+ resources = {
+ 'apiVersion': self.api_version,
+ 'swaggerVersion': SWAGGER_VERSION,
+ 'basePath': basePath(),
+ 'produces': ["application/json"],
+ 'description': 'Test Api Spec',
+ 'apis': [{
+ 'path': self.reverse_url(SWAGGER_API_SPEC),
+ 'description': 'Test Api Spec'
+ }]
+ }
+
+ self.finish(json_dumps(resources, self.get_arguments('pretty')))
+
+
+class SwaggerApiHandler(tornado.web.RequestHandler):
+ def initialize(self, api_version, base_url, **kwds):
+ self.api_version = api_version
+ self.base_url = base_url
+
+ def get(self):
+ self.set_header('content-type', 'application/json')
+ apis = self.find_api(self.application.handlers)
+ if apis is None:
+ raise tornado.web.HTTPError(404)
+
+ specs = {
+ 'apiVersion': self.api_version,
+ 'swaggerVersion': SWAGGER_VERSION,
+ 'basePath': basePath(),
+ 'apis': [self.__get_api_spec__(path, spec, operations)
+ for path, spec, operations in apis],
+ 'models': self.__get_models_spec(models)
+ }
+ self.finish(json_dumps(specs, self.get_arguments('pretty')))
+
+ def __get_models_spec(self, models):
+ models_spec = {}
+ for model in models:
+ models_spec.setdefault(model.id, self.__get_model_spec(model))
+ return models_spec
+
+ @staticmethod
+ def __get_model_spec(model):
+ return {
+ 'description': model.summary,
+ 'id': model.id,
+ 'notes': model.notes,
+ 'properties': model.properties,
+ 'required': model.required
+ }
+
+ @staticmethod
+ def __get_api_spec__(path, spec, operations):
+ return {
+ 'path': path,
+ 'description': spec.handler_class.__doc__,
+ 'operations': [{
+ 'httpMethod': api.func.__name__.upper(),
+ 'nickname': api.nickname,
+ 'parameters': api.params.values(),
+ 'summary': api.summary,
+ 'notes': api.notes,
+ 'responseClass': api.responseClass,
+ 'responseMessages': api.responseMessages,
+ } for api in operations]
+ }
+
+ @staticmethod
+ def find_api(host_handlers):
+ def get_path(url, args):
+ return url % tuple(['{%s}' % arg for arg in args])
+
+ def get_operations(cls):
+ return [member.rest_api
+ for (_, member) in inspect.getmembers(cls)
+ if hasattr(member, 'rest_api')]
+
+ for host, handlers in host_handlers:
+ for spec in handlers:
+ for (_, mbr) in inspect.getmembers(spec.handler_class):
+ if inspect.ismethod(mbr) and hasattr(mbr, 'rest_api'):
+ path = get_path(spec._path, mbr.rest_api.func_args)
+ operations = get_operations(spec.handler_class)
+ yield path, spec, operations
+ break