From 9db91e3b166f07750a045d5e24f820837f5772b4 Mon Sep 17 00:00:00 2001 From: SerenaFeng Date: Fri, 27 Oct 2017 16:28:48 +0800 Subject: divide resources into handlers and models divide resources into handlers&models put ui handlers into handlers directory put User into user_models.py Change-Id: I3d9e260097205213c3ea8d4eac08b9019e017f71 Signed-off-by: SerenaFeng --- testapi/opnfv_testapi/handlers/__init__.py | 0 testapi/opnfv_testapi/handlers/base_handlers.py | 257 +++++++ testapi/opnfv_testapi/handlers/pod_handlers.py | 78 +++ testapi/opnfv_testapi/handlers/project_handlers.py | 86 +++ testapi/opnfv_testapi/handlers/result_handlers.py | 288 ++++++++ testapi/opnfv_testapi/handlers/root_handlers.py | 10 + .../opnfv_testapi/handlers/scenario_handlers.py | 775 +++++++++++++++++++++ testapi/opnfv_testapi/handlers/sign_handlers.py | 59 ++ .../opnfv_testapi/handlers/testcase_handlers.py | 103 +++ testapi/opnfv_testapi/handlers/user_handlers.py | 25 + testapi/opnfv_testapi/models/__init__.py | 8 + testapi/opnfv_testapi/models/base_models.py | 138 ++++ testapi/opnfv_testapi/models/pod_models.py | 53 ++ testapi/opnfv_testapi/models/project_models.py | 48 ++ testapi/opnfv_testapi/models/result_models.py | 129 ++++ testapi/opnfv_testapi/models/scenario_models.py | 218 ++++++ testapi/opnfv_testapi/models/testcase_models.py | 95 +++ testapi/opnfv_testapi/models/user_models.py | 9 + testapi/opnfv_testapi/resources/__init__.py | 8 - testapi/opnfv_testapi/resources/handlers.py | 257 ------- testapi/opnfv_testapi/resources/models.py | 138 ---- testapi/opnfv_testapi/resources/pod_handlers.py | 78 --- testapi/opnfv_testapi/resources/pod_models.py | 53 -- .../opnfv_testapi/resources/project_handlers.py | 86 --- testapi/opnfv_testapi/resources/project_models.py | 48 -- testapi/opnfv_testapi/resources/result_handlers.py | 288 -------- testapi/opnfv_testapi/resources/result_models.py | 129 ---- .../opnfv_testapi/resources/scenario_handlers.py | 775 --------------------- testapi/opnfv_testapi/resources/scenario_models.py | 218 ------ .../opnfv_testapi/resources/testcase_handlers.py | 103 --- testapi/opnfv_testapi/resources/testcase_models.py | 95 --- testapi/opnfv_testapi/router/url_mappings.py | 31 +- testapi/opnfv_testapi/tests/unit/executor.py | 2 +- .../opnfv_testapi/tests/unit/handlers/__init__.py | 0 .../tests/unit/handlers/scenario-c1.json | 38 + .../tests/unit/handlers/scenario-c2.json | 73 ++ .../opnfv_testapi/tests/unit/handlers/test_base.py | 204 ++++++ .../opnfv_testapi/tests/unit/handlers/test_pod.py | 118 ++++ .../tests/unit/handlers/test_project.py | 137 ++++ .../tests/unit/handlers/test_result.py | 413 +++++++++++ .../tests/unit/handlers/test_scenario.py | 449 ++++++++++++ .../tests/unit/handlers/test_testcase.py | 201 ++++++ .../tests/unit/handlers/test_token.py | 52 ++ .../tests/unit/handlers/test_version.py | 36 + .../opnfv_testapi/tests/unit/resources/__init__.py | 0 .../tests/unit/resources/scenario-c1.json | 38 - .../tests/unit/resources/scenario-c2.json | 73 -- .../tests/unit/resources/test_base.py | 204 ------ .../tests/unit/resources/test_fake_pymongo.py | 123 ---- .../opnfv_testapi/tests/unit/resources/test_pod.py | 118 ---- .../tests/unit/resources/test_project.py | 137 ---- .../tests/unit/resources/test_result.py | 413 ----------- .../tests/unit/resources/test_scenario.py | 449 ------------ .../tests/unit/resources/test_testcase.py | 201 ------ .../tests/unit/resources/test_token.py | 52 -- .../tests/unit/resources/test_version.py | 36 - testapi/opnfv_testapi/tornado_swagger/handlers.py | 2 +- testapi/opnfv_testapi/ui/__init__.py | 0 testapi/opnfv_testapi/ui/auth/__init__.py | 0 testapi/opnfv_testapi/ui/auth/sign.py | 59 -- testapi/opnfv_testapi/ui/auth/user.py | 33 - testapi/opnfv_testapi/ui/root.py | 10 - 62 files changed, 4118 insertions(+), 4239 deletions(-) create mode 100644 testapi/opnfv_testapi/handlers/__init__.py create mode 100644 testapi/opnfv_testapi/handlers/base_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/pod_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/project_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/result_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/root_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/scenario_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/sign_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/testcase_handlers.py create mode 100644 testapi/opnfv_testapi/handlers/user_handlers.py create mode 100644 testapi/opnfv_testapi/models/__init__.py create mode 100644 testapi/opnfv_testapi/models/base_models.py create mode 100644 testapi/opnfv_testapi/models/pod_models.py create mode 100644 testapi/opnfv_testapi/models/project_models.py create mode 100644 testapi/opnfv_testapi/models/result_models.py create mode 100644 testapi/opnfv_testapi/models/scenario_models.py create mode 100644 testapi/opnfv_testapi/models/testcase_models.py create mode 100644 testapi/opnfv_testapi/models/user_models.py delete mode 100644 testapi/opnfv_testapi/resources/__init__.py delete mode 100644 testapi/opnfv_testapi/resources/handlers.py delete mode 100644 testapi/opnfv_testapi/resources/models.py delete mode 100644 testapi/opnfv_testapi/resources/pod_handlers.py delete mode 100644 testapi/opnfv_testapi/resources/pod_models.py delete mode 100644 testapi/opnfv_testapi/resources/project_handlers.py delete mode 100644 testapi/opnfv_testapi/resources/project_models.py delete mode 100644 testapi/opnfv_testapi/resources/result_handlers.py delete mode 100644 testapi/opnfv_testapi/resources/result_models.py delete mode 100644 testapi/opnfv_testapi/resources/scenario_handlers.py delete mode 100644 testapi/opnfv_testapi/resources/scenario_models.py delete mode 100644 testapi/opnfv_testapi/resources/testcase_handlers.py delete mode 100644 testapi/opnfv_testapi/resources/testcase_models.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/__init__.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_base.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_pod.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_project.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_result.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_token.py create mode 100644 testapi/opnfv_testapi/tests/unit/handlers/test_version.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/__init__.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_base.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_pod.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_project.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_result.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_scenario.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_testcase.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_token.py delete mode 100644 testapi/opnfv_testapi/tests/unit/resources/test_version.py delete mode 100644 testapi/opnfv_testapi/ui/__init__.py delete mode 100644 testapi/opnfv_testapi/ui/auth/__init__.py delete mode 100644 testapi/opnfv_testapi/ui/auth/sign.py delete mode 100644 testapi/opnfv_testapi/ui/auth/user.py delete mode 100644 testapi/opnfv_testapi/ui/root.py (limited to 'testapi') diff --git a/testapi/opnfv_testapi/handlers/__init__.py b/testapi/opnfv_testapi/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/testapi/opnfv_testapi/handlers/base_handlers.py b/testapi/opnfv_testapi/handlers/base_handlers.py new file mode 100644 index 0000000..a8ee3db --- /dev/null +++ b/testapi/opnfv_testapi/handlers/base_handlers.py @@ -0,0 +1,257 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# feng.xiaowei@zte.com.cn refactor db.pod to db.pods 5-19-2016 +# feng.xiaowei@zte.com.cn refactor test_project to project 5-19-2016 +# feng.xiaowei@zte.com.cn refactor response body 5-19-2016 +# feng.xiaowei@zte.com.cn refactor pod/project response info 5-19-2016 +# feng.xiaowei@zte.com.cn refactor testcase related handler 5-20-2016 +# feng.xiaowei@zte.com.cn refactor result related handler 5-23-2016 +# feng.xiaowei@zte.com.cn refactor dashboard related handler 5-24-2016 +# feng.xiaowei@zte.com.cn add methods to GenericApiHandler 5-26-2016 +# feng.xiaowei@zte.com.cn remove PodHandler 5-26-2016 +# feng.xiaowei@zte.com.cn remove ProjectHandler 5-26-2016 +# feng.xiaowei@zte.com.cn remove TestcaseHandler 5-27-2016 +# feng.xiaowei@zte.com.cn remove ResultHandler 5-29-2016 +# feng.xiaowei@zte.com.cn remove DashboardHandler 5-30-2016 +############################################################################## + +from datetime import datetime +import json + +from tornado import gen +from tornado import web + +from opnfv_testapi.common import check +from opnfv_testapi.common import message +from opnfv_testapi.common import raises +from opnfv_testapi.db import api as dbapi +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + +DEFAULT_REPRESENTATION = "application/json" + + +class GenericApiHandler(web.RequestHandler): + def __init__(self, application, request, **kwargs): + super(GenericApiHandler, self).__init__(application, request, **kwargs) + self.json_args = None + self.table = None + self.table_cls = None + self.db_projects = 'projects' + self.db_pods = 'pods' + self.db_testcases = 'testcases' + self.db_results = 'results' + self.db_scenarios = 'scenarios' + self.auth = self.settings["auth"] + + def prepare(self): + if self.request.body: + if self.request.headers.get("Content-Type") is not None: + if self.request.headers["Content-Type"].startswith( + DEFAULT_REPRESENTATION): + try: + self.json_args = json.loads(self.request.body) + except (ValueError, KeyError, TypeError) as error: + raises.BadRequest(message.bad_format(str(error))) + + def finish_request(self, json_object=None): + if json_object: + self.write(json.dumps(json_object)) + self.set_header("Content-Type", DEFAULT_REPRESENTATION) + self.finish() + + def _create_response(self, resource): + href = self.request.full_url() + '/' + str(resource) + return base_models.CreateResponse(href=href).format() + + def format_data(self, data): + cls_data = self.table_cls.from_dict(data) + return cls_data.format_http() + + @web.asynchronous + @gen.coroutine + @check.is_authorized + @check.valid_token + @check.no_body + @check.miss_fields + @check.values_check + @check.carriers_exist + @check.new_not_exists + def _create(self, **kwargs): + """ + :param miss_checks: [miss1, miss2] + :param db_checks: [(table, exist, query, error)] + """ + data = self.table_cls.from_dict(self.json_args) + for k, v in kwargs.iteritems(): + if k != 'query': + data.__setattr__(k, v) + + if self.table != 'results': + data.creation_date = datetime.now() + _id = yield dbapi.db_save(self.table, data.format()) + if 'name' in self.json_args: + resource = data.name + else: + resource = _id + self.finish_request(self._create_response(resource)) + + @web.asynchronous + @gen.coroutine + def _list(self, query=None, res_op=None, *args, **kwargs): + sort = kwargs.get('sort') + page = kwargs.get('page', 0) + last = kwargs.get('last', 0) + per_page = kwargs.get('per_page', 0) + if query is None: + query = {} + pipelines = list() + pipelines.append({'$match': query}) + + total_pages = 0 + data = list() + cursor = dbapi.db_list(self.table, query) + records_count = yield cursor.count() + if records_count > 0: + if page > 0: + total_pages, return_nr = self._calc_total_pages(records_count, + last, + page, + per_page) + pipelines = self._set_pipelines(pipelines, + sort, + return_nr, + page, + per_page) + cursor = dbapi.db_aggregate(self.table, pipelines) + while (yield cursor.fetch_next): + data.append(self.format_data(cursor.next_object())) + if res_op is None: + res = {self.table: data} + else: + res = res_op(data, *args) + if page > 0: + res.update({ + 'pagination': { + 'current_page': kwargs.get('page'), + 'total_pages': total_pages + } + }) + self.finish_request(res) + + @staticmethod + def _calc_total_pages(records_count, last, page, per_page): + records_nr = records_count + if (records_count > last) and (last > 0): + records_nr = last + + total_pages, remainder = divmod(records_nr, per_page) + if remainder > 0: + total_pages += 1 + if page > 1 and page > total_pages: + raises.BadRequest( + 'Request page > total_pages [{}]'.format(total_pages)) + return total_pages, records_nr + + @staticmethod + def _set_pipelines(pipelines, sort, return_nr, page, per_page): + if sort: + pipelines.append({'$sort': sort}) + + over = (page - 1) * per_page + left = return_nr - over + pipelines.append({'$skip': over}) + pipelines.append({'$limit': per_page if per_page < left else left}) + + return pipelines + + @web.asynchronous + @gen.coroutine + @check.not_exist + def _get_one(self, data, query=None): + self.finish_request(self.format_data(data)) + + @web.asynchronous + @gen.coroutine + @check.not_exist + def _delete(self, data, query=None): + yield dbapi.db_delete(self.table, query) + self.finish_request() + + @web.asynchronous + @gen.coroutine + @check.no_body + @check.not_exist + @check.updated_one_not_exist + def _update(self, data, query=None, **kwargs): + data = self.table_cls.from_dict(data) + update_req = self._update_requests(data) + yield dbapi.db_update(self.table, query, update_req) + update_req['_id'] = str(data._id) + self.finish_request(update_req) + + @web.asynchronous + @gen.coroutine + @check.no_body + @check.not_exist + @check.updated_one_not_exist + def pure_update(self, data, query=None, **kwargs): + data = self.table_cls.from_dict(data) + update_req = self._update_requests(data) + yield dbapi.db_update(self.table, query, update_req) + self.finish_request() + + def _update_requests(self, data): + request = dict() + for k, v in self.json_args.iteritems(): + request = self._update_request(request, k, v, + data.__getattribute__(k)) + if not request: + raises.Forbidden(message.no_update()) + + edit_request = data.format() + edit_request.update(request) + return edit_request + + @staticmethod + def _update_request(edit_request, key, new_value, old_value): + """ + This function serves to prepare the elements in the update request. + We try to avoid replace the exact values in the db + edit_request should be a dict in which we add an entry (key) after + comparing values + """ + if not (new_value is None): + if new_value != old_value: + edit_request[key] = new_value + + return edit_request + + def _update_query(self, keys, data): + query = dict() + equal = True + for key in keys: + new = self.json_args.get(key) + old = data.get(key) + if new is None: + new = old + elif new != old: + equal = False + query[key] = new + return query if not equal else dict() + + +class VersionHandler(GenericApiHandler): + @swagger.operation(nickname='listAllVersions') + def get(self): + """ + @description: list all supported versions + @rtype: L{Versions} + """ + versions = [{'version': 'v1.0', 'description': 'basics'}] + self.finish_request({'versions': versions}) diff --git a/testapi/opnfv_testapi/handlers/pod_handlers.py b/testapi/opnfv_testapi/handlers/pod_handlers.py new file mode 100644 index 0000000..abf5bf9 --- /dev/null +++ b/testapi/opnfv_testapi/handlers/pod_handlers.py @@ -0,0 +1,78 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.models import pod_models +from opnfv_testapi.tornado_swagger import swagger + + +class GenericPodHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(GenericPodHandler, self).__init__(application, request, **kwargs) + self.table = 'pods' + self.table_cls = pod_models.Pod + + +class PodCLHandler(GenericPodHandler): + @swagger.operation(nickname='listAllPods') + def get(self): + """ + @description: list all pods + @return 200: list all pods, empty list is no pod exist + @rtype: L{Pods} + """ + self._list() + + @swagger.operation(nickname='createPod') + def post(self): + """ + @description: create a pod + @param body: pod to be created + @type body: L{PodCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: pod is created. + @raise 403: pod already exists + @raise 400: body or name not provided + """ + def query(): + return {'name': self.json_args.get('name')} + miss_fields = ['name'] + self._create(miss_fields=miss_fields, query=query) + + +class PodGURHandler(GenericPodHandler): + @swagger.operation(nickname='getPodByName') + def get(self, pod_name): + """ + @description: get a single pod by pod_name + @rtype: L{Pod} + @return 200: pod exist + @raise 404: pod not exist + """ + self._get_one(query={'name': pod_name}) + + def delete(self, pod_name): + """ Remove a POD + + # check for an existing pod to be deleted + mongo_dict = yield self.db.pods.find_one( + {'name': pod_name}) + pod = TestProject.pod(mongo_dict) + if pod is None: + raise HTTPError(HTTP_NOT_FOUND, + "{} could not be found as a pod to be deleted" + .format(pod_name)) + + # just delete it, or maybe save it elsewhere in a future + res = yield self.db.projects.remove( + {'name': pod_name}) + + self.finish_request(answer) + """ + pass diff --git a/testapi/opnfv_testapi/handlers/project_handlers.py b/testapi/opnfv_testapi/handlers/project_handlers.py new file mode 100644 index 0000000..30d9ab3 --- /dev/null +++ b/testapi/opnfv_testapi/handlers/project_handlers.py @@ -0,0 +1,86 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.models import project_models +from opnfv_testapi.tornado_swagger import swagger + + +class GenericProjectHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(GenericProjectHandler, self).__init__(application, + request, + **kwargs) + self.table = 'projects' + self.table_cls = project_models.Project + + +class ProjectCLHandler(GenericProjectHandler): + @swagger.operation(nickname="listAllProjects") + def get(self): + """ + @description: list all projects + @return 200: return all projects, empty list is no project exist + @rtype: L{Projects} + """ + self._list() + + @swagger.operation(nickname="createProject") + def post(self): + """ + @description: create a project + @param body: project to be created + @type body: L{ProjectCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: project is created. + @raise 403: project already exists + @raise 400: body or name not provided + """ + def query(): + return {'name': self.json_args.get('name')} + miss_fields = ['name'] + self._create(miss_fields=miss_fields, query=query) + + +class ProjectGURHandler(GenericProjectHandler): + @swagger.operation(nickname='getProjectByName') + def get(self, project_name): + """ + @description: get a single project by project_name + @rtype: L{Project} + @return 200: project exist + @raise 404: project not exist + """ + self._get_one(query={'name': project_name}) + + @swagger.operation(nickname="updateProjectByName") + def put(self, project_name): + """ + @description: update a single project by project_name + @param body: project to be updated + @type body: L{ProjectUpdateRequest} + @in body: body + @rtype: L{Project} + @return 200: update success + @raise 404: project not exist + @raise 403: new project name already exist or nothing to update + """ + query = {'name': project_name} + db_keys = ['name'] + self._update(query=query, db_keys=db_keys) + + @swagger.operation(nickname='deleteProjectByName') + def delete(self, project_name): + """ + @description: delete a project by project_name + @return 200: delete success + @raise 404: project not exist + """ + self._delete(query={'name': project_name}) diff --git a/testapi/opnfv_testapi/handlers/result_handlers.py b/testapi/opnfv_testapi/handlers/result_handlers.py new file mode 100644 index 0000000..c4b61ff --- /dev/null +++ b/testapi/opnfv_testapi/handlers/result_handlers.py @@ -0,0 +1,288 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from datetime import datetime +from datetime import timedelta +import json +import logging + +from bson import objectid + +from opnfv_testapi.common import constants +from opnfv_testapi.common import message +from opnfv_testapi.common import raises +from opnfv_testapi.common.config import CONF +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.models import result_models +from opnfv_testapi.tornado_swagger import swagger + + +class GenericResultHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(GenericResultHandler, self).__init__(application, + request, + **kwargs) + self.table = self.db_results + self.table_cls = result_models.TestResult + + def get_int(self, key, value): + try: + value = int(value) + except Exception: + raises.BadRequest(message.must_int(key)) + return value + + def set_query(self): + query = dict() + date_range = dict() + + query['public'] = {'$not': {'$eq': 'false'}} + for k in self.request.query_arguments.keys(): + v = self.get_query_argument(k) + if k == 'project' or k == 'pod' or k == 'case': + query[k + '_name'] = v + elif k == 'period': + v = self.get_int(k, v) + if v > 0: + period = datetime.now() - timedelta(days=v) + obj = {"$gte": str(period)} + query['start_date'] = obj + elif k == 'trust_indicator': + query[k + '.current'] = float(v) + elif k == 'from': + date_range.update({'$gte': str(v)}) + elif k == 'to': + date_range.update({'$lt': str(v)}) + elif k == 'signed': + username = self.get_secure_cookie(constants.TESTAPI_ID) + role = self.get_secure_cookie(constants.ROLE) + if role: + del query['public'] + if role != "reviewer": + query['user'] = username + elif k not in ['last', 'page', 'descend']: + query[k] = v + if date_range: + query['start_date'] = date_range + + # if $lt is not provided, + # empty/None/null/'' start_date will also be returned + if 'start_date' in query and '$lt' not in query['start_date']: + query['start_date'].update({'$lt': str(datetime.now())}) + + return query + + +class ResultsCLHandler(GenericResultHandler): + @swagger.operation(nickname="queryTestResults") + def get(self): + """ + @description: Retrieve result(s) for a test project + on a specific pod. + @notes: Retrieve result(s) for a test project on a specific pod. + Available filters for this request are : + - project : project name + - case : case name + - pod : pod name + - version : platform version (Arno-R1, ...) + - installer : fuel/apex/compass/joid/daisy + - build_tag : Jenkins build tag name + - period : x last days, incompatible with from/to + - from : starting time in 2016-01-01 or 2016-01-01 00:01:23 + - to : ending time in 2016-01-01 or 2016-01-01 00:01:23 + - scenario : the test scenario (previously version) + - criteria : the global criteria status passed or failed + - trust_indicator : evaluate the stability of the test case + to avoid running systematically long and stable test case + - signed : get logined user result + + GET /results/project=functest&case=vPing&version=Arno-R1 \ + &pod=pod_name&period=15&signed + @return 200: all test results consist with query, + empty list if no result is found + @rtype: L{TestResults} + @param pod: pod name + @type pod: L{string} + @in pod: query + @required pod: False + @param project: project name + @type project: L{string} + @in project: query + @required project: False + @param case: case name + @type case: L{string} + @in case: query + @required case: False + @param version: i.e. Colorado + @type version: L{string} + @in version: query + @required version: False + @param installer: fuel/apex/joid/compass + @type installer: L{string} + @in installer: query + @required installer: False + @param build_tag: i.e. v3.0 + @type build_tag: L{string} + @in build_tag: query + @required build_tag: False + @param scenario: i.e. odl + @type scenario: L{string} + @in scenario: query + @required scenario: False + @param criteria: i.e. passed + @type criteria: L{string} + @in criteria: query + @required criteria: False + @param period: last days + @type period: L{string} + @in period: query + @required period: False + @param from: i.e. 2016-01-01 or 2016-01-01 00:01:23 + @type from: L{string} + @in from: query + @required from: False + @param to: i.e. 2016-01-01 or 2016-01-01 00:01:23 + @type to: L{string} + @in to: query + @required to: False + @param last: last records stored until now + @type last: L{string} + @in last: query + @required last: False + @param page: which page to list, default to 1 + @type page: L{int} + @in page: query + @required page: False + @param trust_indicator: must be float + @type trust_indicator: L{float} + @in trust_indicator: query + @required trust_indicator: False + @param signed: user results or all results + @type signed: L{string} + @in signed: query + @required signed: False + @param descend: true, newest2oldest; false, oldest2newest + @type descend: L{string} + @in descend: query + @required descend: False + """ + def descend_limit(): + descend = self.get_query_argument('descend', 'true') + return -1 if descend.lower() == 'true' else 1 + + def last_limit(): + return self.get_int('last', self.get_query_argument('last', 0)) + + def page_limit(): + return self.get_int('page', self.get_query_argument('page', 1)) + + limitations = { + 'sort': {'_id': descend_limit()}, + 'last': last_limit(), + 'page': page_limit(), + 'per_page': CONF.api_results_per_page + } + + self._list(query=self.set_query(), **limitations) + + @swagger.operation(nickname="createTestResult") + def post(self): + """ + @description: create a test result + @param body: result to be created + @type body: L{ResultCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: result is created. + @raise 404: pod/project/testcase not exist + @raise 400: body/pod_name/project_name/case_name not provided + """ + self._post() + + def _post(self): + def pod_query(): + return {'name': self.json_args.get('pod_name')} + + def project_query(): + return {'name': self.json_args.get('project_name')} + + def testcase_query(): + return {'project_name': self.json_args.get('project_name'), + 'name': self.json_args.get('case_name')} + + def options_check(field, options): + return self.json_args.get(field).upper() in options + + miss_fields = ['pod_name', 'project_name', 'case_name'] + carriers = [('pods', pod_query), + ('projects', project_query), + ('testcases', testcase_query)] + values_check = [('criteria', options_check, ['PASS', 'FAIL'])] + + self._create(miss_fields=miss_fields, + carriers=carriers, + values_check=values_check) + + +class ResultsUploadHandler(ResultsCLHandler): + @swagger.operation(nickname="uploadTestResult") + def post(self): + """ + @description: upload and create a test result + @param body: result to be created + @type body: L{ResultCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: result is created. + @raise 404: pod/project/testcase not exist + @raise 400: body/pod_name/project_name/case_name not provided + """ + logging.info('file upload') + fileinfo = self.request.files['file'][0] + is_public = self.get_body_argument('public') + logging.warning('public:%s', is_public) + logging.info('results is :%s', fileinfo['filename']) + logging.info('results is :%s', fileinfo['body']) + self.json_args = json.loads(fileinfo['body']).copy() + self.json_args['public'] = is_public + + openid = self.get_secure_cookie(constants.TESTAPI_ID) + if openid: + self.json_args['user'] = openid + + super(ResultsUploadHandler, self)._post() + + +class ResultsGURHandler(GenericResultHandler): + @swagger.operation(nickname='getTestResultById') + def get(self, result_id): + """ + @description: get a single result by result_id + @rtype: L{TestResult} + @return 200: test result exist + @raise 404: test result not exist + """ + query = dict() + query["_id"] = objectid.ObjectId(result_id) + self._get_one(query=query) + + @swagger.operation(nickname="updateTestResultById") + def put(self, result_id): + """ + @description: update a single result by _id + @param body: fields to be updated + @type body: L{ResultUpdateRequest} + @in body: body + @rtype: L{Result} + @return 200: update success + @raise 404: result not exist + @raise 403: nothing to update + """ + query = {'_id': objectid.ObjectId(result_id)} + db_keys = [] + self._update(query=query, db_keys=db_keys) diff --git a/testapi/opnfv_testapi/handlers/root_handlers.py b/testapi/opnfv_testapi/handlers/root_handlers.py new file mode 100644 index 0000000..92920fa --- /dev/null +++ b/testapi/opnfv_testapi/handlers/root_handlers.py @@ -0,0 +1,10 @@ +from opnfv_testapi.common.config import CONF +from opnfv_testapi.handlers import base_handlers + + +class RootHandler(base_handlers.GenericApiHandler): + def get_template_path(self): + return CONF.ui_static_path + + def get(self): + self.render('testapi-ui/index.html') diff --git a/testapi/opnfv_testapi/handlers/scenario_handlers.py b/testapi/opnfv_testapi/handlers/scenario_handlers.py new file mode 100644 index 0000000..67abcff --- /dev/null +++ b/testapi/opnfv_testapi/handlers/scenario_handlers.py @@ -0,0 +1,775 @@ +import functools + +from opnfv_testapi.common import message +from opnfv_testapi.common import raises +from opnfv_testapi.handlers import base_handlers +import opnfv_testapi.models.scenario_models as models +from opnfv_testapi.tornado_swagger import swagger + + +class GenericScenarioHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(GenericScenarioHandler, self).__init__(application, + request, + **kwargs) + self.table = self.db_scenarios + self.table_cls = models.Scenario + + def set_query(self, locators): + query = dict() + elem_query = dict() + for k, v in locators.iteritems(): + if k == 'scenario': + query['name'] = v + elif k == 'installer': + elem_query["installer"] = v + elif k == 'version': + elem_query["versions.version"] = v + elif k == 'project': + elem_query["versions.projects.project"] = v + else: + query[k] = v + if elem_query: + query['installers'] = {'$elemMatch': elem_query} + return query + + +class ScenariosCLHandler(GenericScenarioHandler): + @swagger.operation(nickname="queryScenarios") + def get(self): + """ + @description: Retrieve scenario(s). + @notes: Retrieve scenario(s) + Available filters for this request are : + - name : scenario name + + GET /scenarios?name=scenario_1 + @param name: scenario name + @type name: L{string} + @in name: query + @required name: False + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: False + @param version: version + @type version: L{string} + @in version: query + @required version: False + @param project: project name + @type project: L{string} + @in project: query + @required project: False + @return 200: all scenarios satisfy queries, + empty list if no scenario is found + @rtype: L{Scenarios} + """ + + def _set_query(): + query = dict() + elem_query = dict() + for k in self.request.query_arguments.keys(): + v = self.get_query_argument(k) + if k == 'installer': + elem_query["installer"] = v + elif k == 'version': + elem_query["versions.version"] = v + elif k == 'project': + elem_query["versions.projects.project"] = v + else: + query[k] = v + if elem_query: + query['installers'] = {'$elemMatch': elem_query} + return query + + self._list(query=_set_query()) + + @swagger.operation(nickname="createScenario") + def post(self): + """ + @description: create a new scenario by name + @param body: scenario to be created + @type body: L{ScenarioCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: scenario is created. + @raise 403: scenario already exists + @raise 400: body or name not provided + """ + def query(): + return {'name': self.json_args.get('name')} + miss_fields = ['name'] + self._create(miss_fields=miss_fields, query=query) + + +class ScenarioGURHandler(GenericScenarioHandler): + @swagger.operation(nickname='getScenarioByName') + def get(self, name): + """ + @description: get a single scenario by name + @rtype: L{Scenario} + @return 200: scenario exist + @raise 404: scenario not exist + """ + self._get_one(query={'name': name}) + pass + + @swagger.operation(nickname="updateScenarioName") + def put(self, name): + """ + @description: update scenario, only rename is supported currently + @param body: fields to be updated + @type body: L{ScenarioUpdateRequest} + @in body: body + @rtype: L{Scenario} + @return 200: update success + @raise 404: scenario not exist + @raise 403: nothing to update + """ + query = {'name': name} + db_keys = ['name'] + self._update(query=query, db_keys=db_keys) + + @swagger.operation(nickname="deleteScenarioByName") + def delete(self, name): + """ + @description: delete a scenario by name + @return 200: delete success + @raise 404: scenario not exist: + """ + self._delete(query={'name': name}) + + +class ScenarioUpdater(object): + def __init__(self, data, body=None, + installer=None, version=None, project=None): + self.data = data + self.body = body + self.installer = installer + self.version = version + self.project = project + + def update(self, item, action): + updates = { + ('scores', 'post'): self._update_requests_add_score, + ('trust_indicators', 'post'): self._update_requests_add_ti, + ('customs', 'post'): self._update_requests_add_customs, + ('customs', 'put'): self._update_requests_update_customs, + ('customs', 'delete'): self._update_requests_delete_customs, + ('projects', 'post'): self._update_requests_add_projects, + ('projects', 'put'): self._update_requests_update_projects, + ('projects', 'delete'): self._update_requests_delete_projects, + ('owner', 'put'): self._update_requests_change_owner, + ('versions', 'post'): self._update_requests_add_versions, + ('versions', 'put'): self._update_requests_update_versions, + ('versions', 'delete'): self._update_requests_delete_versions, + ('installers', 'post'): self._update_requests_add_installers, + ('installers', 'put'): self._update_requests_update_installers, + ('installers', 'delete'): self._update_requests_delete_installers, + } + updates[(item, action)](self.data) + + return self.data.format() + + def iter_installers(xstep): + @functools.wraps(xstep) + def magic(self, data): + [xstep(self, installer) + for installer in self._filter_installers(data.installers)] + return magic + + def iter_versions(xstep): + @functools.wraps(xstep) + def magic(self, installer): + [xstep(self, version) + for version in (self._filter_versions(installer.versions))] + return magic + + def iter_projects(xstep): + @functools.wraps(xstep) + def magic(self, version): + [xstep(self, project) + for project in (self._filter_projects(version.projects))] + return magic + + @iter_installers + @iter_versions + @iter_projects + def _update_requests_add_score(self, project): + project.scores.append( + models.ScenarioScore.from_dict(self.body)) + + @iter_installers + @iter_versions + @iter_projects + def _update_requests_add_ti(self, project): + project.trust_indicators.append( + models.ScenarioTI.from_dict(self.body)) + + @iter_installers + @iter_versions + @iter_projects + def _update_requests_add_customs(self, project): + project.customs = list(set(project.customs + self.body)) + + @iter_installers + @iter_versions + @iter_projects + def _update_requests_update_customs(self, project): + project.customs = list(set(self.body)) + + @iter_installers + @iter_versions + @iter_projects + def _update_requests_delete_customs(self, project): + project.customs = filter( + lambda f: f not in self.body, + project.customs) + + @iter_installers + @iter_versions + def _update_requests_add_projects(self, version): + version.projects = self._update_with_body(models.ScenarioProject, + 'project', + version.projects) + + @iter_installers + @iter_versions + def _update_requests_update_projects(self, version): + version.projects = self._update_with_body(models.ScenarioProject, + 'project', + list()) + + @iter_installers + @iter_versions + def _update_requests_delete_projects(self, version): + version.projects = self._remove_projects(version.projects) + + @iter_installers + @iter_versions + def _update_requests_change_owner(self, version): + version.owner = self.body.get('owner') + + @iter_installers + def _update_requests_add_versions(self, installer): + installer.versions = self._update_with_body(models.ScenarioVersion, + 'version', + installer.versions) + + @iter_installers + def _update_requests_update_versions(self, installer): + installer.versions = self._update_with_body(models.ScenarioVersion, + 'version', + list()) + + @iter_installers + def _update_requests_delete_versions(self, installer): + installer.versions = self._remove_versions(installer.versions) + + def _update_requests_add_installers(self, scenario): + scenario.installers = self._update_with_body(models.ScenarioInstaller, + 'installer', + scenario.installers) + + def _update_requests_update_installers(self, scenario): + scenario.installers = self._update_with_body(models.ScenarioInstaller, + 'installer', + list()) + + def _update_requests_delete_installers(self, scenario): + scenario.installers = self._remove_installers(scenario.installers) + + def _update_with_body(self, clazz, field, withs): + exists = list() + malformat = list() + for new in self.body: + try: + format_new = clazz.from_dict_with_raise(new) + new_name = getattr(format_new, field) + if not any(getattr(o, field) == new_name for o in withs): + withs.append(format_new) + else: + exists.append(new_name) + except Exception as error: + malformat.append(error.message) + if malformat: + raises.BadRequest(message.bad_format(malformat)) + elif exists: + raises.Conflict(message.exist('{}s'.format(field), exists)) + return withs + + def _filter_installers(self, installers): + return self._filter('installer', installers) + + def _remove_installers(self, installers): + return self._remove('installer', installers) + + def _filter_versions(self, versions): + return self._filter('version', versions) + + def _remove_versions(self, versions): + return self._remove('version', versions) + + def _filter_projects(self, projects): + return self._filter('project', projects) + + def _remove_projects(self, projects): + return self._remove('project', projects) + + def _filter(self, item, items): + return filter( + lambda f: getattr(f, item) == getattr(self, item), + items) + + def _remove(self, field, fields): + return filter( + lambda f: getattr(f, field) not in self.body, + fields) + + +class GenericScenarioUpdateHandler(GenericScenarioHandler): + def __init__(self, application, request, **kwargs): + super(GenericScenarioUpdateHandler, self).__init__(application, + request, + **kwargs) + self.installer = None + self.version = None + self.project = None + self.item = None + self.action = None + + def do_update(self, item, action, locators): + self.item = item + self.action = action + for k, v in locators.iteritems(): + if not v: + v = self.get_query_argument(k) + setattr(self, k, v) + locators[k] = v + self.pure_update(query=self.set_query(locators=locators)) + + def _update_requests(self, data): + return ScenarioUpdater(data, + self.json_args, + self.installer, + self.version, + self.project).update(self.item, self.action) + + +class ScenarioScoresHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addScoreRecord") + def post(self, scenario): + """ + @description: add a new score record + @notes: add a new score record to a project + POST /api/v1/scenarios//scores? \ + installer=& \ + version=& \ + project= + @param body: score to be added + @type body: L{ScenarioScore} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @param project: project name + @type project: L{string} + @in project: query + @required project: True + @return 200: score is created. + @raise 404: scenario/installer/version/project not existed + """ + self.do_update('scores', + 'post', + locators={'scenario': scenario, + 'installer': None, + 'version': None, + 'project': None}) + + +class ScenarioTIsHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addTrustIndicatorRecord") + def post(self, scenario): + """ + @description: add a new trust indicator record + @notes: add a new trust indicator record to a project + POST /api/v1/scenarios//trust_indicators? \ + installer=& \ + version=& \ + project= + @param body: trust indicator to be added + @type body: L{ScenarioTI} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @param project: project name + @type project: L{string} + @in project: query + @required project: True + @return 200: trust indicator is added. + @raise 404: scenario/installer/version/project not existed + """ + self.do_update('trust_indicators', + 'post', + locators={'scenario': scenario, + 'installer': None, + 'version': None, + 'project': None}) + + +class ScenarioCustomsHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addCustomizedTestCases") + def post(self, scenario): + """ + @description: add customized test cases + @notes: add several test cases to a project + POST /api/v1/scenarios//customs? \ + installer=& \ + version=& \ + project= + @param body: test cases to be added + @type body: C{list} of L{string} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @param project: project name + @type project: L{string} + @in project: query + @required project: True + @return 200: test cases are added. + @raise 404: scenario/installer/version/project not existed + """ + self.do_update('customs', + 'post', + locators={'scenario': scenario, + 'installer': None, + 'version': None, + 'project': None}) + + @swagger.operation(nickname="updateCustomizedTestCases") + def put(self, scenario): + """ + @description: update customized test cases + @notes: substitute all the customized test cases + PUT /api/v1/scenarios//customs? \ + installer=& \ + version=& \ + project= + @param body: new supported test cases + @type body: C{list} of L{string} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @param project: project name + @type project: L{string} + @in project: query + @required project: True + @return 200: substitute test cases success. + @raise 404: scenario/installer/version/project not existed + """ + self.do_update('customs', + 'put', + locators={'scenario': scenario, + 'installer': None, + 'version': None, + 'project': None}) + + @swagger.operation(nickname="deleteCustomizedTestCases") + def delete(self, scenario): + """ + @description: delete one or several customized test cases + @notes: delete one or some customized test cases + DELETE /api/v1/scenarios//customs? \ + installer=& \ + version=& \ + project= + @param body: test case(s) to be deleted + @type body: C{list} of L{string} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @param project: project name + @type project: L{string} + @in project: query + @required project: True + @return 200: delete test case(s) success. + @raise 404: scenario/installer/version/project not existed + """ + self.do_update('customs', + 'delete', + locators={'scenario': scenario, + 'installer': None, + 'version': None, + 'project': None}) + + +class ScenarioProjectsHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addProjectsUnderScenario") + def post(self, scenario): + """ + @description: add projects to scenario + @notes: add one or multiple projects + POST /api/v1/scenarios//projects? \ + installer=& \ + version= + @param body: projects to be added + @type body: C{list} of L{ScenarioProject} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @return 200: projects are added. + @raise 400: bad schema + @raise 409: conflict, project already exists + @raise 404: scenario/installer/version not existed + """ + self.do_update('projects', + 'post', + locators={'scenario': scenario, + 'installer': None, + 'version': None}) + + @swagger.operation(nickname="updateScenarioProjects") + def put(self, scenario): + """ + @description: replace all projects + @notes: substitute all projects, delete existed ones with new provides + PUT /api/v1/scenarios//projects? \ + installer=& \ + version= + @param body: new projects + @type body: C{list} of L{ScenarioProject} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @return 200: replace projects success. + @raise 400: bad schema + @raise 404: scenario/installer/version not existed + """ + self.do_update('projects', + 'put', + locators={'scenario': scenario, + 'installer': None, + 'version': None}) + + @swagger.operation(nickname="deleteProjectsUnderScenario") + def delete(self, scenario): + """ + @description: delete one or multiple projects + @notes: delete one or multiple projects + DELETE /api/v1/scenarios//projects? \ + installer=& \ + version= + @param body: projects(names) to be deleted + @type body: C{list} of L{string} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @return 200: delete project(s) success. + @raise 404: scenario/installer/version not existed + """ + self.do_update('projects', + 'delete', + locators={'scenario': scenario, + 'installer': None, + 'version': None}) + + +class ScenarioOwnerHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="changeScenarioOwner") + def put(self, scenario): + """ + @description: change scenario owner + @notes: substitute all projects, delete existed ones with new provides + PUT /api/v1/scenarios//owner? \ + installer=& \ + version= + @param body: new owner + @type body: L{ScenarioChangeOwnerRequest} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @param version: version + @type version: L{string} + @in version: query + @required version: True + @return 200: change owner success. + @raise 404: scenario/installer/version not existed + """ + self.do_update('owner', + 'put', + locators={'scenario': scenario, + 'installer': None, + 'version': None}) + + +class ScenarioVersionsHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addVersionsUnderScenario") + def post(self, scenario): + """ + @description: add versions to scenario + @notes: add one or multiple versions + POST /api/v1/scenarios//versions? \ + installer= + @param body: versions to be added + @type body: C{list} of L{ScenarioVersion} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @return 200: versions are added. + @raise 400: bad schema + @raise 409: conflict, version already exists + @raise 404: scenario/installer not exist + """ + self.do_update('versions', + 'post', + locators={'scenario': scenario, + 'installer': None}) + + @swagger.operation(nickname="updateVersionsUnderScenario") + def put(self, scenario): + """ + @description: replace all versions + @notes: substitute all versions as a totality + PUT /api/v1/scenarios//versions? \ + installer= + @param body: new versions + @type body: C{list} of L{ScenarioVersion} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @return 200: replace versions success. + @raise 400: bad schema + @raise 404: scenario/installer not exist + """ + self.do_update('versions', + 'put', + locators={'scenario': scenario, + 'installer': None}) + + @swagger.operation(nickname="deleteVersionsUnderScenario") + def delete(self, scenario): + """ + @description: delete one or multiple versions + @notes: delete one or multiple versions + DELETE /api/v1/scenarios//versions? \ + installer= + @param body: versions(names) to be deleted + @type body: C{list} of L{string} + @in body: body + @param installer: installer type + @type installer: L{string} + @in installer: query + @required installer: True + @return 200: delete versions success. + @raise 404: scenario/installer not exist + """ + self.do_update('versions', + 'delete', + locators={'scenario': scenario, + 'installer': None}) + + +class ScenarioInstallersHandler(GenericScenarioUpdateHandler): + @swagger.operation(nickname="addInstallersUnderScenario") + def post(self, scenario): + """ + @description: add installers to scenario + @notes: add one or multiple installers + POST /api/v1/scenarios//installers + @param body: installers to be added + @type body: C{list} of L{ScenarioInstaller} + @in body: body + @return 200: installers are added. + @raise 400: bad schema + @raise 409: conflict, installer already exists + @raise 404: scenario not exist + """ + self.do_update('installers', + 'post', + locators={'scenario': scenario}) + + @swagger.operation(nickname="updateInstallersUnderScenario") + def put(self, scenario): + """ + @description: replace all installers + @notes: substitute all installers as a totality + PUT /api/v1/scenarios//installers + @param body: new installers + @type body: C{list} of L{ScenarioInstaller} + @in body: body + @return 200: replace versions success. + @raise 400: bad schema + @raise 404: scenario/installer not exist + """ + self.do_update('installers', + 'put', + locators={'scenario': scenario}) + + @swagger.operation(nickname="deleteInstallersUnderScenario") + def delete(self, scenario): + """ + @description: delete one or multiple installers + @notes: delete one or multiple installers + DELETE /api/v1/scenarios//installers + @param body: installers(names) to be deleted + @type body: C{list} of L{string} + @in body: body + @return 200: delete versions success. + @raise 404: scenario/installer not exist + """ + self.do_update('installers', + 'delete', + locators={'scenario': scenario}) diff --git a/testapi/opnfv_testapi/handlers/sign_handlers.py b/testapi/opnfv_testapi/handlers/sign_handlers.py new file mode 100644 index 0000000..7540662 --- /dev/null +++ b/testapi/opnfv_testapi/handlers/sign_handlers.py @@ -0,0 +1,59 @@ +from cas import CASClient +from tornado import gen +from tornado import web + +from opnfv_testapi.common import constants +from opnfv_testapi.common.config import CONF +from opnfv_testapi.db import api as dbapi +from opnfv_testapi.handlers import base_handlers + + +class SignBaseHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(SignBaseHandler, self).__init__(application, request, **kwargs) + self.table = 'users' + self.cas_client = CASClient(version='2', + server_url=CONF.lfid_cas_url, + service_url='{}/{}'.format( + CONF.ui_url, + CONF.lfid_signin_return)) + + +class SigninHandler(SignBaseHandler): + def get(self): + self.redirect(url=(self.cas_client.get_login_url())) + + +class SigninReturnHandler(SignBaseHandler): + + @web.asynchronous + @gen.coroutine + def get(self): + ticket = self.get_query_argument('ticket', default=None) + if ticket: + (user, attrs, _) = self.cas_client.verify_ticket(ticket=ticket) + login_user = { + 'user': user, + 'email': attrs.get('mail'), + 'fullname': attrs.get('field_lf_full_name'), + 'groups': constants.TESTAPI_USERS + attrs.get('group', []) + } + q_user = {'user': user} + db_user = yield dbapi.db_find_one(self.table, q_user) + if not db_user: + dbapi.db_save(self.table, login_user) + else: + dbapi.db_update(self.table, q_user, login_user) + + self.clear_cookie(constants.TESTAPI_ID) + self.set_secure_cookie(constants.TESTAPI_ID, user) + + self.redirect(url=CONF.ui_url) + + +class SignoutHandler(SignBaseHandler): + def get(self): + """Handle signout request.""" + self.clear_cookie(constants.TESTAPI_ID) + logout_url = self.cas_client.get_logout_url(redirect_url=CONF.ui_url) + self.redirect(url=logout_url) diff --git a/testapi/opnfv_testapi/handlers/testcase_handlers.py b/testapi/opnfv_testapi/handlers/testcase_handlers.py new file mode 100644 index 0000000..c4c3c21 --- /dev/null +++ b/testapi/opnfv_testapi/handlers/testcase_handlers.py @@ -0,0 +1,103 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.models import testcase_models +from opnfv_testapi.tornado_swagger import swagger + + +class GenericTestcaseHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(GenericTestcaseHandler, self).__init__(application, + request, + **kwargs) + self.table = self.db_testcases + self.table_cls = testcase_models.Testcase + + +class TestcaseCLHandler(GenericTestcaseHandler): + @swagger.operation(nickname="listAllTestCases") + def get(self, project_name): + """ + @description: list all testcases of a project by project_name + @return 200: return all testcases of this project, + empty list is no testcase exist in this project + @rtype: L{TestCases} + """ + self._list(query={'project_name': project_name}) + + @swagger.operation(nickname="createTestCase") + def post(self, project_name): + """ + @description: create a testcase of a project by project_name + @param body: testcase to be created + @type body: L{TestcaseCreateRequest} + @in body: body + @rtype: L{CreateResponse} + @return 200: testcase is created in this project. + @raise 403: project not exist + or testcase already exists in this project + @raise 400: body or name not provided + """ + def project_query(): + return {'name': project_name} + + def testcase_query(): + return {'project_name': project_name, + 'name': self.json_args.get('name')} + miss_fields = ['name'] + carriers = [(self.db_projects, project_query)] + self._create(miss_fields=miss_fields, + carriers=carriers, + query=testcase_query, + project_name=project_name) + + +class TestcaseGURHandler(GenericTestcaseHandler): + @swagger.operation(nickname='getTestCaseByName') + def get(self, project_name, case_name): + """ + @description: get a single testcase + by case_name and project_name + @rtype: L{Testcase} + @return 200: testcase exist + @raise 404: testcase not exist + """ + query = dict() + query['project_name'] = project_name + query["name"] = case_name + self._get_one(query=query) + + @swagger.operation(nickname="updateTestCaseByName") + def put(self, project_name, case_name): + """ + @description: update a single testcase + by project_name and case_name + @param body: testcase to be updated + @type body: L{TestcaseUpdateRequest} + @in body: body + @rtype: L{Project} + @return 200: update success + @raise 404: testcase or project not exist + @raise 403: new testcase name already exist in project + or nothing to update + """ + query = {'project_name': project_name, 'name': case_name} + db_keys = ['name', 'project_name'] + self._update(query=query, db_keys=db_keys) + + @swagger.operation(nickname='deleteTestCaseByName') + def delete(self, project_name, case_name): + """ + @description: delete a testcase by project_name and case_name + @return 200: delete success + @raise 404: testcase not exist + """ + query = {'project_name': project_name, 'name': case_name} + self._delete(query=query) diff --git a/testapi/opnfv_testapi/handlers/user_handlers.py b/testapi/opnfv_testapi/handlers/user_handlers.py new file mode 100644 index 0000000..5067e35 --- /dev/null +++ b/testapi/opnfv_testapi/handlers/user_handlers.py @@ -0,0 +1,25 @@ +from opnfv_testapi.common import constants +from opnfv_testapi.common import raises +from opnfv_testapi.common.config import CONF +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.models.user_models import User + + +class UserHandler(base_handlers.GenericApiHandler): + def __init__(self, application, request, **kwargs): + super(UserHandler, self).__init__(application, request, **kwargs) + self.table = 'users' + self.table_cls = User + + def get(self): + if CONF.api_authenticate: + username = self.get_secure_cookie(constants.TESTAPI_ID) + if username: + self._get_one(query={'user': username}) + else: + raises.Unauthorized('Unauthorized') + else: + self.finish_request(User('anonymous', + 'anonymous@linuxfoundation.com', + 'anonymous lf', + constants.TESTAPI_USERS).format()) diff --git a/testapi/opnfv_testapi/models/__init__.py b/testapi/opnfv_testapi/models/__init__.py new file mode 100644 index 0000000..05c0c93 --- /dev/null +++ b/testapi/opnfv_testapi/models/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/testapi/opnfv_testapi/models/base_models.py b/testapi/opnfv_testapi/models/base_models.py new file mode 100644 index 0000000..27396d1 --- /dev/null +++ b/testapi/opnfv_testapi/models/base_models.py @@ -0,0 +1,138 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +# feng.xiaowei@zte.com.cn mv Pod to pod_models.py 5-18-2016 +# feng.xiaowei@zte.com.cn add MetaCreateResponse/MetaGetResponse 5-18-2016 +# feng.xiaowei@zte.com.cn mv TestProject to project_models.py 5-19-2016 +# feng.xiaowei@zte.com.cn delete meta class 5-19-2016 +# feng.xiaowei@zte.com.cn add CreateResponse 5-19-2016 +# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016 +# feng.xiaowei@zte.com.cn mv TestResut to result_models.py 5-23-2016 +# feng.xiaowei@zte.com.cn add ModelBase 12-20-2016 +############################################################################## +import ast +import copy + +from opnfv_testapi.tornado_swagger import swagger + + +class ModelBase(object): + + def format(self): + return self._format(['_id']) + + def format_http(self): + return self._format([]) + + @classmethod + def from_dict(cls, a_dict): + if a_dict is None: + return None + + attr_parser = cls.attr_parser() + t = cls() + for k, v in a_dict.iteritems(): + value = v + if isinstance(v, dict) and k in attr_parser: + value = attr_parser[k].from_dict(v) + elif isinstance(v, list) and k in attr_parser: + value = [] + for item in v: + value.append(attr_parser[k].from_dict(item)) + + t.__setattr__(k, value) + + return t + + @classmethod + def from_dict_with_raise(cls, a_dict): + if a_dict is None: + return None + + attr_parser = cls.attr_parser() + t = cls() + for k, v in a_dict.iteritems(): + if k not in t.__dict__: + raise AttributeError( + '{} has no attribute {}'.format(cls.__name__, k)) + value = v + if isinstance(v, dict) and k in attr_parser: + value = attr_parser[k].from_dict_with_raise(v) + elif isinstance(v, list) and k in attr_parser: + value = [] + for item in v: + value.append(attr_parser[k].from_dict_with_raise(item)) + + t.__setattr__(k, value) + + return t + + @staticmethod + def attr_parser(): + return {} + + def _format(self, excludes): + new_obj = copy.deepcopy(self) + dicts = new_obj.__dict__ + for k in dicts.keys(): + if k in excludes: + del dicts[k] + elif dicts[k]: + dicts[k] = self._obj_format(dicts[k]) + return dicts + + def _obj_format(self, obj): + if self._has_format(obj): + obj = obj.format() + elif isinstance(obj, unicode): + try: + obj = self._obj_format(ast.literal_eval(obj)) + except Exception: + try: + obj = str(obj) + except Exception: + obj = obj + elif isinstance(obj, list): + hs = list() + for h in obj: + hs.append(self._obj_format(h)) + obj = hs + elif not isinstance(obj, (str, int, float, dict)): + obj = str(obj) + return obj + + @staticmethod + def _has_format(obj): + return not isinstance(obj, (str, unicode)) and hasattr(obj, 'format') + + +@swagger.model() +class CreateResponse(ModelBase): + def __init__(self, href=''): + self.href = href + + +@swagger.model() +class Versions(ModelBase): + """ + @property versions: + @ptype versions: C{list} of L{Version} + """ + + def __init__(self): + self.versions = list() + + @staticmethod + def attr_parser(): + return {'versions': Version} + + +@swagger.model() +class Version(ModelBase): + def __init__(self, version=None, description=None): + self.version = version + self.description = description diff --git a/testapi/opnfv_testapi/models/pod_models.py b/testapi/opnfv_testapi/models/pod_models.py new file mode 100644 index 0000000..15c2833 --- /dev/null +++ b/testapi/opnfv_testapi/models/pod_models.py @@ -0,0 +1,53 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + + +# name: name of the POD e.g. zte-1 +# mode: metal or virtual +# details: any detail +# role: ci-pod or community-pod or single-node + + +@swagger.model() +class PodCreateRequest(base_models.ModelBase): + def __init__(self, name, mode='', details='', role=""): + self.name = name + self.mode = mode + self.details = details + self.role = role + + +@swagger.model() +class Pod(base_models.ModelBase): + def __init__(self, + name='', mode='', details='', + role="", _id='', create_date='', owner=''): + self.name = name + self.mode = mode + self.details = details + self.role = role + self._id = _id + self.creation_date = create_date + self.owner = owner + + +@swagger.model() +class Pods(base_models.ModelBase): + """ + @property pods: + @ptype pods: C{list} of L{Pod} + """ + def __init__(self): + self.pods = list() + + @staticmethod + def attr_parser(): + return {'pods': Pod} diff --git a/testapi/opnfv_testapi/models/project_models.py b/testapi/opnfv_testapi/models/project_models.py new file mode 100644 index 0000000..5f280f1 --- /dev/null +++ b/testapi/opnfv_testapi/models/project_models.py @@ -0,0 +1,48 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + + +@swagger.model() +class ProjectCreateRequest(base_models.ModelBase): + def __init__(self, name, description=''): + self.name = name + self.description = description + + +@swagger.model() +class ProjectUpdateRequest(base_models.ModelBase): + def __init__(self, name='', description=''): + self.name = name + self.description = description + + +@swagger.model() +class Project(base_models.ModelBase): + def __init__(self, + name=None, _id=None, description=None, create_date=None): + self._id = _id + self.name = name + self.description = description + self.creation_date = create_date + + +@swagger.model() +class Projects(base_models.ModelBase): + """ + @property projects: + @ptype projects: C{list} of L{Project} + """ + def __init__(self): + self.projects = list() + + @staticmethod + def attr_parser(): + return {'projects': Project} diff --git a/testapi/opnfv_testapi/models/result_models.py b/testapi/opnfv_testapi/models/result_models.py new file mode 100644 index 0000000..97fda08 --- /dev/null +++ b/testapi/opnfv_testapi/models/result_models.py @@ -0,0 +1,129 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + + +@swagger.model() +class TIHistory(base_models.ModelBase): + """ + @ptype step: L{float} + """ + def __init__(self, date=None, step=0): + self.date = date + self.step = step + + +@swagger.model() +class TI(base_models.ModelBase): + """ + @property histories: trust_indicator update histories + @ptype histories: C{list} of L{TIHistory} + @ptype current: L{float} + """ + def __init__(self, current=0): + self.current = current + self.histories = list() + + @staticmethod + def attr_parser(): + return {'histories': TIHistory} + + +@swagger.model() +class ResultCreateRequest(base_models.ModelBase): + """ + @property trust_indicator: + @ptype trust_indicator: L{TI} + """ + def __init__(self, + pod_name=None, + project_name=None, + case_name=None, + installer=None, + version=None, + start_date=None, + stop_date=None, + details=None, + build_tag=None, + scenario=None, + criteria=None, + user=None, + public="true", + trust_indicator=None): + self.pod_name = pod_name + self.project_name = project_name + self.case_name = case_name + self.installer = installer + self.version = version + self.start_date = start_date + self.stop_date = stop_date + self.details = details + self.build_tag = build_tag + self.scenario = scenario + self.criteria = criteria + self.user = user + self.public = public + self.trust_indicator = trust_indicator if trust_indicator else TI(0) + + +@swagger.model() +class ResultUpdateRequest(base_models.ModelBase): + """ + @property trust_indicator: + @ptype trust_indicator: L{TI} + """ + def __init__(self, trust_indicator=None): + self.trust_indicator = trust_indicator + + +@swagger.model() +class TestResult(base_models.ModelBase): + """ + @property trust_indicator: used for long duration test case + @ptype trust_indicator: L{TI} + """ + def __init__(self, _id=None, case_name=None, project_name=None, + pod_name=None, installer=None, version=None, + start_date=None, stop_date=None, details=None, + build_tag=None, scenario=None, criteria=None, + user=None, public="true", trust_indicator=None): + self._id = _id + self.case_name = case_name + self.project_name = project_name + self.pod_name = pod_name + self.installer = installer + self.version = version + self.start_date = start_date + self.stop_date = stop_date + self.details = details + self.build_tag = build_tag + self.scenario = scenario + self.criteria = criteria + self.user = user + self.public = public + self.trust_indicator = trust_indicator + + @staticmethod + def attr_parser(): + return {'trust_indicator': TI} + + +@swagger.model() +class TestResults(base_models.ModelBase): + """ + @property results: + @ptype results: C{list} of L{TestResult} + """ + def __init__(self): + self.results = list() + + @staticmethod + def attr_parser(): + return {'results': TestResult} diff --git a/testapi/opnfv_testapi/models/scenario_models.py b/testapi/opnfv_testapi/models/scenario_models.py new file mode 100644 index 0000000..0610c6b --- /dev/null +++ b/testapi/opnfv_testapi/models/scenario_models.py @@ -0,0 +1,218 @@ +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + + +def list_default(value): + return value if value else list() + + +def dict_default(value): + return value if value else dict() + + +@swagger.model() +class ScenarioTI(base_models.ModelBase): + def __init__(self, date=None, status='silver'): + self.date = date + self.status = status + + def __eq__(self, other): + return (self.date == other.date and + self.status == other.status) + + def __ne__(self, other): + return not self.__eq__(other) + + +@swagger.model() +class ScenarioScore(base_models.ModelBase): + def __init__(self, date=None, score='0'): + self.date = date + self.score = score + + def __eq__(self, other): + return (self.date == other.date and + self.score == other.score) + + def __ne__(self, other): + return not self.__eq__(other) + + +@swagger.model() +class ScenarioProject(base_models.ModelBase): + """ + @property customs: + @ptype customs: C{list} of L{string} + @property scores: + @ptype scores: C{list} of L{ScenarioScore} + @property trust_indicators: + @ptype trust_indicators: C{list} of L{ScenarioTI} + """ + def __init__(self, + project='', + customs=None, + scores=None, + trust_indicators=None): + self.project = project + self.customs = list_default(customs) + self.scores = list_default(scores) + self.trust_indicators = list_default(trust_indicators) + + @staticmethod + def attr_parser(): + return {'scores': ScenarioScore, + 'trust_indicators': ScenarioTI} + + def __eq__(self, other): + return (self.project == other.project and + self._customs_eq(other) and + self._scores_eq(other) and + self._ti_eq(other)) + + def __ne__(self, other): + return not self.__eq__(other) + + def _customs_eq(self, other): + return set(self.customs) == set(other.customs) + + def _scores_eq(self, other): + return self.scores == other.scores + + def _ti_eq(self, other): + return self.trust_indicators == other.trust_indicators + + +@swagger.model() +class ScenarioVersion(base_models.ModelBase): + """ + @property projects: + @ptype projects: C{list} of L{ScenarioProject} + """ + def __init__(self, owner=None, version=None, projects=None): + self.owner = owner + self.version = version + self.projects = list_default(projects) + + @staticmethod + def attr_parser(): + return {'projects': ScenarioProject} + + def __eq__(self, other): + return (self.version == other.version and + self.owner == other.owner and + self._projects_eq(other)) + + def __ne__(self, other): + return not self.__eq__(other) + + def _projects_eq(self, other): + for s_project in self.projects: + for o_project in other.projects: + if s_project.project == o_project.project: + if s_project != o_project: + return False + + return True + + +@swagger.model() +class ScenarioInstaller(base_models.ModelBase): + """ + @property versions: + @ptype versions: C{list} of L{ScenarioVersion} + """ + def __init__(self, installer=None, versions=None): + self.installer = installer + self.versions = list_default(versions) + + @staticmethod + def attr_parser(): + return {'versions': ScenarioVersion} + + def __eq__(self, other): + return (self.installer == other.installer and self._versions_eq(other)) + + def __ne__(self, other): + return not self.__eq__(other) + + def _versions_eq(self, other): + for s_version in self.versions: + for o_version in other.versions: + if s_version.version == o_version.version: + if s_version != o_version: + return False + + return True + + +@swagger.model() +class ScenarioCreateRequest(base_models.ModelBase): + """ + @property installers: + @ptype installers: C{list} of L{ScenarioInstaller} + """ + def __init__(self, name='', installers=None): + self.name = name + self.installers = list_default(installers) + + @staticmethod + def attr_parser(): + return {'installers': ScenarioInstaller} + + +@swagger.model() +class ScenarioChangeOwnerRequest(base_models.ModelBase): + def __init__(self, owner=None): + self.owner = owner + + +@swagger.model() +class ScenarioUpdateRequest(base_models.ModelBase): + def __init__(self, name=None): + self.name = name + + +@swagger.model() +class Scenario(base_models.ModelBase): + """ + @property installers: + @ptype installers: C{list} of L{ScenarioInstaller} + """ + def __init__(self, name='', create_date='', _id='', installers=None): + self.name = name + self._id = _id + self.creation_date = create_date + self.installers = list_default(installers) + + @staticmethod + def attr_parser(): + return {'installers': ScenarioInstaller} + + def __ne__(self, other): + return not self.__eq__(other) + + def __eq__(self, other): + return (self.name == other.name and self._installers_eq(other)) + + def _installers_eq(self, other): + for s_install in self.installers: + for o_install in other.installers: + if s_install.installer == o_install.installer: + if s_install != o_install: + return False + + return True + + +@swagger.model() +class Scenarios(base_models.ModelBase): + """ + @property scenarios: + @ptype scenarios: C{list} of L{Scenario} + """ + def __init__(self): + self.scenarios = list() + + @staticmethod + def attr_parser(): + return {'scenarios': Scenario} diff --git a/testapi/opnfv_testapi/models/testcase_models.py b/testapi/opnfv_testapi/models/testcase_models.py new file mode 100644 index 0000000..d1b8877 --- /dev/null +++ b/testapi/opnfv_testapi/models/testcase_models.py @@ -0,0 +1,95 @@ +############################################################################## +# Copyright (c) 2015 Orange +# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from opnfv_testapi.models import base_models +from opnfv_testapi.tornado_swagger import swagger + + +@swagger.model() +class TestcaseCreateRequest(base_models.ModelBase): + def __init__(self, name, url=None, description=None, + catalog_description=None, tier=None, ci_loop=None, + criteria=None, blocking=None, dependencies=None, run=None, + domains=None, tags=None, version=None): + self.name = name + self.url = url + self.description = description + self.catalog_description = catalog_description + self.tier = tier + self.ci_loop = ci_loop + self.criteria = criteria + self.blocking = blocking + self.dependencies = dependencies + self.run = run + self.domains = domains + self.tags = tags + self.version = version + self.trust = "Silver" + + +@swagger.model() +class TestcaseUpdateRequest(base_models.ModelBase): + def __init__(self, name=None, description=None, project_name=None, + catalog_description=None, tier=None, ci_loop=None, + criteria=None, blocking=None, dependencies=None, run=None, + domains=None, tags=None, version=None, trust=None): + self.name = name + self.description = description + self.catalog_description = catalog_description + self.project_name = project_name + self.tier = tier + self.ci_loop = ci_loop + self.criteria = criteria + self.blocking = blocking + self.dependencies = dependencies + self.run = run + self.domains = domains + self.tags = tags + self.version = version + self.trust = trust + + +@swagger.model() +class Testcase(base_models.ModelBase): + def __init__(self, _id=None, name=None, project_name=None, + description=None, url=None, creation_date=None, + catalog_description=None, tier=None, ci_loop=None, + criteria=None, blocking=None, dependencies=None, run=None, + domains=None, tags=None, version=None, + trust=None): + self._id = None + self.name = None + self.project_name = None + self.description = None + self.catalog_description = None + self.url = None + self.creation_date = None + self.tier = None + self.ci_loop = None + self.criteria = None + self.blocking = None + self.dependencies = None + self.run = None + self.domains = None + self.tags = None + self.version = None + self.trust = None + + +@swagger.model() +class Testcases(base_models.ModelBase): + """ + @property testcases: + @ptype testcases: C{list} of L{Testcase} + """ + def __init__(self): + self.testcases = list() + + @staticmethod + def attr_parser(): + return {'testcases': Testcase} diff --git a/testapi/opnfv_testapi/models/user_models.py b/testapi/opnfv_testapi/models/user_models.py new file mode 100644 index 0000000..90fbadc --- /dev/null +++ b/testapi/opnfv_testapi/models/user_models.py @@ -0,0 +1,9 @@ +from opnfv_testapi.models import base_models + + +class User(base_models.ModelBase): + def __init__(self, user=None, email=None, fullname=None, groups=None): + self.user = user + self.email = email + self.fullname = fullname + self.groups = groups diff --git a/testapi/opnfv_testapi/resources/__init__.py b/testapi/opnfv_testapi/resources/__init__.py deleted file mode 100644 index 05c0c93..0000000 --- a/testapi/opnfv_testapi/resources/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## diff --git a/testapi/opnfv_testapi/resources/handlers.py b/testapi/opnfv_testapi/resources/handlers.py deleted file mode 100644 index 6c7a819..0000000 --- a/testapi/opnfv_testapi/resources/handlers.py +++ /dev/null @@ -1,257 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -# feng.xiaowei@zte.com.cn refactor db.pod to db.pods 5-19-2016 -# feng.xiaowei@zte.com.cn refactor test_project to project 5-19-2016 -# feng.xiaowei@zte.com.cn refactor response body 5-19-2016 -# feng.xiaowei@zte.com.cn refactor pod/project response info 5-19-2016 -# feng.xiaowei@zte.com.cn refactor testcase related handler 5-20-2016 -# feng.xiaowei@zte.com.cn refactor result related handler 5-23-2016 -# feng.xiaowei@zte.com.cn refactor dashboard related handler 5-24-2016 -# feng.xiaowei@zte.com.cn add methods to GenericApiHandler 5-26-2016 -# feng.xiaowei@zte.com.cn remove PodHandler 5-26-2016 -# feng.xiaowei@zte.com.cn remove ProjectHandler 5-26-2016 -# feng.xiaowei@zte.com.cn remove TestcaseHandler 5-27-2016 -# feng.xiaowei@zte.com.cn remove ResultHandler 5-29-2016 -# feng.xiaowei@zte.com.cn remove DashboardHandler 5-30-2016 -############################################################################## - -import json -from datetime import datetime - -from tornado import gen -from tornado import web - -from opnfv_testapi.common import check -from opnfv_testapi.common import message -from opnfv_testapi.common import raises -from opnfv_testapi.db import api as dbapi -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - -DEFAULT_REPRESENTATION = "application/json" - - -class GenericApiHandler(web.RequestHandler): - def __init__(self, application, request, **kwargs): - super(GenericApiHandler, self).__init__(application, request, **kwargs) - self.json_args = None - self.table = None - self.table_cls = None - self.db_projects = 'projects' - self.db_pods = 'pods' - self.db_testcases = 'testcases' - self.db_results = 'results' - self.db_scenarios = 'scenarios' - self.auth = self.settings["auth"] - - def prepare(self): - if self.request.body: - if self.request.headers.get("Content-Type") is not None: - if self.request.headers["Content-Type"].startswith( - DEFAULT_REPRESENTATION): - try: - self.json_args = json.loads(self.request.body) - except (ValueError, KeyError, TypeError) as error: - raises.BadRequest(message.bad_format(str(error))) - - def finish_request(self, json_object=None): - if json_object: - self.write(json.dumps(json_object)) - self.set_header("Content-Type", DEFAULT_REPRESENTATION) - self.finish() - - def _create_response(self, resource): - href = self.request.full_url() + '/' + str(resource) - return models.CreateResponse(href=href).format() - - def format_data(self, data): - cls_data = self.table_cls.from_dict(data) - return cls_data.format_http() - - @web.asynchronous - @gen.coroutine - @check.is_authorized - @check.valid_token - @check.no_body - @check.miss_fields - @check.values_check - @check.carriers_exist - @check.new_not_exists - def _create(self, **kwargs): - """ - :param miss_checks: [miss1, miss2] - :param db_checks: [(table, exist, query, error)] - """ - data = self.table_cls.from_dict(self.json_args) - for k, v in kwargs.iteritems(): - if k != 'query': - data.__setattr__(k, v) - - if self.table != 'results': - data.creation_date = datetime.now() - _id = yield dbapi.db_save(self.table, data.format()) - if 'name' in self.json_args: - resource = data.name - else: - resource = _id - self.finish_request(self._create_response(resource)) - - @web.asynchronous - @gen.coroutine - def _list(self, query=None, res_op=None, *args, **kwargs): - sort = kwargs.get('sort') - page = kwargs.get('page', 0) - last = kwargs.get('last', 0) - per_page = kwargs.get('per_page', 0) - if query is None: - query = {} - pipelines = list() - pipelines.append({'$match': query}) - - total_pages = 0 - data = list() - cursor = dbapi.db_list(self.table, query) - records_count = yield cursor.count() - if records_count > 0: - if page > 0: - total_pages, return_nr = self._calc_total_pages(records_count, - last, - page, - per_page) - pipelines = self._set_pipelines(pipelines, - sort, - return_nr, - page, - per_page) - cursor = dbapi.db_aggregate(self.table, pipelines) - while (yield cursor.fetch_next): - data.append(self.format_data(cursor.next_object())) - if res_op is None: - res = {self.table: data} - else: - res = res_op(data, *args) - if page > 0: - res.update({ - 'pagination': { - 'current_page': kwargs.get('page'), - 'total_pages': total_pages - } - }) - self.finish_request(res) - - @staticmethod - def _calc_total_pages(records_count, last, page, per_page): - records_nr = records_count - if (records_count > last) and (last > 0): - records_nr = last - - total_pages, remainder = divmod(records_nr, per_page) - if remainder > 0: - total_pages += 1 - if page > 1 and page > total_pages: - raises.BadRequest( - 'Request page > total_pages [{}]'.format(total_pages)) - return total_pages, records_nr - - @staticmethod - def _set_pipelines(pipelines, sort, return_nr, page, per_page): - if sort: - pipelines.append({'$sort': sort}) - - over = (page - 1) * per_page - left = return_nr - over - pipelines.append({'$skip': over}) - pipelines.append({'$limit': per_page if per_page < left else left}) - - return pipelines - - @web.asynchronous - @gen.coroutine - @check.not_exist - def _get_one(self, data, query=None): - self.finish_request(self.format_data(data)) - - @web.asynchronous - @gen.coroutine - @check.not_exist - def _delete(self, data, query=None): - yield dbapi.db_delete(self.table, query) - self.finish_request() - - @web.asynchronous - @gen.coroutine - @check.no_body - @check.not_exist - @check.updated_one_not_exist - def _update(self, data, query=None, **kwargs): - data = self.table_cls.from_dict(data) - update_req = self._update_requests(data) - yield dbapi.db_update(self.table, query, update_req) - update_req['_id'] = str(data._id) - self.finish_request(update_req) - - @web.asynchronous - @gen.coroutine - @check.no_body - @check.not_exist - @check.updated_one_not_exist - def pure_update(self, data, query=None, **kwargs): - data = self.table_cls.from_dict(data) - update_req = self._update_requests(data) - yield dbapi.db_update(self.table, query, update_req) - self.finish_request() - - def _update_requests(self, data): - request = dict() - for k, v in self.json_args.iteritems(): - request = self._update_request(request, k, v, - data.__getattribute__(k)) - if not request: - raises.Forbidden(message.no_update()) - - edit_request = data.format() - edit_request.update(request) - return edit_request - - @staticmethod - def _update_request(edit_request, key, new_value, old_value): - """ - This function serves to prepare the elements in the update request. - We try to avoid replace the exact values in the db - edit_request should be a dict in which we add an entry (key) after - comparing values - """ - if not (new_value is None): - if new_value != old_value: - edit_request[key] = new_value - - return edit_request - - def _update_query(self, keys, data): - query = dict() - equal = True - for key in keys: - new = self.json_args.get(key) - old = data.get(key) - if new is None: - new = old - elif new != old: - equal = False - query[key] = new - return query if not equal else dict() - - -class VersionHandler(GenericApiHandler): - @swagger.operation(nickname='listAllVersions') - def get(self): - """ - @description: list all supported versions - @rtype: L{Versions} - """ - versions = [{'version': 'v1.0', 'description': 'basics'}] - self.finish_request({'versions': versions}) diff --git a/testapi/opnfv_testapi/resources/models.py b/testapi/opnfv_testapi/resources/models.py deleted file mode 100644 index 27396d1..0000000 --- a/testapi/opnfv_testapi/resources/models.py +++ /dev/null @@ -1,138 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -# feng.xiaowei@zte.com.cn mv Pod to pod_models.py 5-18-2016 -# feng.xiaowei@zte.com.cn add MetaCreateResponse/MetaGetResponse 5-18-2016 -# feng.xiaowei@zte.com.cn mv TestProject to project_models.py 5-19-2016 -# feng.xiaowei@zte.com.cn delete meta class 5-19-2016 -# feng.xiaowei@zte.com.cn add CreateResponse 5-19-2016 -# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016 -# feng.xiaowei@zte.com.cn mv TestResut to result_models.py 5-23-2016 -# feng.xiaowei@zte.com.cn add ModelBase 12-20-2016 -############################################################################## -import ast -import copy - -from opnfv_testapi.tornado_swagger import swagger - - -class ModelBase(object): - - def format(self): - return self._format(['_id']) - - def format_http(self): - return self._format([]) - - @classmethod - def from_dict(cls, a_dict): - if a_dict is None: - return None - - attr_parser = cls.attr_parser() - t = cls() - for k, v in a_dict.iteritems(): - value = v - if isinstance(v, dict) and k in attr_parser: - value = attr_parser[k].from_dict(v) - elif isinstance(v, list) and k in attr_parser: - value = [] - for item in v: - value.append(attr_parser[k].from_dict(item)) - - t.__setattr__(k, value) - - return t - - @classmethod - def from_dict_with_raise(cls, a_dict): - if a_dict is None: - return None - - attr_parser = cls.attr_parser() - t = cls() - for k, v in a_dict.iteritems(): - if k not in t.__dict__: - raise AttributeError( - '{} has no attribute {}'.format(cls.__name__, k)) - value = v - if isinstance(v, dict) and k in attr_parser: - value = attr_parser[k].from_dict_with_raise(v) - elif isinstance(v, list) and k in attr_parser: - value = [] - for item in v: - value.append(attr_parser[k].from_dict_with_raise(item)) - - t.__setattr__(k, value) - - return t - - @staticmethod - def attr_parser(): - return {} - - def _format(self, excludes): - new_obj = copy.deepcopy(self) - dicts = new_obj.__dict__ - for k in dicts.keys(): - if k in excludes: - del dicts[k] - elif dicts[k]: - dicts[k] = self._obj_format(dicts[k]) - return dicts - - def _obj_format(self, obj): - if self._has_format(obj): - obj = obj.format() - elif isinstance(obj, unicode): - try: - obj = self._obj_format(ast.literal_eval(obj)) - except Exception: - try: - obj = str(obj) - except Exception: - obj = obj - elif isinstance(obj, list): - hs = list() - for h in obj: - hs.append(self._obj_format(h)) - obj = hs - elif not isinstance(obj, (str, int, float, dict)): - obj = str(obj) - return obj - - @staticmethod - def _has_format(obj): - return not isinstance(obj, (str, unicode)) and hasattr(obj, 'format') - - -@swagger.model() -class CreateResponse(ModelBase): - def __init__(self, href=''): - self.href = href - - -@swagger.model() -class Versions(ModelBase): - """ - @property versions: - @ptype versions: C{list} of L{Version} - """ - - def __init__(self): - self.versions = list() - - @staticmethod - def attr_parser(): - return {'versions': Version} - - -@swagger.model() -class Version(ModelBase): - def __init__(self, version=None, description=None): - self.version = version - self.description = description diff --git a/testapi/opnfv_testapi/resources/pod_handlers.py b/testapi/opnfv_testapi/resources/pod_handlers.py deleted file mode 100644 index 5029887..0000000 --- a/testapi/opnfv_testapi/resources/pod_handlers.py +++ /dev/null @@ -1,78 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import handlers -from opnfv_testapi.resources import pod_models -from opnfv_testapi.tornado_swagger import swagger - - -class GenericPodHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(GenericPodHandler, self).__init__(application, request, **kwargs) - self.table = 'pods' - self.table_cls = pod_models.Pod - - -class PodCLHandler(GenericPodHandler): - @swagger.operation(nickname='listAllPods') - def get(self): - """ - @description: list all pods - @return 200: list all pods, empty list is no pod exist - @rtype: L{Pods} - """ - self._list() - - @swagger.operation(nickname='createPod') - def post(self): - """ - @description: create a pod - @param body: pod to be created - @type body: L{PodCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: pod is created. - @raise 403: pod already exists - @raise 400: body or name not provided - """ - def query(): - return {'name': self.json_args.get('name')} - miss_fields = ['name'] - self._create(miss_fields=miss_fields, query=query) - - -class PodGURHandler(GenericPodHandler): - @swagger.operation(nickname='getPodByName') - def get(self, pod_name): - """ - @description: get a single pod by pod_name - @rtype: L{Pod} - @return 200: pod exist - @raise 404: pod not exist - """ - self._get_one(query={'name': pod_name}) - - def delete(self, pod_name): - """ Remove a POD - - # check for an existing pod to be deleted - mongo_dict = yield self.db.pods.find_one( - {'name': pod_name}) - pod = TestProject.pod(mongo_dict) - if pod is None: - raise HTTPError(HTTP_NOT_FOUND, - "{} could not be found as a pod to be deleted" - .format(pod_name)) - - # just delete it, or maybe save it elsewhere in a future - res = yield self.db.projects.remove( - {'name': pod_name}) - - self.finish_request(answer) - """ - pass diff --git a/testapi/opnfv_testapi/resources/pod_models.py b/testapi/opnfv_testapi/resources/pod_models.py deleted file mode 100644 index 415d3d6..0000000 --- a/testapi/opnfv_testapi/resources/pod_models.py +++ /dev/null @@ -1,53 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - - -# name: name of the POD e.g. zte-1 -# mode: metal or virtual -# details: any detail -# role: ci-pod or community-pod or single-node - - -@swagger.model() -class PodCreateRequest(models.ModelBase): - def __init__(self, name, mode='', details='', role=""): - self.name = name - self.mode = mode - self.details = details - self.role = role - - -@swagger.model() -class Pod(models.ModelBase): - def __init__(self, - name='', mode='', details='', - role="", _id='', create_date='', owner=''): - self.name = name - self.mode = mode - self.details = details - self.role = role - self._id = _id - self.creation_date = create_date - self.owner = owner - - -@swagger.model() -class Pods(models.ModelBase): - """ - @property pods: - @ptype pods: C{list} of L{Pod} - """ - def __init__(self): - self.pods = list() - - @staticmethod - def attr_parser(): - return {'pods': Pod} diff --git a/testapi/opnfv_testapi/resources/project_handlers.py b/testapi/opnfv_testapi/resources/project_handlers.py deleted file mode 100644 index be29507..0000000 --- a/testapi/opnfv_testapi/resources/project_handlers.py +++ /dev/null @@ -1,86 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -from opnfv_testapi.resources import handlers -from opnfv_testapi.resources import project_models -from opnfv_testapi.tornado_swagger import swagger - - -class GenericProjectHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(GenericProjectHandler, self).__init__(application, - request, - **kwargs) - self.table = 'projects' - self.table_cls = project_models.Project - - -class ProjectCLHandler(GenericProjectHandler): - @swagger.operation(nickname="listAllProjects") - def get(self): - """ - @description: list all projects - @return 200: return all projects, empty list is no project exist - @rtype: L{Projects} - """ - self._list() - - @swagger.operation(nickname="createProject") - def post(self): - """ - @description: create a project - @param body: project to be created - @type body: L{ProjectCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: project is created. - @raise 403: project already exists - @raise 400: body or name not provided - """ - def query(): - return {'name': self.json_args.get('name')} - miss_fields = ['name'] - self._create(miss_fields=miss_fields, query=query) - - -class ProjectGURHandler(GenericProjectHandler): - @swagger.operation(nickname='getProjectByName') - def get(self, project_name): - """ - @description: get a single project by project_name - @rtype: L{Project} - @return 200: project exist - @raise 404: project not exist - """ - self._get_one(query={'name': project_name}) - - @swagger.operation(nickname="updateProjectByName") - def put(self, project_name): - """ - @description: update a single project by project_name - @param body: project to be updated - @type body: L{ProjectUpdateRequest} - @in body: body - @rtype: L{Project} - @return 200: update success - @raise 404: project not exist - @raise 403: new project name already exist or nothing to update - """ - query = {'name': project_name} - db_keys = ['name'] - self._update(query=query, db_keys=db_keys) - - @swagger.operation(nickname='deleteProjectByName') - def delete(self, project_name): - """ - @description: delete a project by project_name - @return 200: delete success - @raise 404: project not exist - """ - self._delete(query={'name': project_name}) diff --git a/testapi/opnfv_testapi/resources/project_models.py b/testapi/opnfv_testapi/resources/project_models.py deleted file mode 100644 index 3243882..0000000 --- a/testapi/opnfv_testapi/resources/project_models.py +++ /dev/null @@ -1,48 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - - -@swagger.model() -class ProjectCreateRequest(models.ModelBase): - def __init__(self, name, description=''): - self.name = name - self.description = description - - -@swagger.model() -class ProjectUpdateRequest(models.ModelBase): - def __init__(self, name='', description=''): - self.name = name - self.description = description - - -@swagger.model() -class Project(models.ModelBase): - def __init__(self, - name=None, _id=None, description=None, create_date=None): - self._id = _id - self.name = name - self.description = description - self.creation_date = create_date - - -@swagger.model() -class Projects(models.ModelBase): - """ - @property projects: - @ptype projects: C{list} of L{Project} - """ - def __init__(self): - self.projects = list() - - @staticmethod - def attr_parser(): - return {'projects': Project} diff --git a/testapi/opnfv_testapi/resources/result_handlers.py b/testapi/opnfv_testapi/resources/result_handlers.py deleted file mode 100644 index 4cd533c..0000000 --- a/testapi/opnfv_testapi/resources/result_handlers.py +++ /dev/null @@ -1,288 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import json -import logging - -from bson import objectid -from datetime import datetime -from datetime import timedelta - -from opnfv_testapi.common import constants -from opnfv_testapi.common import message -from opnfv_testapi.common import raises -from opnfv_testapi.common.config import CONF -from opnfv_testapi.resources import handlers -from opnfv_testapi.resources import result_models -from opnfv_testapi.tornado_swagger import swagger - - -class GenericResultHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(GenericResultHandler, self).__init__(application, - request, - **kwargs) - self.table = self.db_results - self.table_cls = result_models.TestResult - - def get_int(self, key, value): - try: - value = int(value) - except Exception: - raises.BadRequest(message.must_int(key)) - return value - - def set_query(self): - query = dict() - date_range = dict() - - query['public'] = {'$not': {'$eq': 'false'}} - for k in self.request.query_arguments.keys(): - v = self.get_query_argument(k) - if k == 'project' or k == 'pod' or k == 'case': - query[k + '_name'] = v - elif k == 'period': - v = self.get_int(k, v) - if v > 0: - period = datetime.now() - timedelta(days=v) - obj = {"$gte": str(period)} - query['start_date'] = obj - elif k == 'trust_indicator': - query[k + '.current'] = float(v) - elif k == 'from': - date_range.update({'$gte': str(v)}) - elif k == 'to': - date_range.update({'$lt': str(v)}) - elif k == 'signed': - username = self.get_secure_cookie(constants.TESTAPI_ID) - role = self.get_secure_cookie(constants.ROLE) - if role: - del query['public'] - if role != "reviewer": - query['user'] = username - elif k not in ['last', 'page', 'descend']: - query[k] = v - if date_range: - query['start_date'] = date_range - - # if $lt is not provided, - # empty/None/null/'' start_date will also be returned - if 'start_date' in query and '$lt' not in query['start_date']: - query['start_date'].update({'$lt': str(datetime.now())}) - - return query - - -class ResultsCLHandler(GenericResultHandler): - @swagger.operation(nickname="queryTestResults") - def get(self): - """ - @description: Retrieve result(s) for a test project - on a specific pod. - @notes: Retrieve result(s) for a test project on a specific pod. - Available filters for this request are : - - project : project name - - case : case name - - pod : pod name - - version : platform version (Arno-R1, ...) - - installer : fuel/apex/compass/joid/daisy - - build_tag : Jenkins build tag name - - period : x last days, incompatible with from/to - - from : starting time in 2016-01-01 or 2016-01-01 00:01:23 - - to : ending time in 2016-01-01 or 2016-01-01 00:01:23 - - scenario : the test scenario (previously version) - - criteria : the global criteria status passed or failed - - trust_indicator : evaluate the stability of the test case - to avoid running systematically long and stable test case - - signed : get logined user result - - GET /results/project=functest&case=vPing&version=Arno-R1 \ - &pod=pod_name&period=15&signed - @return 200: all test results consist with query, - empty list if no result is found - @rtype: L{TestResults} - @param pod: pod name - @type pod: L{string} - @in pod: query - @required pod: False - @param project: project name - @type project: L{string} - @in project: query - @required project: False - @param case: case name - @type case: L{string} - @in case: query - @required case: False - @param version: i.e. Colorado - @type version: L{string} - @in version: query - @required version: False - @param installer: fuel/apex/joid/compass - @type installer: L{string} - @in installer: query - @required installer: False - @param build_tag: i.e. v3.0 - @type build_tag: L{string} - @in build_tag: query - @required build_tag: False - @param scenario: i.e. odl - @type scenario: L{string} - @in scenario: query - @required scenario: False - @param criteria: i.e. passed - @type criteria: L{string} - @in criteria: query - @required criteria: False - @param period: last days - @type period: L{string} - @in period: query - @required period: False - @param from: i.e. 2016-01-01 or 2016-01-01 00:01:23 - @type from: L{string} - @in from: query - @required from: False - @param to: i.e. 2016-01-01 or 2016-01-01 00:01:23 - @type to: L{string} - @in to: query - @required to: False - @param last: last records stored until now - @type last: L{string} - @in last: query - @required last: False - @param page: which page to list, default to 1 - @type page: L{int} - @in page: query - @required page: False - @param trust_indicator: must be float - @type trust_indicator: L{float} - @in trust_indicator: query - @required trust_indicator: False - @param signed: user results or all results - @type signed: L{string} - @in signed: query - @required signed: False - @param descend: true, newest2oldest; false, oldest2newest - @type descend: L{string} - @in descend: query - @required descend: False - """ - def descend_limit(): - descend = self.get_query_argument('descend', 'true') - return -1 if descend.lower() == 'true' else 1 - - def last_limit(): - return self.get_int('last', self.get_query_argument('last', 0)) - - def page_limit(): - return self.get_int('page', self.get_query_argument('page', 1)) - - limitations = { - 'sort': {'_id': descend_limit()}, - 'last': last_limit(), - 'page': page_limit(), - 'per_page': CONF.api_results_per_page - } - - self._list(query=self.set_query(), **limitations) - - @swagger.operation(nickname="createTestResult") - def post(self): - """ - @description: create a test result - @param body: result to be created - @type body: L{ResultCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: result is created. - @raise 404: pod/project/testcase not exist - @raise 400: body/pod_name/project_name/case_name not provided - """ - self._post() - - def _post(self): - def pod_query(): - return {'name': self.json_args.get('pod_name')} - - def project_query(): - return {'name': self.json_args.get('project_name')} - - def testcase_query(): - return {'project_name': self.json_args.get('project_name'), - 'name': self.json_args.get('case_name')} - - def options_check(field, options): - return self.json_args.get(field).upper() in options - - miss_fields = ['pod_name', 'project_name', 'case_name'] - carriers = [('pods', pod_query), - ('projects', project_query), - ('testcases', testcase_query)] - values_check = [('criteria', options_check, ['PASS', 'FAIL'])] - - self._create(miss_fields=miss_fields, - carriers=carriers, - values_check=values_check) - - -class ResultsUploadHandler(ResultsCLHandler): - @swagger.operation(nickname="uploadTestResult") - def post(self): - """ - @description: upload and create a test result - @param body: result to be created - @type body: L{ResultCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: result is created. - @raise 404: pod/project/testcase not exist - @raise 400: body/pod_name/project_name/case_name not provided - """ - logging.info('file upload') - fileinfo = self.request.files['file'][0] - is_public = self.get_body_argument('public') - logging.warning('public:%s', is_public) - logging.info('results is :%s', fileinfo['filename']) - logging.info('results is :%s', fileinfo['body']) - self.json_args = json.loads(fileinfo['body']).copy() - self.json_args['public'] = is_public - - openid = self.get_secure_cookie(constants.TESTAPI_ID) - if openid: - self.json_args['user'] = openid - - super(ResultsUploadHandler, self)._post() - - -class ResultsGURHandler(GenericResultHandler): - @swagger.operation(nickname='getTestResultById') - def get(self, result_id): - """ - @description: get a single result by result_id - @rtype: L{TestResult} - @return 200: test result exist - @raise 404: test result not exist - """ - query = dict() - query["_id"] = objectid.ObjectId(result_id) - self._get_one(query=query) - - @swagger.operation(nickname="updateTestResultById") - def put(self, result_id): - """ - @description: update a single result by _id - @param body: fields to be updated - @type body: L{ResultUpdateRequest} - @in body: body - @rtype: L{Result} - @return 200: update success - @raise 404: result not exist - @raise 403: nothing to update - """ - query = {'_id': objectid.ObjectId(result_id)} - db_keys = [] - self._update(query=query, db_keys=db_keys) diff --git a/testapi/opnfv_testapi/resources/result_models.py b/testapi/opnfv_testapi/resources/result_models.py deleted file mode 100644 index 890bf82..0000000 --- a/testapi/opnfv_testapi/resources/result_models.py +++ /dev/null @@ -1,129 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - - -@swagger.model() -class TIHistory(models.ModelBase): - """ - @ptype step: L{float} - """ - def __init__(self, date=None, step=0): - self.date = date - self.step = step - - -@swagger.model() -class TI(models.ModelBase): - """ - @property histories: trust_indicator update histories - @ptype histories: C{list} of L{TIHistory} - @ptype current: L{float} - """ - def __init__(self, current=0): - self.current = current - self.histories = list() - - @staticmethod - def attr_parser(): - return {'histories': TIHistory} - - -@swagger.model() -class ResultCreateRequest(models.ModelBase): - """ - @property trust_indicator: - @ptype trust_indicator: L{TI} - """ - def __init__(self, - pod_name=None, - project_name=None, - case_name=None, - installer=None, - version=None, - start_date=None, - stop_date=None, - details=None, - build_tag=None, - scenario=None, - criteria=None, - user=None, - public="true", - trust_indicator=None): - self.pod_name = pod_name - self.project_name = project_name - self.case_name = case_name - self.installer = installer - self.version = version - self.start_date = start_date - self.stop_date = stop_date - self.details = details - self.build_tag = build_tag - self.scenario = scenario - self.criteria = criteria - self.user = user - self.public = public - self.trust_indicator = trust_indicator if trust_indicator else TI(0) - - -@swagger.model() -class ResultUpdateRequest(models.ModelBase): - """ - @property trust_indicator: - @ptype trust_indicator: L{TI} - """ - def __init__(self, trust_indicator=None): - self.trust_indicator = trust_indicator - - -@swagger.model() -class TestResult(models.ModelBase): - """ - @property trust_indicator: used for long duration test case - @ptype trust_indicator: L{TI} - """ - def __init__(self, _id=None, case_name=None, project_name=None, - pod_name=None, installer=None, version=None, - start_date=None, stop_date=None, details=None, - build_tag=None, scenario=None, criteria=None, - user=None, public="true", trust_indicator=None): - self._id = _id - self.case_name = case_name - self.project_name = project_name - self.pod_name = pod_name - self.installer = installer - self.version = version - self.start_date = start_date - self.stop_date = stop_date - self.details = details - self.build_tag = build_tag - self.scenario = scenario - self.criteria = criteria - self.user = user - self.public = public - self.trust_indicator = trust_indicator - - @staticmethod - def attr_parser(): - return {'trust_indicator': TI} - - -@swagger.model() -class TestResults(models.ModelBase): - """ - @property results: - @ptype results: C{list} of L{TestResult} - """ - def __init__(self): - self.results = list() - - @staticmethod - def attr_parser(): - return {'results': TestResult} diff --git a/testapi/opnfv_testapi/resources/scenario_handlers.py b/testapi/opnfv_testapi/resources/scenario_handlers.py deleted file mode 100644 index e9c19a7..0000000 --- a/testapi/opnfv_testapi/resources/scenario_handlers.py +++ /dev/null @@ -1,775 +0,0 @@ -import functools - -from opnfv_testapi.common import message -from opnfv_testapi.common import raises -from opnfv_testapi.resources import handlers -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.tornado_swagger import swagger - - -class GenericScenarioHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(GenericScenarioHandler, self).__init__(application, - request, - **kwargs) - self.table = self.db_scenarios - self.table_cls = models.Scenario - - def set_query(self, locators): - query = dict() - elem_query = dict() - for k, v in locators.iteritems(): - if k == 'scenario': - query['name'] = v - elif k == 'installer': - elem_query["installer"] = v - elif k == 'version': - elem_query["versions.version"] = v - elif k == 'project': - elem_query["versions.projects.project"] = v - else: - query[k] = v - if elem_query: - query['installers'] = {'$elemMatch': elem_query} - return query - - -class ScenariosCLHandler(GenericScenarioHandler): - @swagger.operation(nickname="queryScenarios") - def get(self): - """ - @description: Retrieve scenario(s). - @notes: Retrieve scenario(s) - Available filters for this request are : - - name : scenario name - - GET /scenarios?name=scenario_1 - @param name: scenario name - @type name: L{string} - @in name: query - @required name: False - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: False - @param version: version - @type version: L{string} - @in version: query - @required version: False - @param project: project name - @type project: L{string} - @in project: query - @required project: False - @return 200: all scenarios satisfy queries, - empty list if no scenario is found - @rtype: L{Scenarios} - """ - - def _set_query(): - query = dict() - elem_query = dict() - for k in self.request.query_arguments.keys(): - v = self.get_query_argument(k) - if k == 'installer': - elem_query["installer"] = v - elif k == 'version': - elem_query["versions.version"] = v - elif k == 'project': - elem_query["versions.projects.project"] = v - else: - query[k] = v - if elem_query: - query['installers'] = {'$elemMatch': elem_query} - return query - - self._list(query=_set_query()) - - @swagger.operation(nickname="createScenario") - def post(self): - """ - @description: create a new scenario by name - @param body: scenario to be created - @type body: L{ScenarioCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: scenario is created. - @raise 403: scenario already exists - @raise 400: body or name not provided - """ - def query(): - return {'name': self.json_args.get('name')} - miss_fields = ['name'] - self._create(miss_fields=miss_fields, query=query) - - -class ScenarioGURHandler(GenericScenarioHandler): - @swagger.operation(nickname='getScenarioByName') - def get(self, name): - """ - @description: get a single scenario by name - @rtype: L{Scenario} - @return 200: scenario exist - @raise 404: scenario not exist - """ - self._get_one(query={'name': name}) - pass - - @swagger.operation(nickname="updateScenarioName") - def put(self, name): - """ - @description: update scenario, only rename is supported currently - @param body: fields to be updated - @type body: L{ScenarioUpdateRequest} - @in body: body - @rtype: L{Scenario} - @return 200: update success - @raise 404: scenario not exist - @raise 403: nothing to update - """ - query = {'name': name} - db_keys = ['name'] - self._update(query=query, db_keys=db_keys) - - @swagger.operation(nickname="deleteScenarioByName") - def delete(self, name): - """ - @description: delete a scenario by name - @return 200: delete success - @raise 404: scenario not exist: - """ - self._delete(query={'name': name}) - - -class ScenarioUpdater(object): - def __init__(self, data, body=None, - installer=None, version=None, project=None): - self.data = data - self.body = body - self.installer = installer - self.version = version - self.project = project - - def update(self, item, action): - updates = { - ('scores', 'post'): self._update_requests_add_score, - ('trust_indicators', 'post'): self._update_requests_add_ti, - ('customs', 'post'): self._update_requests_add_customs, - ('customs', 'put'): self._update_requests_update_customs, - ('customs', 'delete'): self._update_requests_delete_customs, - ('projects', 'post'): self._update_requests_add_projects, - ('projects', 'put'): self._update_requests_update_projects, - ('projects', 'delete'): self._update_requests_delete_projects, - ('owner', 'put'): self._update_requests_change_owner, - ('versions', 'post'): self._update_requests_add_versions, - ('versions', 'put'): self._update_requests_update_versions, - ('versions', 'delete'): self._update_requests_delete_versions, - ('installers', 'post'): self._update_requests_add_installers, - ('installers', 'put'): self._update_requests_update_installers, - ('installers', 'delete'): self._update_requests_delete_installers, - } - updates[(item, action)](self.data) - - return self.data.format() - - def iter_installers(xstep): - @functools.wraps(xstep) - def magic(self, data): - [xstep(self, installer) - for installer in self._filter_installers(data.installers)] - return magic - - def iter_versions(xstep): - @functools.wraps(xstep) - def magic(self, installer): - [xstep(self, version) - for version in (self._filter_versions(installer.versions))] - return magic - - def iter_projects(xstep): - @functools.wraps(xstep) - def magic(self, version): - [xstep(self, project) - for project in (self._filter_projects(version.projects))] - return magic - - @iter_installers - @iter_versions - @iter_projects - def _update_requests_add_score(self, project): - project.scores.append( - models.ScenarioScore.from_dict(self.body)) - - @iter_installers - @iter_versions - @iter_projects - def _update_requests_add_ti(self, project): - project.trust_indicators.append( - models.ScenarioTI.from_dict(self.body)) - - @iter_installers - @iter_versions - @iter_projects - def _update_requests_add_customs(self, project): - project.customs = list(set(project.customs + self.body)) - - @iter_installers - @iter_versions - @iter_projects - def _update_requests_update_customs(self, project): - project.customs = list(set(self.body)) - - @iter_installers - @iter_versions - @iter_projects - def _update_requests_delete_customs(self, project): - project.customs = filter( - lambda f: f not in self.body, - project.customs) - - @iter_installers - @iter_versions - def _update_requests_add_projects(self, version): - version.projects = self._update_with_body(models.ScenarioProject, - 'project', - version.projects) - - @iter_installers - @iter_versions - def _update_requests_update_projects(self, version): - version.projects = self._update_with_body(models.ScenarioProject, - 'project', - list()) - - @iter_installers - @iter_versions - def _update_requests_delete_projects(self, version): - version.projects = self._remove_projects(version.projects) - - @iter_installers - @iter_versions - def _update_requests_change_owner(self, version): - version.owner = self.body.get('owner') - - @iter_installers - def _update_requests_add_versions(self, installer): - installer.versions = self._update_with_body(models.ScenarioVersion, - 'version', - installer.versions) - - @iter_installers - def _update_requests_update_versions(self, installer): - installer.versions = self._update_with_body(models.ScenarioVersion, - 'version', - list()) - - @iter_installers - def _update_requests_delete_versions(self, installer): - installer.versions = self._remove_versions(installer.versions) - - def _update_requests_add_installers(self, scenario): - scenario.installers = self._update_with_body(models.ScenarioInstaller, - 'installer', - scenario.installers) - - def _update_requests_update_installers(self, scenario): - scenario.installers = self._update_with_body(models.ScenarioInstaller, - 'installer', - list()) - - def _update_requests_delete_installers(self, scenario): - scenario.installers = self._remove_installers(scenario.installers) - - def _update_with_body(self, clazz, field, withs): - exists = list() - malformat = list() - for new in self.body: - try: - format_new = clazz.from_dict_with_raise(new) - new_name = getattr(format_new, field) - if not any(getattr(o, field) == new_name for o in withs): - withs.append(format_new) - else: - exists.append(new_name) - except Exception as error: - malformat.append(error.message) - if malformat: - raises.BadRequest(message.bad_format(malformat)) - elif exists: - raises.Conflict(message.exist('{}s'.format(field), exists)) - return withs - - def _filter_installers(self, installers): - return self._filter('installer', installers) - - def _remove_installers(self, installers): - return self._remove('installer', installers) - - def _filter_versions(self, versions): - return self._filter('version', versions) - - def _remove_versions(self, versions): - return self._remove('version', versions) - - def _filter_projects(self, projects): - return self._filter('project', projects) - - def _remove_projects(self, projects): - return self._remove('project', projects) - - def _filter(self, item, items): - return filter( - lambda f: getattr(f, item) == getattr(self, item), - items) - - def _remove(self, field, fields): - return filter( - lambda f: getattr(f, field) not in self.body, - fields) - - -class GenericScenarioUpdateHandler(GenericScenarioHandler): - def __init__(self, application, request, **kwargs): - super(GenericScenarioUpdateHandler, self).__init__(application, - request, - **kwargs) - self.installer = None - self.version = None - self.project = None - self.item = None - self.action = None - - def do_update(self, item, action, locators): - self.item = item - self.action = action - for k, v in locators.iteritems(): - if not v: - v = self.get_query_argument(k) - setattr(self, k, v) - locators[k] = v - self.pure_update(query=self.set_query(locators=locators)) - - def _update_requests(self, data): - return ScenarioUpdater(data, - self.json_args, - self.installer, - self.version, - self.project).update(self.item, self.action) - - -class ScenarioScoresHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addScoreRecord") - def post(self, scenario): - """ - @description: add a new score record - @notes: add a new score record to a project - POST /api/v1/scenarios//scores? \ - installer=& \ - version=& \ - project= - @param body: score to be added - @type body: L{ScenarioScore} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @param project: project name - @type project: L{string} - @in project: query - @required project: True - @return 200: score is created. - @raise 404: scenario/installer/version/project not existed - """ - self.do_update('scores', - 'post', - locators={'scenario': scenario, - 'installer': None, - 'version': None, - 'project': None}) - - -class ScenarioTIsHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addTrustIndicatorRecord") - def post(self, scenario): - """ - @description: add a new trust indicator record - @notes: add a new trust indicator record to a project - POST /api/v1/scenarios//trust_indicators? \ - installer=& \ - version=& \ - project= - @param body: trust indicator to be added - @type body: L{ScenarioTI} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @param project: project name - @type project: L{string} - @in project: query - @required project: True - @return 200: trust indicator is added. - @raise 404: scenario/installer/version/project not existed - """ - self.do_update('trust_indicators', - 'post', - locators={'scenario': scenario, - 'installer': None, - 'version': None, - 'project': None}) - - -class ScenarioCustomsHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addCustomizedTestCases") - def post(self, scenario): - """ - @description: add customized test cases - @notes: add several test cases to a project - POST /api/v1/scenarios//customs? \ - installer=& \ - version=& \ - project= - @param body: test cases to be added - @type body: C{list} of L{string} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @param project: project name - @type project: L{string} - @in project: query - @required project: True - @return 200: test cases are added. - @raise 404: scenario/installer/version/project not existed - """ - self.do_update('customs', - 'post', - locators={'scenario': scenario, - 'installer': None, - 'version': None, - 'project': None}) - - @swagger.operation(nickname="updateCustomizedTestCases") - def put(self, scenario): - """ - @description: update customized test cases - @notes: substitute all the customized test cases - PUT /api/v1/scenarios//customs? \ - installer=& \ - version=& \ - project= - @param body: new supported test cases - @type body: C{list} of L{string} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @param project: project name - @type project: L{string} - @in project: query - @required project: True - @return 200: substitute test cases success. - @raise 404: scenario/installer/version/project not existed - """ - self.do_update('customs', - 'put', - locators={'scenario': scenario, - 'installer': None, - 'version': None, - 'project': None}) - - @swagger.operation(nickname="deleteCustomizedTestCases") - def delete(self, scenario): - """ - @description: delete one or several customized test cases - @notes: delete one or some customized test cases - DELETE /api/v1/scenarios//customs? \ - installer=& \ - version=& \ - project= - @param body: test case(s) to be deleted - @type body: C{list} of L{string} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @param project: project name - @type project: L{string} - @in project: query - @required project: True - @return 200: delete test case(s) success. - @raise 404: scenario/installer/version/project not existed - """ - self.do_update('customs', - 'delete', - locators={'scenario': scenario, - 'installer': None, - 'version': None, - 'project': None}) - - -class ScenarioProjectsHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addProjectsUnderScenario") - def post(self, scenario): - """ - @description: add projects to scenario - @notes: add one or multiple projects - POST /api/v1/scenarios//projects? \ - installer=& \ - version= - @param body: projects to be added - @type body: C{list} of L{ScenarioProject} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @return 200: projects are added. - @raise 400: bad schema - @raise 409: conflict, project already exists - @raise 404: scenario/installer/version not existed - """ - self.do_update('projects', - 'post', - locators={'scenario': scenario, - 'installer': None, - 'version': None}) - - @swagger.operation(nickname="updateScenarioProjects") - def put(self, scenario): - """ - @description: replace all projects - @notes: substitute all projects, delete existed ones with new provides - PUT /api/v1/scenarios//projects? \ - installer=& \ - version= - @param body: new projects - @type body: C{list} of L{ScenarioProject} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @return 200: replace projects success. - @raise 400: bad schema - @raise 404: scenario/installer/version not existed - """ - self.do_update('projects', - 'put', - locators={'scenario': scenario, - 'installer': None, - 'version': None}) - - @swagger.operation(nickname="deleteProjectsUnderScenario") - def delete(self, scenario): - """ - @description: delete one or multiple projects - @notes: delete one or multiple projects - DELETE /api/v1/scenarios//projects? \ - installer=& \ - version= - @param body: projects(names) to be deleted - @type body: C{list} of L{string} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @return 200: delete project(s) success. - @raise 404: scenario/installer/version not existed - """ - self.do_update('projects', - 'delete', - locators={'scenario': scenario, - 'installer': None, - 'version': None}) - - -class ScenarioOwnerHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="changeScenarioOwner") - def put(self, scenario): - """ - @description: change scenario owner - @notes: substitute all projects, delete existed ones with new provides - PUT /api/v1/scenarios//owner? \ - installer=& \ - version= - @param body: new owner - @type body: L{ScenarioChangeOwnerRequest} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @param version: version - @type version: L{string} - @in version: query - @required version: True - @return 200: change owner success. - @raise 404: scenario/installer/version not existed - """ - self.do_update('owner', - 'put', - locators={'scenario': scenario, - 'installer': None, - 'version': None}) - - -class ScenarioVersionsHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addVersionsUnderScenario") - def post(self, scenario): - """ - @description: add versions to scenario - @notes: add one or multiple versions - POST /api/v1/scenarios//versions? \ - installer= - @param body: versions to be added - @type body: C{list} of L{ScenarioVersion} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @return 200: versions are added. - @raise 400: bad schema - @raise 409: conflict, version already exists - @raise 404: scenario/installer not exist - """ - self.do_update('versions', - 'post', - locators={'scenario': scenario, - 'installer': None}) - - @swagger.operation(nickname="updateVersionsUnderScenario") - def put(self, scenario): - """ - @description: replace all versions - @notes: substitute all versions as a totality - PUT /api/v1/scenarios//versions? \ - installer= - @param body: new versions - @type body: C{list} of L{ScenarioVersion} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @return 200: replace versions success. - @raise 400: bad schema - @raise 404: scenario/installer not exist - """ - self.do_update('versions', - 'put', - locators={'scenario': scenario, - 'installer': None}) - - @swagger.operation(nickname="deleteVersionsUnderScenario") - def delete(self, scenario): - """ - @description: delete one or multiple versions - @notes: delete one or multiple versions - DELETE /api/v1/scenarios//versions? \ - installer= - @param body: versions(names) to be deleted - @type body: C{list} of L{string} - @in body: body - @param installer: installer type - @type installer: L{string} - @in installer: query - @required installer: True - @return 200: delete versions success. - @raise 404: scenario/installer not exist - """ - self.do_update('versions', - 'delete', - locators={'scenario': scenario, - 'installer': None}) - - -class ScenarioInstallersHandler(GenericScenarioUpdateHandler): - @swagger.operation(nickname="addInstallersUnderScenario") - def post(self, scenario): - """ - @description: add installers to scenario - @notes: add one or multiple installers - POST /api/v1/scenarios//installers - @param body: installers to be added - @type body: C{list} of L{ScenarioInstaller} - @in body: body - @return 200: installers are added. - @raise 400: bad schema - @raise 409: conflict, installer already exists - @raise 404: scenario not exist - """ - self.do_update('installers', - 'post', - locators={'scenario': scenario}) - - @swagger.operation(nickname="updateInstallersUnderScenario") - def put(self, scenario): - """ - @description: replace all installers - @notes: substitute all installers as a totality - PUT /api/v1/scenarios//installers - @param body: new installers - @type body: C{list} of L{ScenarioInstaller} - @in body: body - @return 200: replace versions success. - @raise 400: bad schema - @raise 404: scenario/installer not exist - """ - self.do_update('installers', - 'put', - locators={'scenario': scenario}) - - @swagger.operation(nickname="deleteInstallersUnderScenario") - def delete(self, scenario): - """ - @description: delete one or multiple installers - @notes: delete one or multiple installers - DELETE /api/v1/scenarios//installers - @param body: installers(names) to be deleted - @type body: C{list} of L{string} - @in body: body - @return 200: delete versions success. - @raise 404: scenario/installer not exist - """ - self.do_update('installers', - 'delete', - locators={'scenario': scenario}) diff --git a/testapi/opnfv_testapi/resources/scenario_models.py b/testapi/opnfv_testapi/resources/scenario_models.py deleted file mode 100644 index d950ed1..0000000 --- a/testapi/opnfv_testapi/resources/scenario_models.py +++ /dev/null @@ -1,218 +0,0 @@ -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - - -def list_default(value): - return value if value else list() - - -def dict_default(value): - return value if value else dict() - - -@swagger.model() -class ScenarioTI(models.ModelBase): - def __init__(self, date=None, status='silver'): - self.date = date - self.status = status - - def __eq__(self, other): - return (self.date == other.date and - self.status == other.status) - - def __ne__(self, other): - return not self.__eq__(other) - - -@swagger.model() -class ScenarioScore(models.ModelBase): - def __init__(self, date=None, score='0'): - self.date = date - self.score = score - - def __eq__(self, other): - return (self.date == other.date and - self.score == other.score) - - def __ne__(self, other): - return not self.__eq__(other) - - -@swagger.model() -class ScenarioProject(models.ModelBase): - """ - @property customs: - @ptype customs: C{list} of L{string} - @property scores: - @ptype scores: C{list} of L{ScenarioScore} - @property trust_indicators: - @ptype trust_indicators: C{list} of L{ScenarioTI} - """ - def __init__(self, - project='', - customs=None, - scores=None, - trust_indicators=None): - self.project = project - self.customs = list_default(customs) - self.scores = list_default(scores) - self.trust_indicators = list_default(trust_indicators) - - @staticmethod - def attr_parser(): - return {'scores': ScenarioScore, - 'trust_indicators': ScenarioTI} - - def __eq__(self, other): - return (self.project == other.project and - self._customs_eq(other) and - self._scores_eq(other) and - self._ti_eq(other)) - - def __ne__(self, other): - return not self.__eq__(other) - - def _customs_eq(self, other): - return set(self.customs) == set(other.customs) - - def _scores_eq(self, other): - return self.scores == other.scores - - def _ti_eq(self, other): - return self.trust_indicators == other.trust_indicators - - -@swagger.model() -class ScenarioVersion(models.ModelBase): - """ - @property projects: - @ptype projects: C{list} of L{ScenarioProject} - """ - def __init__(self, owner=None, version=None, projects=None): - self.owner = owner - self.version = version - self.projects = list_default(projects) - - @staticmethod - def attr_parser(): - return {'projects': ScenarioProject} - - def __eq__(self, other): - return (self.version == other.version and - self.owner == other.owner and - self._projects_eq(other)) - - def __ne__(self, other): - return not self.__eq__(other) - - def _projects_eq(self, other): - for s_project in self.projects: - for o_project in other.projects: - if s_project.project == o_project.project: - if s_project != o_project: - return False - - return True - - -@swagger.model() -class ScenarioInstaller(models.ModelBase): - """ - @property versions: - @ptype versions: C{list} of L{ScenarioVersion} - """ - def __init__(self, installer=None, versions=None): - self.installer = installer - self.versions = list_default(versions) - - @staticmethod - def attr_parser(): - return {'versions': ScenarioVersion} - - def __eq__(self, other): - return (self.installer == other.installer and self._versions_eq(other)) - - def __ne__(self, other): - return not self.__eq__(other) - - def _versions_eq(self, other): - for s_version in self.versions: - for o_version in other.versions: - if s_version.version == o_version.version: - if s_version != o_version: - return False - - return True - - -@swagger.model() -class ScenarioCreateRequest(models.ModelBase): - """ - @property installers: - @ptype installers: C{list} of L{ScenarioInstaller} - """ - def __init__(self, name='', installers=None): - self.name = name - self.installers = list_default(installers) - - @staticmethod - def attr_parser(): - return {'installers': ScenarioInstaller} - - -@swagger.model() -class ScenarioChangeOwnerRequest(models.ModelBase): - def __init__(self, owner=None): - self.owner = owner - - -@swagger.model() -class ScenarioUpdateRequest(models.ModelBase): - def __init__(self, name=None): - self.name = name - - -@swagger.model() -class Scenario(models.ModelBase): - """ - @property installers: - @ptype installers: C{list} of L{ScenarioInstaller} - """ - def __init__(self, name='', create_date='', _id='', installers=None): - self.name = name - self._id = _id - self.creation_date = create_date - self.installers = list_default(installers) - - @staticmethod - def attr_parser(): - return {'installers': ScenarioInstaller} - - def __ne__(self, other): - return not self.__eq__(other) - - def __eq__(self, other): - return (self.name == other.name and self._installers_eq(other)) - - def _installers_eq(self, other): - for s_install in self.installers: - for o_install in other.installers: - if s_install.installer == o_install.installer: - if s_install != o_install: - return False - - return True - - -@swagger.model() -class Scenarios(models.ModelBase): - """ - @property scenarios: - @ptype scenarios: C{list} of L{Scenario} - """ - def __init__(self): - self.scenarios = list() - - @staticmethod - def attr_parser(): - return {'scenarios': Scenario} diff --git a/testapi/opnfv_testapi/resources/testcase_handlers.py b/testapi/opnfv_testapi/resources/testcase_handlers.py deleted file mode 100644 index 9399326..0000000 --- a/testapi/opnfv_testapi/resources/testcase_handlers.py +++ /dev/null @@ -1,103 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -from opnfv_testapi.resources import handlers -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tornado_swagger import swagger - - -class GenericTestcaseHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(GenericTestcaseHandler, self).__init__(application, - request, - **kwargs) - self.table = self.db_testcases - self.table_cls = testcase_models.Testcase - - -class TestcaseCLHandler(GenericTestcaseHandler): - @swagger.operation(nickname="listAllTestCases") - def get(self, project_name): - """ - @description: list all testcases of a project by project_name - @return 200: return all testcases of this project, - empty list is no testcase exist in this project - @rtype: L{TestCases} - """ - self._list(query={'project_name': project_name}) - - @swagger.operation(nickname="createTestCase") - def post(self, project_name): - """ - @description: create a testcase of a project by project_name - @param body: testcase to be created - @type body: L{TestcaseCreateRequest} - @in body: body - @rtype: L{CreateResponse} - @return 200: testcase is created in this project. - @raise 403: project not exist - or testcase already exists in this project - @raise 400: body or name not provided - """ - def project_query(): - return {'name': project_name} - - def testcase_query(): - return {'project_name': project_name, - 'name': self.json_args.get('name')} - miss_fields = ['name'] - carriers = [(self.db_projects, project_query)] - self._create(miss_fields=miss_fields, - carriers=carriers, - query=testcase_query, - project_name=project_name) - - -class TestcaseGURHandler(GenericTestcaseHandler): - @swagger.operation(nickname='getTestCaseByName') - def get(self, project_name, case_name): - """ - @description: get a single testcase - by case_name and project_name - @rtype: L{Testcase} - @return 200: testcase exist - @raise 404: testcase not exist - """ - query = dict() - query['project_name'] = project_name - query["name"] = case_name - self._get_one(query=query) - - @swagger.operation(nickname="updateTestCaseByName") - def put(self, project_name, case_name): - """ - @description: update a single testcase - by project_name and case_name - @param body: testcase to be updated - @type body: L{TestcaseUpdateRequest} - @in body: body - @rtype: L{Project} - @return 200: update success - @raise 404: testcase or project not exist - @raise 403: new testcase name already exist in project - or nothing to update - """ - query = {'project_name': project_name, 'name': case_name} - db_keys = ['name', 'project_name'] - self._update(query=query, db_keys=db_keys) - - @swagger.operation(nickname='deleteTestCaseByName') - def delete(self, project_name, case_name): - """ - @description: delete a testcase by project_name and case_name - @return 200: delete success - @raise 404: testcase not exist - """ - query = {'project_name': project_name, 'name': case_name} - self._delete(query=query) diff --git a/testapi/opnfv_testapi/resources/testcase_models.py b/testapi/opnfv_testapi/resources/testcase_models.py deleted file mode 100644 index 2379dfc..0000000 --- a/testapi/opnfv_testapi/resources/testcase_models.py +++ /dev/null @@ -1,95 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Orange -# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from opnfv_testapi.resources import models -from opnfv_testapi.tornado_swagger import swagger - - -@swagger.model() -class TestcaseCreateRequest(models.ModelBase): - def __init__(self, name, url=None, description=None, - catalog_description=None, tier=None, ci_loop=None, - criteria=None, blocking=None, dependencies=None, run=None, - domains=None, tags=None, version=None): - self.name = name - self.url = url - self.description = description - self.catalog_description = catalog_description - self.tier = tier - self.ci_loop = ci_loop - self.criteria = criteria - self.blocking = blocking - self.dependencies = dependencies - self.run = run - self.domains = domains - self.tags = tags - self.version = version - self.trust = "Silver" - - -@swagger.model() -class TestcaseUpdateRequest(models.ModelBase): - def __init__(self, name=None, description=None, project_name=None, - catalog_description=None, tier=None, ci_loop=None, - criteria=None, blocking=None, dependencies=None, run=None, - domains=None, tags=None, version=None, trust=None): - self.name = name - self.description = description - self.catalog_description = catalog_description - self.project_name = project_name - self.tier = tier - self.ci_loop = ci_loop - self.criteria = criteria - self.blocking = blocking - self.dependencies = dependencies - self.run = run - self.domains = domains - self.tags = tags - self.version = version - self.trust = trust - - -@swagger.model() -class Testcase(models.ModelBase): - def __init__(self, _id=None, name=None, project_name=None, - description=None, url=None, creation_date=None, - catalog_description=None, tier=None, ci_loop=None, - criteria=None, blocking=None, dependencies=None, run=None, - domains=None, tags=None, version=None, - trust=None): - self._id = None - self.name = None - self.project_name = None - self.description = None - self.catalog_description = None - self.url = None - self.creation_date = None - self.tier = None - self.ci_loop = None - self.criteria = None - self.blocking = None - self.dependencies = None - self.run = None - self.domains = None - self.tags = None - self.version = None - self.trust = None - - -@swagger.model() -class Testcases(models.ModelBase): - """ - @property testcases: - @ptype testcases: C{list} of L{Testcase} - """ - def __init__(self): - self.testcases = list() - - @staticmethod - def attr_parser(): - return {'testcases': Testcase} diff --git a/testapi/opnfv_testapi/router/url_mappings.py b/testapi/opnfv_testapi/router/url_mappings.py index ce0a3ee..349d557 100644 --- a/testapi/opnfv_testapi/router/url_mappings.py +++ b/testapi/opnfv_testapi/router/url_mappings.py @@ -9,19 +9,19 @@ import tornado.web from opnfv_testapi.common.config import CONF -from opnfv_testapi.resources import handlers -from opnfv_testapi.resources import pod_handlers -from opnfv_testapi.resources import project_handlers -from opnfv_testapi.resources import result_handlers -from opnfv_testapi.resources import scenario_handlers -from opnfv_testapi.resources import testcase_handlers -from opnfv_testapi.ui import root -from opnfv_testapi.ui.auth import sign -from opnfv_testapi.ui.auth import user +from opnfv_testapi.handlers import base_handlers +from opnfv_testapi.handlers import pod_handlers +from opnfv_testapi.handlers import project_handlers +from opnfv_testapi.handlers import result_handlers +from opnfv_testapi.handlers import root_handlers +from opnfv_testapi.handlers import scenario_handlers +from opnfv_testapi.handlers import sign_handlers +from opnfv_testapi.handlers import testcase_handlers +from opnfv_testapi.handlers import user_handlers mappings = [ # GET /versions => GET API version - (r"/versions", handlers.VersionHandler), + (r"/versions", base_handlers.VersionHandler), # few examples: # GET /api/v1/pods => Get all pods @@ -74,10 +74,11 @@ mappings = [ tornado.web.StaticFileHandler, {'path': CONF.ui_static_path}), - (r'/', root.RootHandler), - (r'/api/v1/auth/signin', sign.SigninHandler), - (r'/{}'.format(CONF.lfid_signin_return), sign.SigninReturnHandler), - (r'/api/v1/auth/signout', sign.SignoutHandler), - (r'/api/v1/profile', user.UserHandler), + (r'/', root_handlers.RootHandler), + (r'/api/v1/auth/signin', sign_handlers.SigninHandler), + (r'/{}'.format(CONF.lfid_signin_return), + sign_handlers.SigninReturnHandler), + (r'/api/v1/auth/signout', sign_handlers.SignoutHandler), + (r'/api/v1/profile', user_handlers.UserHandler), ] diff --git a/testapi/opnfv_testapi/tests/unit/executor.py b/testapi/opnfv_testapi/tests/unit/executor.py index aa99b90..743c076 100644 --- a/testapi/opnfv_testapi/tests/unit/executor.py +++ b/testapi/opnfv_testapi/tests/unit/executor.py @@ -14,7 +14,7 @@ import mock O_get_secure_cookie = ( - 'opnfv_testapi.resources.handlers.GenericApiHandler.get_secure_cookie') + 'opnfv_testapi.handlers.base_handlers.GenericApiHandler.get_secure_cookie') def thread_execute(method, *args, **kwargs): diff --git a/testapi/opnfv_testapi/tests/unit/handlers/__init__.py b/testapi/opnfv_testapi/tests/unit/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json new file mode 100644 index 0000000..1878022 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c1.json @@ -0,0 +1,38 @@ +{ + "name": "nosdn-nofeature-ha", + "installers": + [ + { + "installer": "apex", + "versions": + [ + { + "owner": "Luke", + "version": "master", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-08 22:46:44", + "score": "12/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json new file mode 100644 index 0000000..980051c --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/scenario-c2.json @@ -0,0 +1,73 @@ +{ + "name": "odl_2-nofeature-ha", + "installers": + [ + { + "installer": "fuel", + "versions": + [ + { + "owner": "Lucky", + "version": "danube", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": [], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "silver" + } + + ] + }, + { + "project": "yardstick", + "customs": ["suite-a"], + "scores": [ + { + "date": "2017-01-08 22:46:44", + "score": "0/1" + } + ], + "trust_indicators": [ + { + "date": "2017-01-18 22:46:44", + "status": "gold" + } + ] + } + ] + }, + { + "owner": "Luke", + "version": "colorado", + "projects": + [ + { + "project": "functest", + "customs": [ "healthcheck", "vping_ssh"], + "scores": + [ + { + "date": "2017-01-09 22:46:44", + "score": "11/14" + } + + ], + "trust_indicators": [] + }, + { + "project": "yardstick", + "customs": [], + "scores": [], + "trust_indicators": [] + } + ] + } + ] + } + ] +} diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_base.py b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py new file mode 100644 index 0000000..b7fabb9 --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_base.py @@ -0,0 +1,204 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from datetime import datetime +import json +from os import path + +from bson.objectid import ObjectId +import mock +from tornado import testing + +from opnfv_testapi.models import base_models +from opnfv_testapi.models import pod_models +from opnfv_testapi.tests.unit import fake_pymongo + + +class TestBase(testing.AsyncHTTPTestCase): + headers = {'Content-Type': 'application/json; charset=UTF-8'} + + def setUp(self): + self._patch_server() + self.basePath = '' + self.create_res = base_models.CreateResponse + self.get_res = None + self.list_res = None + self.update_res = None + self.pod_d = pod_models.Pod(name='zte-pod1', + mode='virtual', + details='zte pod 1', + role='community-ci', + _id=str(ObjectId()), + owner='ValidUser', + create_date=str(datetime.now())) + self.pod_e = pod_models.Pod(name='zte-pod2', + mode='metal', + details='zte pod 2', + role='production-ci', + _id=str(ObjectId()), + owner='ValidUser', + create_date=str(datetime.now())) + self.req_d = None + self.req_e = None + self.addCleanup(self._clear) + super(TestBase, self).setUp() + fake_pymongo.users.insert({"user": "ValidUser", + 'email': 'validuser@lf.com', + 'fullname': 'Valid User', + 'groups': [ + 'opnfv-testapi-users', + 'opnfv-gerrit-functest-submitters', + 'opnfv-gerrit-qtip-contributors'] + }) + + def tearDown(self): + self.db_patcher.stop() + self.config_patcher.stop() + + def _patch_server(self): + import argparse + config = path.join(path.dirname(__file__), + '../../../../etc/config.ini') + self.config_patcher = mock.patch( + 'argparse.ArgumentParser.parse_known_args', + return_value=(argparse.Namespace(config_file=config), None)) + self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', + fake_pymongo) + self.config_patcher.start() + self.db_patcher.start() + + def get_app(self): + from opnfv_testapi.cmd import server + return server.make_app() + + def create_d(self, *args): + return self.create(self.req_d, *args) + + def create_e(self, *args): + return self.create(self.req_e, *args) + + def create(self, req=None, *args): + return self.create_help(self.basePath, req, *args) + + def create_help(self, uri, req, *args): + return self.post_direct_url(self._update_uri(uri, *args), req) + + def post_direct_url(self, url, req): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(url, + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + def get(self, *args): + res = self.fetch(self._get_uri(*args), + method='GET', + headers=self.headers) + + def inner(): + new_args, num = self._get_valid_args(*args) + return self.get_res \ + if num != self._need_arg_num(self.basePath) else self.list_res + return self._get_return(res, inner()) + + def query(self, query): + res = self.fetch(self._get_query_uri(query), + method='GET', + headers=self.headers) + return self._get_return(res, self.list_res) + + def update_direct_url(self, url, new=None): + if new and hasattr(new, 'format'): + new = new.format() + res = self.fetch(url, + method='PUT', + body=json.dumps(new), + headers=self.headers) + return self._get_return(res, self.update_res) + + def update(self, new=None, *args): + return self.update_direct_url(self._get_uri(*args), new) + + def delete_direct_url(self, url, body): + if body: + res = self.fetch(url, + method='DELETE', + body=json.dumps(body), + headers=self.headers, + allow_nonstandard_methods=True) + else: + res = self.fetch(url, + method='DELETE', + headers=self.headers) + + return res.code, res.body + + def delete(self, *args): + return self.delete_direct_url(self._get_uri(*args), None) + + @staticmethod + def _get_valid_args(*args): + new_args = tuple(['%s' % arg for arg in args if arg is not None]) + return new_args, len(new_args) + + def _need_arg_num(self, uri): + return uri.count('%s') + + def _get_query_uri(self, query): + return self.basePath + '?' + query if query else self.basePath + + def _get_uri(self, *args): + return self._update_uri(self.basePath, *args) + + def _update_uri(self, uri, *args): + r_uri = uri + new_args, num = self._get_valid_args(*args) + if num != self._need_arg_num(uri): + r_uri += '/%s' + + return r_uri % tuple(['%s' % arg for arg in new_args]) + + def _get_return(self, res, cls): + code = res.code + body = res.body + if body: + return code, self._get_return_body(code, body, cls) + else: + return code, None + + @staticmethod + def _get_return_body(code, body, cls): + return cls.from_dict(json.loads(body)) if code < 300 and cls else body + + def assert_href(self, body): + self.assertIn(self.basePath, body.href) + + def assert_create_body(self, body, req=None, *args): + import inspect + if not req: + req = self.req_d + resource_name = '' + if inspect.isclass(req): + resource_name = req.name + elif isinstance(req, dict): + resource_name = req['name'] + elif isinstance(req, str): + resource_name = json.loads(req)['name'] + new_args = args + tuple([resource_name]) + self.assertIn(self._get_uri(*new_args), body.href) + + @staticmethod + def _clear(): + fake_pymongo.pods.clear() + fake_pymongo.projects.clear() + fake_pymongo.testcases.clear() + fake_pymongo.results.clear() + fake_pymongo.scenarios.clear() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py new file mode 100644 index 0000000..95ed8ba --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_pod.py @@ -0,0 +1,118 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import pod_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestPodBase(base.TestBase): + def setUp(self): + super(TestPodBase, self).setUp() + self.get_res = pod_models.Pod + self.list_res = pod_models.Pods + self.basePath = '/api/v1/pods' + self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name, + mode=self.pod_d.mode, + details=self.pod_d.details, + role=self.pod_d.role) + self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name, + mode=self.pod_e.mode, + details=self.pod_e.details, + role=self.pod_e.role) + + def assert_get_body(self, pod, req=None): + if not req: + req = self.req_d + self.assertEqual(pod.owner, 'ValidUser') + self.assertEqual(pod.name, req.name) + self.assertEqual(pod.mode, req.mode) + self.assertEqual(pod.details, req.details) + self.assertEqual(pod.role, req.role) + self.assertIsNotNone(pod.creation_date) + self.assertIsNotNone(pod._id) + + +class TestPodCreate(TestPodBase): + @executor.create(httplib.BAD_REQUEST, message.not_login()) + def test_notlogin(self): + return self.req_d + + @executor.mock_invalid_lfid() + @executor.create(httplib.BAD_REQUEST, message.not_lfid()) + def test_invalidLfid(self): + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return pod_models.PodCreateRequest('') + + @executor.mock_valid_lfid() + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return pod_models.PodCreateRequest(None) + + @executor.mock_valid_lfid() + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + fake_pymongo.pods.insert(self.pod_d.format()) + return self.req_d + + @executor.mock_valid_lfid() + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExistCaseInsensitive(self): + fake_pymongo.pods.insert(self.pod_d.format()) + self.req_d.name = self.req_d.name.upper() + return self.req_d + + +class TestPodGet(TestPodBase): + def setUp(self): + super(TestPodGet, self).setUp() + fake_pymongo.pods.insert(self.pod_d.format()) + fake_pymongo.pods.insert(self.pod_e.format()) + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_get_body') + def test_getOne(self): + return self.pod_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + self.assertEqual(len(body.pods), 2) + for pod in body.pods: + if self.pod_d.name == pod.name: + self.assert_get_body(pod) + else: + self.assert_get_body(pod, self.pod_e) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_project.py b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py new file mode 100644 index 0000000..939cc0d --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_project.py @@ -0,0 +1,137 @@ +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestProjectBase(base.TestBase): + def setUp(self): + super(TestProjectBase, self).setUp() + self.req_d = project_models.ProjectCreateRequest('vping', + 'vping-ssh test') + self.req_e = project_models.ProjectCreateRequest('doctor', + 'doctor test') + self.get_res = project_models.Project + self.list_res = project_models.Projects + self.update_res = project_models.Project + self.basePath = '/api/v1/projects' + + def assert_body(self, project, req=None): + if not req: + req = self.req_d + self.assertEqual(project.name, req.name) + self.assertEqual(project.description, req.description) + self.assertIsNotNone(project._id) + self.assertIsNotNone(project.creation_date) + + +class TestProjectCreate(TestProjectBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + return project_models.ProjectCreateRequest('') + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + return project_models.ProjectCreateRequest(None) + + @executor.create(httplib.OK, 'assert_create_body') + def test_success(self): + return self.req_d + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestProjectGet(TestProjectBase): + def setUp(self): + super(TestProjectGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_assert_list') + def test_list(self): + return None + + def _assert_list(self, body): + for project in body.projects: + if self.req_d.name == project.name: + self.assert_body(project) + else: + self.assert_body(project, self.req_e) + + +class TestProjectUpdate(TestProjectBase): + def setUp(self): + super(TestProjectUpdate, self).setUp() + _, d_body = self.create_d() + _, get_res = self.get(self.req_d.name) + self.index_d = get_res._id + self.create_e() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_withoutBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.req_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + return self.req_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.req_d, self.req_d.name + + @executor.update(httplib.OK, '_assert_update') + def test_success(self): + req = project_models.ProjectUpdateRequest('newName', 'new description') + return req, self.req_d.name + + def _assert_update(self, req, body): + self.assertEqual(self.index_d, body._id) + self.assert_body(body, req) + _, new_body = self.get(req.name) + self.assertEqual(self.index_d, new_body._id) + self.assert_body(new_body, req) + + +class TestProjectDelete(TestProjectBase): + def setUp(self): + super(TestProjectDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_assert_delete') + def test_success(self): + return self.req_d.name + + def _assert_delete(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_result.py b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py new file mode 100644 index 0000000..b9f9ede --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_result.py @@ -0,0 +1,413 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +from datetime import datetime +from datetime import timedelta +import httplib +import json +import urllib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.models import result_models +from opnfv_testapi.models import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class Details(object): + def __init__(self, timestart=None, duration=None, status=None): + self.timestart = timestart + self.duration = duration + self.status = status + self.items = [{'item1': 1}, {'item2': 2}] + + def format(self): + return { + "timestart": self.timestart, + "duration": self.duration, + "status": self.status, + 'items': [{'item1': 1}, {'item2': 2}] + } + + @staticmethod + def from_dict(a_dict): + + if a_dict is None: + return None + + t = Details() + t.timestart = a_dict.get('timestart') + t.duration = a_dict.get('duration') + t.status = a_dict.get('status') + t.items = a_dict.get('items') + return t + + +class TestResultBase(base.TestBase): + def setUp(self): + super(TestResultBase, self).setUp() + self.pod = self.pod_d.name + self.project = 'functest' + self.case = 'vPing' + self.installer = 'fuel' + self.version = 'C' + self.build_tag = 'v3.0' + self.scenario = 'odl-l2' + self.criteria = 'PASS' + self.trust_indicator = result_models.TI(0.7) + self.start_date = str(datetime.now()) + self.stop_date = str(datetime.now() + timedelta(minutes=1)) + self.update_date = str(datetime.now() + timedelta(days=1)) + self.update_step = -0.05 + self.details = Details(timestart='0', duration='9s', status='OK') + self.req_d = result_models.ResultCreateRequest( + pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria, + trust_indicator=self.trust_indicator) + self.get_res = result_models.TestResult + self.list_res = result_models.TestResults + self.update_res = result_models.TestResult + self.basePath = '/api/v1/results' + self.req_project = project_models.ProjectCreateRequest( + self.project, + 'vping test') + self.req_testcase = testcase_models.TestcaseCreateRequest( + self.case, + '/cases/vping', + 'vping-ssh test') + fake_pymongo.pods.insert(self.pod_d.format()) + self.create_help('/api/v1/projects', self.req_project) + self.create_help('/api/v1/projects/%s/cases', + self.req_testcase, + self.project) + + def assert_res(self, result, req=None): + if req is None: + req = self.req_d + self.assertEqual(result.pod_name, req.pod_name) + self.assertEqual(result.project_name, req.project_name) + self.assertEqual(result.case_name, req.case_name) + self.assertEqual(result.installer, req.installer) + self.assertEqual(result.version, req.version) + details_req = Details.from_dict(req.details) + details_res = Details.from_dict(result.details) + self.assertEqual(details_res.duration, details_req.duration) + self.assertEqual(details_res.timestart, details_req.timestart) + self.assertEqual(details_res.status, details_req.status) + self.assertEqual(details_res.items, details_req.items) + self.assertEqual(result.build_tag, req.build_tag) + self.assertEqual(result.scenario, req.scenario) + self.assertEqual(result.criteria, req.criteria) + self.assertEqual(result.start_date, req.start_date) + self.assertEqual(result.stop_date, req.stop_date) + self.assertIsNotNone(result._id) + ti = result.trust_indicator + self.assertEqual(ti.current, req.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + def _create_d(self): + _, res = self.create_d() + return res.href.split('/')[-1] + + def upload(self, req): + if req and not isinstance(req, str) and hasattr(req, 'format'): + req = req.format() + res = self.fetch(self.basePath + '/upload', + method='POST', + body=json.dumps(req), + headers=self.headers) + + return self._get_return(res, self.create_res) + + +class TestResultUpload(TestResultBase): + @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) + def test_filenotfind(self): + return None + + +class TestResultCreate(TestResultBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_nobody(self): + return None + + @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) + def test_podNotProvided(self): + req = self.req_d + req.pod_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) + def test_projectNotProvided(self): + req = self.req_d + req.project_name = None + return req + + @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) + def test_testcaseNotProvided(self): + req = self.req_d + req.case_name = None + return req + + @executor.create(httplib.BAD_REQUEST, + message.invalid_value('criteria', ['PASS', 'FAIL'])) + def test_invalid_criteria(self): + req = self.req_d + req.criteria = 'invalid' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noPod(self): + req = self.req_d + req.pod_name = 'notExistPod' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + req = self.req_d + req.project_name = 'notExistProject' + return req + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noTestcase(self): + req = self.req_d + req.case_name = 'notExistTestcase' + return req + + @executor.create(httplib.OK, 'assert_href') + def test_success(self): + return self.req_d + + @executor.create(httplib.OK, 'assert_href') + def test_key_with_doc(self): + req = copy.deepcopy(self.req_d) + req.details = {'1.name': 'dot_name'} + return req + + @executor.create(httplib.OK, '_assert_no_ti') + def test_no_ti(self): + req = result_models.ResultCreateRequest(pod_name=self.pod, + project_name=self.project, + case_name=self.case, + installer=self.installer, + version=self.version, + start_date=self.start_date, + stop_date=self.stop_date, + details=self.details.format(), + build_tag=self.build_tag, + scenario=self.scenario, + criteria=self.criteria) + self.actual_req = req + return req + + def _assert_no_ti(self, body): + _id = body.href.split('/')[-1] + code, body = self.get(_id) + self.assert_res(body, self.actual_req) + + +class TestResultGet(TestResultBase): + def setUp(self): + super(TestResultGet, self).setUp() + self.req_10d_before = self._create_changed_date(days=-10) + self.req_d_id = self._create_d() + self.req_10d_later = self._create_changed_date(days=10) + + @executor.get(httplib.OK, 'assert_res') + def test_getOne(self): + return self.req_d_id + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryPod(self): + return self._set_query('pod') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryProject(self): + return self._set_query('project') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTestcase(self): + return self._set_query('case') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryVersion(self): + return self._set_query('version') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryInstaller(self): + return self._set_query('installer') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryBuildTag(self): + return self._set_query('build_tag') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryScenario(self): + return self._set_query('scenario') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryTrustIndicator(self): + return self._set_query('trust_indicator') + + @executor.query(httplib.OK, '_query_success', 3) + def test_queryCriteria(self): + return self._set_query('criteria') + + @executor.query(httplib.BAD_REQUEST, message.must_int('period')) + def test_queryPeriodNotInt(self): + return self._set_query(period='a') + + @executor.query(httplib.OK, '_query_period_one', 1) + def test_queryPeriodSuccess(self): + return self._set_query(period=5) + + @executor.query(httplib.BAD_REQUEST, message.must_int('last')) + def test_queryLastNotInt(self): + return self._set_query(last='a') + + @executor.query(httplib.OK, '_query_last_one', 1) + def test_queryLast(self): + return self._set_query(last=1) + + @executor.query(httplib.OK, '_query_success', 4) + def test_queryPublic(self): + self._create_public_data() + return self._set_query() + + @executor.query(httplib.OK, '_query_success', 1) + def test_queryPrivate(self): + self._create_private_data() + return self._set_query(public='false') + + @executor.query(httplib.OK, '_query_period_one', 1) + def test_combination(self): + return self._set_query('pod', + 'project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + period=5) + + @executor.query(httplib.OK, '_query_success', 0) + def test_notFound(self): + return self._set_query('project', + 'case', + 'version', + 'installer', + 'build_tag', + 'scenario', + 'trust_indicator', + 'criteria', + pod='notExistPod', + period=1) + + @executor.query(httplib.OK, '_query_success', 1) + def test_filterErrorStartdate(self): + self._create_error_start_date(None) + self._create_error_start_date('None') + self._create_error_start_date('null') + self._create_error_start_date('') + return self._set_query(period=5) + + def _query_success(self, body, number): + self.assertEqual(number, len(body.results)) + + def _query_last_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_10d_later) + + def _query_period_one(self, body, number): + self.assertEqual(number, len(body.results)) + self.assert_res(body.results[0], self.req_d) + + def _create_error_start_date(self, start_date): + req = copy.deepcopy(self.req_d) + req.start_date = start_date + self.create(req) + return req + + def _create_changed_date(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.start_date = datetime.now() + timedelta(**kwargs) + req.stop_date = str(req.start_date + timedelta(minutes=10)) + req.start_date = str(req.start_date) + self.create(req) + return req + + def _create_public_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'true' + self.create(req) + return req + + def _create_private_data(self, **kwargs): + req = copy.deepcopy(self.req_d) + req.public = 'false' + self.create(req) + return req + + def _set_query(self, *args, **kwargs): + def get_value(arg): + return self.__getattribute__(arg) \ + if arg != 'trust_indicator' else self.trust_indicator.current + query = [] + for arg in args: + query.append((arg, get_value(arg))) + for k, v in kwargs.iteritems(): + query.append((k, v)) + return urllib.urlencode(query) + + +class TestResultUpdate(TestResultBase): + def setUp(self): + super(TestResultUpdate, self).setUp() + self.req_d_id = self._create_d() + + @executor.update(httplib.OK, '_assert_update_ti') + def test_success(self): + new_ti = copy.deepcopy(self.trust_indicator) + new_ti.current += self.update_step + new_ti.histories.append( + result_models.TIHistory(self.update_date, self.update_step)) + new_data = copy.deepcopy(self.req_d) + new_data.trust_indicator = new_ti + update = result_models.ResultUpdateRequest(trust_indicator=new_ti) + self.update_req = new_data + return update, self.req_d_id + + def _assert_update_ti(self, request, body): + ti = body.trust_indicator + self.assertEqual(ti.current, request.trust_indicator.current) + if ti.histories: + history = ti.histories[0] + self.assertEqual(history.date, self.update_date) + self.assertEqual(history.step, self.update_step) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py new file mode 100644 index 0000000..de7777a --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_scenario.py @@ -0,0 +1,449 @@ +import functools +import httplib +import json +import os + +from datetime import datetime + +from opnfv_testapi.common import message +import opnfv_testapi.models.scenario_models as models +from opnfv_testapi.tests.unit.handlers import test_base as base + + +def _none_default(check, default): + return check if check else default + + +class TestScenarioBase(base.TestBase): + def setUp(self): + super(TestScenarioBase, self).setUp() + self.get_res = models.Scenario + self.list_res = models.Scenarios + self.basePath = '/api/v1/scenarios' + self.req_d = self._load_request('scenario-c1.json') + self.req_2 = self._load_request('scenario-c2.json') + + def tearDown(self): + pass + + def assert_body(self, project, req=None): + pass + + @staticmethod + def _load_request(f_req): + abs_file = os.path.join(os.path.dirname(__file__), f_req) + with open(abs_file, 'r') as f: + loader = json.load(f) + f.close() + return loader + + def create_return_name(self, req): + _, res = self.create(req) + return res.href.split('/')[-1] + + def assert_res(self, code, scenario, req=None): + self.assertEqual(code, httplib.OK) + if req is None: + req = self.req_d + self.assertIsNotNone(scenario._id) + self.assertIsNotNone(scenario.creation_date) + self.assertEqual(scenario, models.Scenario.from_dict(req)) + + @staticmethod + def set_query(*args): + uri = '' + for arg in args: + uri += arg + '&' + return uri[0: -1] + + def get_and_assert(self, name): + code, body = self.get(name) + self.assert_res(code, body, self.req_d) + + +class TestScenarioCreate(TestScenarioBase): + def test_withoutBody(self): + (code, body) = self.create() + self.assertEqual(code, httplib.BAD_REQUEST) + + def test_emptyName(self): + req_empty = models.ScenarioCreateRequest('') + (code, body) = self.create(req_empty) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_noneName(self): + req_none = models.ScenarioCreateRequest(None) + (code, body) = self.create(req_none) + self.assertEqual(code, httplib.BAD_REQUEST) + self.assertIn(message.missing('name'), body) + + def test_success(self): + (code, body) = self.create_d() + self.assertEqual(code, httplib.OK) + self.assert_create_body(body) + + def test_alreadyExist(self): + self.create_d() + (code, body) = self.create_d() + self.assertEqual(code, httplib.FORBIDDEN) + self.assertIn(message.exist_base, body) + + +class TestScenarioGet(TestScenarioBase): + def setUp(self): + super(TestScenarioGet, self).setUp() + self.scenario_1 = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + + def test_getByName(self): + self.get_and_assert(self.scenario_1) + + def test_getAll(self): + self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) + + def test_queryName(self): + query = self.set_query('name=nosdn-nofeature-ha') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryInstaller(self): + query = self.set_query('installer=apex') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryVersion(self): + query = self.set_query('version=master') + self._query_and_assert(query, reqs=[self.req_d]) + + def test_queryProject(self): + query = self.set_query('project=functest') + self._query_and_assert(query, reqs=[self.req_d, self.req_2]) + + # close due to random fail, open again after solve it in another patch + # def test_queryCombination(self): + # query = self._set_query('name=nosdn-nofeature-ha', + # 'installer=apex', + # 'version=master', + # 'project=functest') + # + # self._query_and_assert(query, reqs=[self.req_d]) + + def _query_and_assert(self, query, found=True, reqs=None): + code, body = self.query(query) + if not found: + self.assertEqual(code, httplib.OK) + self.assertEqual(0, len(body.scenarios)) + else: + self.assertEqual(len(reqs), len(body.scenarios)) + for req in reqs: + for scenario in body.scenarios: + if req['name'] == scenario.name: + self.assert_res(code, scenario, req) + + +class TestScenarioDelete(TestScenarioBase): + def test_notFound(self): + code, body = self.delete('notFound') + self.assertEqual(code, httplib.NOT_FOUND) + + def test_success(self): + scenario = self.create_return_name(self.req_d) + code, _ = self.delete(scenario) + self.assertEqual(code, httplib.OK) + code, _ = self.get(scenario) + self.assertEqual(code, httplib.NOT_FOUND) + + +class TestScenarioUpdate(TestScenarioBase): + def setUp(self): + super(TestScenarioUpdate, self).setUp() + self.scenario = self.create_return_name(self.req_d) + self.scenario_2 = self.create_return_name(self.req_2) + self.update_url = '' + self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario) + self.installer = self.req_d['installers'][0]['installer'] + self.version = self.req_d['installers'][0]['versions'][0]['version'] + self.locate_project = 'installer={}&version={}&project={}'.format( + self.installer, + self.version, + 'functest') + + def update_url_fixture(item): + def _update_url_fixture(xstep): + def wrapper(self, *args, **kwargs): + self.update_url = '{}/{}'.format(self.scenario_url, item) + locator = None + if item in ['projects', 'owner']: + locator = 'installer={}&version={}'.format( + self.installer, + self.version) + elif item in ['versions']: + locator = 'installer={}'.format( + self.installer) + elif item in ['rename']: + self.update_url = self.scenario_url + + if locator: + self.update_url = '{}?{}'.format(self.update_url, locator) + + xstep(self, *args, **kwargs) + return wrapper + return _update_url_fixture + + def update_partial(operate, expected): + def _update_partial(set_update): + @functools.wraps(set_update) + def wrapper(self): + update = set_update(self) + code, body = getattr(self, operate)(update) + getattr(self, expected)(code) + return wrapper + return _update_partial + + @update_partial('_add', '_success') + def test_addScore(self): + add = models.ScenarioScore(date=str(datetime.now()), score='11/12') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['scores'].append(add.format()) + self.update_url = '{}/scores?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addTrustIndicator(self): + add = models.ScenarioTI(date=str(datetime.now()), status='gold') + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['trust_indicators'].append(add.format()) + self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url, + self.locate_project) + + return add + + @update_partial('_add', '_success') + def test_addCustoms(self): + adds = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = list(set(functest['customs'] + adds)) + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + return adds + + @update_partial('_update', '_success') + def test_updateCustoms(self): + updates = ['odl', 'parser', 'vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = updates + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return updates + + @update_partial('_delete', '_success') + def test_deleteCustoms(self): + deletes = ['vping_ssh'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + functest = filter(lambda f: f['project'] == 'functest', projects)[0] + functest['customs'] = ['healthcheck'] + self.update_url = '{}/customs?{}'.format(self.scenario_url, + self.locate_project) + + return deletes + + @update_url_fixture('projects') + @update_partial('_add', '_success') + def test_addProjects_succ(self): + add = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'].append(add) + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_conflict') + def test_addProjects_already_exist(self): + add = models.ScenarioProject(project='functest').format() + return [add] + + @update_url_fixture('projects') + @update_partial('_add', '_bad_request') + def test_addProjects_bad_schema(self): + add = models.ScenarioProject(project='functest').format() + add['score'] = None + return [add] + + @update_url_fixture('projects') + @update_partial('_update', '_success') + def test_updateProjects_succ(self): + update = models.ScenarioProject(project='qtip').format() + self.req_d['installers'][0]['versions'][0]['projects'] = [update] + return [update] + + @update_url_fixture('projects') + @update_partial('_update', '_conflict') + def test_updateProjects_duplicated(self): + update = models.ScenarioProject(project='qtip').format() + return [update, update] + + @update_url_fixture('projects') + @update_partial('_update', '_bad_request') + def test_updateProjects_bad_schema(self): + update = models.ScenarioProject(project='functest').format() + update['score'] = None + return [update] + + @update_url_fixture('projects') + @update_partial('_delete', '_success') + def test_deleteProjects(self): + deletes = ['functest'] + projects = self.req_d['installers'][0]['versions'][0]['projects'] + self.req_d['installers'][0]['versions'][0]['projects'] = filter( + lambda f: f['project'] != 'functest', + projects) + return deletes + + @update_url_fixture('owner') + @update_partial('_update', '_success') + def test_changeOwner(self): + new_owner = 'new_owner' + update = models.ScenarioChangeOwnerRequest(new_owner).format() + self.req_d['installers'][0]['versions'][0]['owner'] = new_owner + return update + + @update_url_fixture('versions') + @update_partial('_add', '_success') + def test_addVersions_succ(self): + add = models.ScenarioVersion(version='Euphrates').format() + self.req_d['installers'][0]['versions'].append(add) + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_conflict') + def test_addVersions_already_exist(self): + add = models.ScenarioVersion(version='master').format() + return [add] + + @update_url_fixture('versions') + @update_partial('_add', '_bad_request') + def test_addVersions_bad_schema(self): + add = models.ScenarioVersion(version='euphrates').format() + add['notexist'] = None + return [add] + + @update_url_fixture('versions') + @update_partial('_update', '_success') + def test_updateVersions_succ(self): + update = models.ScenarioVersion(version='euphrates').format() + self.req_d['installers'][0]['versions'] = [update] + return [update] + + @update_url_fixture('versions') + @update_partial('_update', '_conflict') + def test_updateVersions_duplicated(self): + update = models.ScenarioVersion(version='euphrates').format() + return [update, update] + + @update_url_fixture('versions') + @update_partial('_update', '_bad_request') + def test_updateVersions_bad_schema(self): + update = models.ScenarioVersion(version='euphrates').format() + update['not_owner'] = 'Iam' + return [update] + + @update_url_fixture('versions') + @update_partial('_delete', '_success') + def test_deleteVersions(self): + deletes = ['master'] + versions = self.req_d['installers'][0]['versions'] + self.req_d['installers'][0]['versions'] = filter( + lambda f: f['version'] != 'master', + versions) + return deletes + + @update_url_fixture('installers') + @update_partial('_add', '_success') + def test_addInstallers_succ(self): + add = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'].append(add) + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_conflict') + def test_addInstallers_already_exist(self): + add = models.ScenarioInstaller(installer='apex').format() + return [add] + + @update_url_fixture('installers') + @update_partial('_add', '_bad_request') + def test_addInstallers_bad_schema(self): + add = models.ScenarioInstaller(installer='daisy').format() + add['not_exist'] = 'not_exist' + return [add] + + @update_url_fixture('installers') + @update_partial('_update', '_success') + def test_updateInstallers_succ(self): + update = models.ScenarioInstaller(installer='daisy').format() + self.req_d['installers'] = [update] + return [update] + + @update_url_fixture('installers') + @update_partial('_update', '_conflict') + def test_updateInstallers_duplicated(self): + update = models.ScenarioInstaller(installer='daisy').format() + return [update, update] + + @update_url_fixture('installers') + @update_partial('_update', '_bad_request') + def test_updateInstallers_bad_schema(self): + update = models.ScenarioInstaller(installer='daisy').format() + update['not_exist'] = 'not_exist' + return [update] + + @update_url_fixture('installers') + @update_partial('_delete', '_success') + def test_deleteInstallers(self): + deletes = ['apex'] + installers = self.req_d['installers'] + self.req_d['installers'] = filter( + lambda f: f['installer'] != 'apex', + installers) + return deletes + + @update_url_fixture('rename') + @update_partial('_update', '_success') + def test_renameScenario(self): + new_name = 'new_scenario_name' + update = models.ScenarioUpdateRequest(name=new_name) + self.req_d['name'] = new_name + return update + + @update_url_fixture('rename') + @update_partial('_update', '_forbidden') + def test_renameScenario_exist(self): + new_name = self.req_d['name'] + update = models.ScenarioUpdateRequest(name=new_name) + return update + + def _add(self, update_req): + return self.post_direct_url(self.update_url, update_req) + + def _update(self, update_req): + return self.update_direct_url(self.update_url, update_req) + + def _delete(self, update_req): + return self.delete_direct_url(self.update_url, update_req) + + def _success(self, status): + self.assertEqual(status, httplib.OK) + self.get_and_assert(self.req_d['name']) + + def _forbidden(self, status): + self.assertEqual(status, httplib.FORBIDDEN) + + def _bad_request(self, status): + self.assertEqual(status, httplib.BAD_REQUEST) + + def _conflict(self, status): + self.assertEqual(status, httplib.CONFLICT) diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py new file mode 100644 index 0000000..e4c668e --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_testcase.py @@ -0,0 +1,201 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import copy +import httplib +import unittest + +from opnfv_testapi.common import message +from opnfv_testapi.models import project_models +from opnfv_testapi.models import testcase_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestCaseBase(base.TestBase): + def setUp(self): + super(TestCaseBase, self).setUp() + self.req_d = testcase_models.TestcaseCreateRequest('vping_1', + '/cases/vping_1', + 'vping-ssh test') + self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', + '/cases/doctor_1', + 'create doctor') + self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', + 'vping-ssh test', + 'functest') + self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', + 'create doctor', + 'functest') + self.get_res = testcase_models.Testcase + self.list_res = testcase_models.Testcases + self.update_res = testcase_models.Testcase + self.basePath = '/api/v1/projects/%s/cases' + self.create_project() + + def assert_body(self, case, req=None): + if not req: + req = self.req_d + self.assertEqual(case.name, req.name) + self.assertEqual(case.description, req.description) + self.assertEqual(case.url, req.url) + self.assertIsNotNone(case._id) + self.assertIsNotNone(case.creation_date) + + def assert_update_body(self, old, new, req=None): + if not req: + req = self.req_d + self.assertEqual(new.name, req.name) + self.assertEqual(new.description, req.description) + self.assertEqual(new.url, old.url) + self.assertIsNotNone(new._id) + self.assertIsNotNone(new.creation_date) + + def create_project(self): + req_p = project_models.ProjectCreateRequest('functest', + 'vping-ssh test') + self.create_help('/api/v1/projects', req_p) + self.project = req_p.name + + def create_d(self): + return super(TestCaseBase, self).create_d(self.project) + + def create_e(self): + return super(TestCaseBase, self).create_e(self.project) + + def get(self, case=None): + return super(TestCaseBase, self).get(self.project, case) + + def create(self, req=None, *args): + return super(TestCaseBase, self).create(req, self.project) + + def update(self, new=None, case=None): + return super(TestCaseBase, self).update(new, self.project, case) + + def delete(self, case): + return super(TestCaseBase, self).delete(self.project, case) + + +class TestCaseCreate(TestCaseBase): + @executor.create(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None + + @executor.create(httplib.FORBIDDEN, message.not_found_base) + def test_noProject(self): + self.project = 'noProject' + return self.req_d + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_emptyName(self): + req_empty = testcase_models.TestcaseCreateRequest('') + return req_empty + + @executor.create(httplib.BAD_REQUEST, message.missing('name')) + def test_noneName(self): + req_none = testcase_models.TestcaseCreateRequest(None) + return req_none + + @executor.create(httplib.OK, '_assert_success') + def test_success(self): + return self.req_d + + def _assert_success(self, body): + self.assert_create_body(body, self.req_d, self.project) + + @executor.create(httplib.FORBIDDEN, message.exist_base) + def test_alreadyExist(self): + self.create_d() + return self.req_d + + +class TestCaseGet(TestCaseBase): + def setUp(self): + super(TestCaseGet, self).setUp() + self.create_d() + self.create_e() + + @executor.get(httplib.NOT_FOUND, message.not_found_base) + def test_notExist(self): + return 'notExist' + + @executor.get(httplib.OK, 'assert_body') + def test_getOne(self): + return self.req_d.name + + @executor.get(httplib.OK, '_list') + def test_list(self): + return None + + def _list(self, body): + for case in body.testcases: + if self.req_d.name == case.name: + self.assert_body(case) + else: + self.assert_body(case, self.req_e) + + +class TestCaseUpdate(TestCaseBase): + def setUp(self): + super(TestCaseUpdate, self).setUp() + self.create_d() + + @executor.update(httplib.BAD_REQUEST, message.no_body()) + def test_noBody(self): + return None, 'noBody' + + @executor.update(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return self.update_e, 'notFound' + + @executor.update(httplib.FORBIDDEN, message.exist_base) + def test_newNameExist(self): + self.create_e() + return self.update_e, self.req_d.name + + @executor.update(httplib.FORBIDDEN, message.no_update()) + def test_noUpdate(self): + return self.update_d, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_success(self): + return self.update_e, self.req_d.name + + @executor.update(httplib.OK, '_update_success') + def test_with_dollar(self): + update = copy.deepcopy(self.update_d) + update.description = {'2. change': 'dollar change'} + return update, self.req_d.name + + def _update_success(self, request, body): + self.assert_update_body(self.req_d, body, request) + _, new_body = self.get(request.name) + self.assert_update_body(self.req_d, new_body, request) + + +class TestCaseDelete(TestCaseBase): + def setUp(self): + super(TestCaseDelete, self).setUp() + self.create_d() + + @executor.delete(httplib.NOT_FOUND, message.not_found_base) + def test_notFound(self): + return 'notFound' + + @executor.delete(httplib.OK, '_delete_success') + def test_success(self): + return self.req_d.name + + def _delete_success(self, body): + self.assertEqual(body, '') + code, body = self.get(self.req_d.name) + self.assertEqual(code, httplib.NOT_FOUND) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_token.py b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py new file mode 100644 index 0000000..e8b746d --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_token.py @@ -0,0 +1,52 @@ +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import httplib +import unittest + +from tornado import web + +from opnfv_testapi.common import message +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit import fake_pymongo +from opnfv_testapi.tests.unit.handlers import test_result + + +class TestTokenCreateResult(test_result.TestResultBase): + def get_app(self): + from opnfv_testapi.router import url_mappings + return web.Application( + url_mappings.mappings, + db=fake_pymongo, + debug=True, + auth=True + ) + + def setUp(self): + super(TestTokenCreateResult, self).setUp() + fake_pymongo.tokens.insert({"access_token": "12345"}) + + @executor.create(httplib.FORBIDDEN, message.invalid_token()) + def test_resultCreateTokenInvalid(self): + self.headers['X-Auth-Token'] = '1234' + return self.req_d + + @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) + def test_resultCreateTokenUnauthorized(self): + if 'X-Auth-Token' in self.headers: + self.headers.pop('X-Auth-Token') + return self.req_d + + @executor.create(httplib.OK, '_create_success') + def test_resultCreateTokenSuccess(self): + self.headers['X-Auth-Token'] = '12345' + return self.req_d + + def _create_success(self, body): + self.assertIn('CreateResponse', str(type(body))) + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/handlers/test_version.py b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py new file mode 100644 index 0000000..6ef810f --- /dev/null +++ b/testapi/opnfv_testapi/tests/unit/handlers/test_version.py @@ -0,0 +1,36 @@ +############################################################################## +# Copyright (c) 2016 ZTE Corporation +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import httplib +import unittest + +from opnfv_testapi.models import base_models +from opnfv_testapi.tests.unit import executor +from opnfv_testapi.tests.unit.handlers import test_base as base + + +class TestVersionBase(base.TestBase): + def setUp(self): + super(TestVersionBase, self).setUp() + self.list_res = base_models.Versions + self.basePath = '/versions' + + +class TestVersion(TestVersionBase): + @executor.get(httplib.OK, '_get_success') + def test_success(self): + return None + + def _get_success(self, body): + self.assertEqual(len(body.versions), 1) + self.assertEqual(body.versions[0].version, 'v1.0') + self.assertEqual(body.versions[0].description, 'basics') + + +if __name__ == '__main__': + unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/__init__.py b/testapi/opnfv_testapi/tests/unit/resources/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json b/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json deleted file mode 100644 index 1878022..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/scenario-c1.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "name": "nosdn-nofeature-ha", - "installers": - [ - { - "installer": "apex", - "versions": - [ - { - "owner": "Luke", - "version": "master", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-08 22:46:44", - "score": "12/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json b/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json deleted file mode 100644 index 980051c..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/scenario-c2.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "odl_2-nofeature-ha", - "installers": - [ - { - "installer": "fuel", - "versions": - [ - { - "owner": "Lucky", - "version": "danube", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": [], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "silver" - } - - ] - }, - { - "project": "yardstick", - "customs": ["suite-a"], - "scores": [ - { - "date": "2017-01-08 22:46:44", - "score": "0/1" - } - ], - "trust_indicators": [ - { - "date": "2017-01-18 22:46:44", - "status": "gold" - } - ] - } - ] - }, - { - "owner": "Luke", - "version": "colorado", - "projects": - [ - { - "project": "functest", - "customs": [ "healthcheck", "vping_ssh"], - "scores": - [ - { - "date": "2017-01-09 22:46:44", - "score": "11/14" - } - - ], - "trust_indicators": [] - }, - { - "project": "yardstick", - "customs": [], - "scores": [], - "trust_indicators": [] - } - ] - } - ] - } - ] -} diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_base.py b/testapi/opnfv_testapi/tests/unit/resources/test_base.py deleted file mode 100644 index 89cd7e8..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_base.py +++ /dev/null @@ -1,204 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from datetime import datetime -import json -from os import path - -from bson.objectid import ObjectId -import mock -from tornado import testing - -from opnfv_testapi.resources import models -from opnfv_testapi.resources import pod_models -from opnfv_testapi.tests.unit import fake_pymongo - - -class TestBase(testing.AsyncHTTPTestCase): - headers = {'Content-Type': 'application/json; charset=UTF-8'} - - def setUp(self): - self._patch_server() - self.basePath = '' - self.create_res = models.CreateResponse - self.get_res = None - self.list_res = None - self.update_res = None - self.pod_d = pod_models.Pod(name='zte-pod1', - mode='virtual', - details='zte pod 1', - role='community-ci', - _id=str(ObjectId()), - owner='ValidUser', - create_date=str(datetime.now())) - self.pod_e = pod_models.Pod(name='zte-pod2', - mode='metal', - details='zte pod 2', - role='production-ci', - _id=str(ObjectId()), - owner='ValidUser', - create_date=str(datetime.now())) - self.req_d = None - self.req_e = None - self.addCleanup(self._clear) - super(TestBase, self).setUp() - fake_pymongo.users.insert({"user": "ValidUser", - 'email': 'validuser@lf.com', - 'fullname': 'Valid User', - 'groups': [ - 'opnfv-testapi-users', - 'opnfv-gerrit-functest-submitters', - 'opnfv-gerrit-qtip-contributors'] - }) - - def tearDown(self): - self.db_patcher.stop() - self.config_patcher.stop() - - def _patch_server(self): - import argparse - config = path.join(path.dirname(__file__), - '../../../../etc/config.ini') - self.config_patcher = mock.patch( - 'argparse.ArgumentParser.parse_known_args', - return_value=(argparse.Namespace(config_file=config), None)) - self.db_patcher = mock.patch('opnfv_testapi.db.api.DB', - fake_pymongo) - self.config_patcher.start() - self.db_patcher.start() - - def get_app(self): - from opnfv_testapi.cmd import server - return server.make_app() - - def create_d(self, *args): - return self.create(self.req_d, *args) - - def create_e(self, *args): - return self.create(self.req_e, *args) - - def create(self, req=None, *args): - return self.create_help(self.basePath, req, *args) - - def create_help(self, uri, req, *args): - return self.post_direct_url(self._update_uri(uri, *args), req) - - def post_direct_url(self, url, req): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(url, - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - def get(self, *args): - res = self.fetch(self._get_uri(*args), - method='GET', - headers=self.headers) - - def inner(): - new_args, num = self._get_valid_args(*args) - return self.get_res \ - if num != self._need_arg_num(self.basePath) else self.list_res - return self._get_return(res, inner()) - - def query(self, query): - res = self.fetch(self._get_query_uri(query), - method='GET', - headers=self.headers) - return self._get_return(res, self.list_res) - - def update_direct_url(self, url, new=None): - if new and hasattr(new, 'format'): - new = new.format() - res = self.fetch(url, - method='PUT', - body=json.dumps(new), - headers=self.headers) - return self._get_return(res, self.update_res) - - def update(self, new=None, *args): - return self.update_direct_url(self._get_uri(*args), new) - - def delete_direct_url(self, url, body): - if body: - res = self.fetch(url, - method='DELETE', - body=json.dumps(body), - headers=self.headers, - allow_nonstandard_methods=True) - else: - res = self.fetch(url, - method='DELETE', - headers=self.headers) - - return res.code, res.body - - def delete(self, *args): - return self.delete_direct_url(self._get_uri(*args), None) - - @staticmethod - def _get_valid_args(*args): - new_args = tuple(['%s' % arg for arg in args if arg is not None]) - return new_args, len(new_args) - - def _need_arg_num(self, uri): - return uri.count('%s') - - def _get_query_uri(self, query): - return self.basePath + '?' + query if query else self.basePath - - def _get_uri(self, *args): - return self._update_uri(self.basePath, *args) - - def _update_uri(self, uri, *args): - r_uri = uri - new_args, num = self._get_valid_args(*args) - if num != self._need_arg_num(uri): - r_uri += '/%s' - - return r_uri % tuple(['%s' % arg for arg in new_args]) - - def _get_return(self, res, cls): - code = res.code - body = res.body - if body: - return code, self._get_return_body(code, body, cls) - else: - return code, None - - @staticmethod - def _get_return_body(code, body, cls): - return cls.from_dict(json.loads(body)) if code < 300 and cls else body - - def assert_href(self, body): - self.assertIn(self.basePath, body.href) - - def assert_create_body(self, body, req=None, *args): - import inspect - if not req: - req = self.req_d - resource_name = '' - if inspect.isclass(req): - resource_name = req.name - elif isinstance(req, dict): - resource_name = req['name'] - elif isinstance(req, str): - resource_name = json.loads(req)['name'] - new_args = args + tuple([resource_name]) - self.assertIn(self._get_uri(*new_args), body.href) - - @staticmethod - def _clear(): - fake_pymongo.pods.clear() - fake_pymongo.projects.clear() - fake_pymongo.testcases.clear() - fake_pymongo.results.clear() - fake_pymongo.scenarios.clear() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py deleted file mode 100644 index 1ebc96f..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py +++ /dev/null @@ -1,123 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import unittest - -from tornado import gen -from tornado import testing -from tornado import web - -from opnfv_testapi.tests.unit import fake_pymongo - - -class MyTest(testing.AsyncHTTPTestCase): - def setUp(self): - super(MyTest, self).setUp() - self.db = fake_pymongo - self.addCleanup(self._clear) - self.io_loop.run_sync(self.fixture_setup) - - def get_app(self): - return web.Application() - - @gen.coroutine - def fixture_setup(self): - self.test1 = {'_id': '1', 'name': 'test1'} - self.test2 = {'name': 'test2'} - yield self.db.pods.insert({'_id': '1', 'name': 'test1'}) - yield self.db.pods.insert({'name': 'test2'}) - - @testing.gen_test - def test_find_one(self): - user = yield self.db.pods.find_one({'name': 'test1'}) - self.assertEqual(user, self.test1) - self.db.pods.remove() - - @testing.gen_test - def test_find(self): - cursor = self.db.pods.find() - names = [] - while (yield cursor.fetch_next): - ob = cursor.next_object() - names.append(ob.get('name')) - self.assertItemsEqual(names, ['test1', 'test2']) - - @testing.gen_test - def test_update(self): - yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertEqual(user.get('name', None), 'new_test1') - - def test_update_dot_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - 'key 1. name must not contain .') - - def test_update_dot_no_error(self): - self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}}, - None, - check_keys=False) - - def test_update_dollar_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - 'key $name must not start with $') - - def test_update_dollar_no_error(self): - self._update_assert({'_id': '1', 'name': {'$name': 'test1'}}, - None, - check_keys=False) - - @testing.gen_test - def test_remove(self): - yield self.db.pods.remove({'_id': '1'}) - user = yield self.db.pods.find_one({'_id': '1'}) - self.assertIsNone(user) - - def test_insert_dot_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - 'key 2. name must not contain .') - - def test_insert_dot_no_error(self): - self._insert_assert({'_id': '1', '2. name': 'test1'}, - None, - check_keys=False) - - def test_insert_dollar_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - 'key $name must not start with $') - - def test_insert_dollar_no_error(self): - self._insert_assert({'_id': '1', '$name': 'test1'}, - None, - check_keys=False) - - def _clear(self): - self.db.pods.clear() - - def _update_assert(self, docs, error=None, **kwargs): - self._db_assert('update', error, {'_id': '1'}, docs, **kwargs) - - def _insert_assert(self, docs, error=None, **kwargs): - self._db_assert('insert', error, docs, **kwargs) - - @testing.gen_test - def _db_assert(self, method, error, *args, **kwargs): - name_error = None - try: - yield self._eval_pods_db(method, *args, **kwargs) - except NameError as err: - name_error = err.args[0] - finally: - self.assertEqual(name_error, error) - - def _eval_pods_db(self, method, *args, **kwargs): - table_obj = vars(self.db)['pods'] - return table_obj.__getattribute__(method)(*args, **kwargs) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_pod.py b/testapi/opnfv_testapi/tests/unit/resources/test_pod.py deleted file mode 100644 index 5d9da3a..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_pod.py +++ /dev/null @@ -1,118 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import pod_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestPodBase(base.TestBase): - def setUp(self): - super(TestPodBase, self).setUp() - self.get_res = pod_models.Pod - self.list_res = pod_models.Pods - self.basePath = '/api/v1/pods' - self.req_d = pod_models.PodCreateRequest(name=self.pod_d.name, - mode=self.pod_d.mode, - details=self.pod_d.details, - role=self.pod_d.role) - self.req_e = pod_models.PodCreateRequest(name=self.pod_e.name, - mode=self.pod_e.mode, - details=self.pod_e.details, - role=self.pod_e.role) - - def assert_get_body(self, pod, req=None): - if not req: - req = self.req_d - self.assertEqual(pod.owner, 'ValidUser') - self.assertEqual(pod.name, req.name) - self.assertEqual(pod.mode, req.mode) - self.assertEqual(pod.details, req.details) - self.assertEqual(pod.role, req.role) - self.assertIsNotNone(pod.creation_date) - self.assertIsNotNone(pod._id) - - -class TestPodCreate(TestPodBase): - @executor.create(httplib.BAD_REQUEST, message.not_login()) - def test_notlogin(self): - return self.req_d - - @executor.mock_invalid_lfid() - @executor.create(httplib.BAD_REQUEST, message.not_lfid()) - def test_invalidLfid(self): - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return pod_models.PodCreateRequest('') - - @executor.mock_valid_lfid() - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return pod_models.PodCreateRequest(None) - - @executor.mock_valid_lfid() - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - fake_pymongo.pods.insert(self.pod_d.format()) - return self.req_d - - @executor.mock_valid_lfid() - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExistCaseInsensitive(self): - fake_pymongo.pods.insert(self.pod_d.format()) - self.req_d.name = self.req_d.name.upper() - return self.req_d - - -class TestPodGet(TestPodBase): - def setUp(self): - super(TestPodGet, self).setUp() - fake_pymongo.pods.insert(self.pod_d.format()) - fake_pymongo.pods.insert(self.pod_e.format()) - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_get_body') - def test_getOne(self): - return self.pod_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - self.assertEqual(len(body.pods), 2) - for pod in body.pods: - if self.pod_d.name == pod.name: - self.assert_get_body(pod) - else: - self.assert_get_body(pod, self.pod_e) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_project.py b/testapi/opnfv_testapi/tests/unit/resources/test_project.py deleted file mode 100644 index 0622ba8..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_project.py +++ /dev/null @@ -1,137 +0,0 @@ -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestProjectBase(base.TestBase): - def setUp(self): - super(TestProjectBase, self).setUp() - self.req_d = project_models.ProjectCreateRequest('vping', - 'vping-ssh test') - self.req_e = project_models.ProjectCreateRequest('doctor', - 'doctor test') - self.get_res = project_models.Project - self.list_res = project_models.Projects - self.update_res = project_models.Project - self.basePath = '/api/v1/projects' - - def assert_body(self, project, req=None): - if not req: - req = self.req_d - self.assertEqual(project.name, req.name) - self.assertEqual(project.description, req.description) - self.assertIsNotNone(project._id) - self.assertIsNotNone(project.creation_date) - - -class TestProjectCreate(TestProjectBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - return project_models.ProjectCreateRequest('') - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - return project_models.ProjectCreateRequest(None) - - @executor.create(httplib.OK, 'assert_create_body') - def test_success(self): - return self.req_d - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestProjectGet(TestProjectBase): - def setUp(self): - super(TestProjectGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_assert_list') - def test_list(self): - return None - - def _assert_list(self, body): - for project in body.projects: - if self.req_d.name == project.name: - self.assert_body(project) - else: - self.assert_body(project, self.req_e) - - -class TestProjectUpdate(TestProjectBase): - def setUp(self): - super(TestProjectUpdate, self).setUp() - _, d_body = self.create_d() - _, get_res = self.get(self.req_d.name) - self.index_d = get_res._id - self.create_e() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_withoutBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.req_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - return self.req_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.req_d, self.req_d.name - - @executor.update(httplib.OK, '_assert_update') - def test_success(self): - req = project_models.ProjectUpdateRequest('newName', 'new description') - return req, self.req_d.name - - def _assert_update(self, req, body): - self.assertEqual(self.index_d, body._id) - self.assert_body(body, req) - _, new_body = self.get(req.name) - self.assertEqual(self.index_d, new_body._id) - self.assert_body(new_body, req) - - -class TestProjectDelete(TestProjectBase): - def setUp(self): - super(TestProjectDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_assert_delete') - def test_success(self): - return self.req_d.name - - def _assert_delete(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_result.py b/testapi/opnfv_testapi/tests/unit/resources/test_result.py deleted file mode 100644 index 6c1a07a..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_result.py +++ /dev/null @@ -1,413 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -from datetime import datetime -from datetime import timedelta -import httplib -import json -import urllib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import result_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.resources import test_base as base - - -class Details(object): - def __init__(self, timestart=None, duration=None, status=None): - self.timestart = timestart - self.duration = duration - self.status = status - self.items = [{'item1': 1}, {'item2': 2}] - - def format(self): - return { - "timestart": self.timestart, - "duration": self.duration, - "status": self.status, - 'items': [{'item1': 1}, {'item2': 2}] - } - - @staticmethod - def from_dict(a_dict): - - if a_dict is None: - return None - - t = Details() - t.timestart = a_dict.get('timestart') - t.duration = a_dict.get('duration') - t.status = a_dict.get('status') - t.items = a_dict.get('items') - return t - - -class TestResultBase(base.TestBase): - def setUp(self): - super(TestResultBase, self).setUp() - self.pod = self.pod_d.name - self.project = 'functest' - self.case = 'vPing' - self.installer = 'fuel' - self.version = 'C' - self.build_tag = 'v3.0' - self.scenario = 'odl-l2' - self.criteria = 'PASS' - self.trust_indicator = result_models.TI(0.7) - self.start_date = str(datetime.now()) - self.stop_date = str(datetime.now() + timedelta(minutes=1)) - self.update_date = str(datetime.now() + timedelta(days=1)) - self.update_step = -0.05 - self.details = Details(timestart='0', duration='9s', status='OK') - self.req_d = result_models.ResultCreateRequest( - pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria, - trust_indicator=self.trust_indicator) - self.get_res = result_models.TestResult - self.list_res = result_models.TestResults - self.update_res = result_models.TestResult - self.basePath = '/api/v1/results' - self.req_project = project_models.ProjectCreateRequest( - self.project, - 'vping test') - self.req_testcase = testcase_models.TestcaseCreateRequest( - self.case, - '/cases/vping', - 'vping-ssh test') - fake_pymongo.pods.insert(self.pod_d.format()) - self.create_help('/api/v1/projects', self.req_project) - self.create_help('/api/v1/projects/%s/cases', - self.req_testcase, - self.project) - - def assert_res(self, result, req=None): - if req is None: - req = self.req_d - self.assertEqual(result.pod_name, req.pod_name) - self.assertEqual(result.project_name, req.project_name) - self.assertEqual(result.case_name, req.case_name) - self.assertEqual(result.installer, req.installer) - self.assertEqual(result.version, req.version) - details_req = Details.from_dict(req.details) - details_res = Details.from_dict(result.details) - self.assertEqual(details_res.duration, details_req.duration) - self.assertEqual(details_res.timestart, details_req.timestart) - self.assertEqual(details_res.status, details_req.status) - self.assertEqual(details_res.items, details_req.items) - self.assertEqual(result.build_tag, req.build_tag) - self.assertEqual(result.scenario, req.scenario) - self.assertEqual(result.criteria, req.criteria) - self.assertEqual(result.start_date, req.start_date) - self.assertEqual(result.stop_date, req.stop_date) - self.assertIsNotNone(result._id) - ti = result.trust_indicator - self.assertEqual(ti.current, req.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - def _create_d(self): - _, res = self.create_d() - return res.href.split('/')[-1] - - def upload(self, req): - if req and not isinstance(req, str) and hasattr(req, 'format'): - req = req.format() - res = self.fetch(self.basePath + '/upload', - method='POST', - body=json.dumps(req), - headers=self.headers) - - return self._get_return(res, self.create_res) - - -class TestResultUpload(TestResultBase): - @executor.upload(httplib.BAD_REQUEST, message.key_error('file')) - def test_filenotfind(self): - return None - - -class TestResultCreate(TestResultBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_nobody(self): - return None - - @executor.create(httplib.BAD_REQUEST, message.missing('pod_name')) - def test_podNotProvided(self): - req = self.req_d - req.pod_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('project_name')) - def test_projectNotProvided(self): - req = self.req_d - req.project_name = None - return req - - @executor.create(httplib.BAD_REQUEST, message.missing('case_name')) - def test_testcaseNotProvided(self): - req = self.req_d - req.case_name = None - return req - - @executor.create(httplib.BAD_REQUEST, - message.invalid_value('criteria', ['PASS', 'FAIL'])) - def test_invalid_criteria(self): - req = self.req_d - req.criteria = 'invalid' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noPod(self): - req = self.req_d - req.pod_name = 'notExistPod' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - req = self.req_d - req.project_name = 'notExistProject' - return req - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noTestcase(self): - req = self.req_d - req.case_name = 'notExistTestcase' - return req - - @executor.create(httplib.OK, 'assert_href') - def test_success(self): - return self.req_d - - @executor.create(httplib.OK, 'assert_href') - def test_key_with_doc(self): - req = copy.deepcopy(self.req_d) - req.details = {'1.name': 'dot_name'} - return req - - @executor.create(httplib.OK, '_assert_no_ti') - def test_no_ti(self): - req = result_models.ResultCreateRequest(pod_name=self.pod, - project_name=self.project, - case_name=self.case, - installer=self.installer, - version=self.version, - start_date=self.start_date, - stop_date=self.stop_date, - details=self.details.format(), - build_tag=self.build_tag, - scenario=self.scenario, - criteria=self.criteria) - self.actual_req = req - return req - - def _assert_no_ti(self, body): - _id = body.href.split('/')[-1] - code, body = self.get(_id) - self.assert_res(body, self.actual_req) - - -class TestResultGet(TestResultBase): - def setUp(self): - super(TestResultGet, self).setUp() - self.req_10d_before = self._create_changed_date(days=-10) - self.req_d_id = self._create_d() - self.req_10d_later = self._create_changed_date(days=10) - - @executor.get(httplib.OK, 'assert_res') - def test_getOne(self): - return self.req_d_id - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryPod(self): - return self._set_query('pod') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryProject(self): - return self._set_query('project') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTestcase(self): - return self._set_query('case') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryVersion(self): - return self._set_query('version') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryInstaller(self): - return self._set_query('installer') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryBuildTag(self): - return self._set_query('build_tag') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryScenario(self): - return self._set_query('scenario') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryTrustIndicator(self): - return self._set_query('trust_indicator') - - @executor.query(httplib.OK, '_query_success', 3) - def test_queryCriteria(self): - return self._set_query('criteria') - - @executor.query(httplib.BAD_REQUEST, message.must_int('period')) - def test_queryPeriodNotInt(self): - return self._set_query(period='a') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_queryPeriodSuccess(self): - return self._set_query(period=5) - - @executor.query(httplib.BAD_REQUEST, message.must_int('last')) - def test_queryLastNotInt(self): - return self._set_query(last='a') - - @executor.query(httplib.OK, '_query_last_one', 1) - def test_queryLast(self): - return self._set_query(last=1) - - @executor.query(httplib.OK, '_query_success', 4) - def test_queryPublic(self): - self._create_public_data() - return self._set_query() - - @executor.query(httplib.OK, '_query_success', 1) - def test_queryPrivate(self): - self._create_private_data() - return self._set_query(public='false') - - @executor.query(httplib.OK, '_query_period_one', 1) - def test_combination(self): - return self._set_query('pod', - 'project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - period=5) - - @executor.query(httplib.OK, '_query_success', 0) - def test_notFound(self): - return self._set_query('project', - 'case', - 'version', - 'installer', - 'build_tag', - 'scenario', - 'trust_indicator', - 'criteria', - pod='notExistPod', - period=1) - - @executor.query(httplib.OK, '_query_success', 1) - def test_filterErrorStartdate(self): - self._create_error_start_date(None) - self._create_error_start_date('None') - self._create_error_start_date('null') - self._create_error_start_date('') - return self._set_query(period=5) - - def _query_success(self, body, number): - self.assertEqual(number, len(body.results)) - - def _query_last_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_10d_later) - - def _query_period_one(self, body, number): - self.assertEqual(number, len(body.results)) - self.assert_res(body.results[0], self.req_d) - - def _create_error_start_date(self, start_date): - req = copy.deepcopy(self.req_d) - req.start_date = start_date - self.create(req) - return req - - def _create_changed_date(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.start_date = datetime.now() + timedelta(**kwargs) - req.stop_date = str(req.start_date + timedelta(minutes=10)) - req.start_date = str(req.start_date) - self.create(req) - return req - - def _create_public_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'true' - self.create(req) - return req - - def _create_private_data(self, **kwargs): - req = copy.deepcopy(self.req_d) - req.public = 'false' - self.create(req) - return req - - def _set_query(self, *args, **kwargs): - def get_value(arg): - return self.__getattribute__(arg) \ - if arg != 'trust_indicator' else self.trust_indicator.current - query = [] - for arg in args: - query.append((arg, get_value(arg))) - for k, v in kwargs.iteritems(): - query.append((k, v)) - return urllib.urlencode(query) - - -class TestResultUpdate(TestResultBase): - def setUp(self): - super(TestResultUpdate, self).setUp() - self.req_d_id = self._create_d() - - @executor.update(httplib.OK, '_assert_update_ti') - def test_success(self): - new_ti = copy.deepcopy(self.trust_indicator) - new_ti.current += self.update_step - new_ti.histories.append( - result_models.TIHistory(self.update_date, self.update_step)) - new_data = copy.deepcopy(self.req_d) - new_data.trust_indicator = new_ti - update = result_models.ResultUpdateRequest(trust_indicator=new_ti) - self.update_req = new_data - return update, self.req_d_id - - def _assert_update_ti(self, request, body): - ti = body.trust_indicator - self.assertEqual(ti.current, request.trust_indicator.current) - if ti.histories: - history = ti.histories[0] - self.assertEqual(history.date, self.update_date) - self.assertEqual(history.step, self.update_step) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py b/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py deleted file mode 100644 index 1367fc6..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_scenario.py +++ /dev/null @@ -1,449 +0,0 @@ -import functools -import httplib -import json -import os - -from datetime import datetime - -from opnfv_testapi.common import message -import opnfv_testapi.resources.scenario_models as models -from opnfv_testapi.tests.unit.resources import test_base as base - - -def _none_default(check, default): - return check if check else default - - -class TestScenarioBase(base.TestBase): - def setUp(self): - super(TestScenarioBase, self).setUp() - self.get_res = models.Scenario - self.list_res = models.Scenarios - self.basePath = '/api/v1/scenarios' - self.req_d = self._load_request('scenario-c1.json') - self.req_2 = self._load_request('scenario-c2.json') - - def tearDown(self): - pass - - def assert_body(self, project, req=None): - pass - - @staticmethod - def _load_request(f_req): - abs_file = os.path.join(os.path.dirname(__file__), f_req) - with open(abs_file, 'r') as f: - loader = json.load(f) - f.close() - return loader - - def create_return_name(self, req): - _, res = self.create(req) - return res.href.split('/')[-1] - - def assert_res(self, code, scenario, req=None): - self.assertEqual(code, httplib.OK) - if req is None: - req = self.req_d - self.assertIsNotNone(scenario._id) - self.assertIsNotNone(scenario.creation_date) - self.assertEqual(scenario, models.Scenario.from_dict(req)) - - @staticmethod - def set_query(*args): - uri = '' - for arg in args: - uri += arg + '&' - return uri[0: -1] - - def get_and_assert(self, name): - code, body = self.get(name) - self.assert_res(code, body, self.req_d) - - -class TestScenarioCreate(TestScenarioBase): - def test_withoutBody(self): - (code, body) = self.create() - self.assertEqual(code, httplib.BAD_REQUEST) - - def test_emptyName(self): - req_empty = models.ScenarioCreateRequest('') - (code, body) = self.create(req_empty) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_noneName(self): - req_none = models.ScenarioCreateRequest(None) - (code, body) = self.create(req_none) - self.assertEqual(code, httplib.BAD_REQUEST) - self.assertIn(message.missing('name'), body) - - def test_success(self): - (code, body) = self.create_d() - self.assertEqual(code, httplib.OK) - self.assert_create_body(body) - - def test_alreadyExist(self): - self.create_d() - (code, body) = self.create_d() - self.assertEqual(code, httplib.FORBIDDEN) - self.assertIn(message.exist_base, body) - - -class TestScenarioGet(TestScenarioBase): - def setUp(self): - super(TestScenarioGet, self).setUp() - self.scenario_1 = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - - def test_getByName(self): - self.get_and_assert(self.scenario_1) - - def test_getAll(self): - self._query_and_assert(query=None, reqs=[self.req_d, self.req_2]) - - def test_queryName(self): - query = self.set_query('name=nosdn-nofeature-ha') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryInstaller(self): - query = self.set_query('installer=apex') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryVersion(self): - query = self.set_query('version=master') - self._query_and_assert(query, reqs=[self.req_d]) - - def test_queryProject(self): - query = self.set_query('project=functest') - self._query_and_assert(query, reqs=[self.req_d, self.req_2]) - - # close due to random fail, open again after solve it in another patch - # def test_queryCombination(self): - # query = self._set_query('name=nosdn-nofeature-ha', - # 'installer=apex', - # 'version=master', - # 'project=functest') - # - # self._query_and_assert(query, reqs=[self.req_d]) - - def _query_and_assert(self, query, found=True, reqs=None): - code, body = self.query(query) - if not found: - self.assertEqual(code, httplib.OK) - self.assertEqual(0, len(body.scenarios)) - else: - self.assertEqual(len(reqs), len(body.scenarios)) - for req in reqs: - for scenario in body.scenarios: - if req['name'] == scenario.name: - self.assert_res(code, scenario, req) - - -class TestScenarioDelete(TestScenarioBase): - def test_notFound(self): - code, body = self.delete('notFound') - self.assertEqual(code, httplib.NOT_FOUND) - - def test_success(self): - scenario = self.create_return_name(self.req_d) - code, _ = self.delete(scenario) - self.assertEqual(code, httplib.OK) - code, _ = self.get(scenario) - self.assertEqual(code, httplib.NOT_FOUND) - - -class TestScenarioUpdate(TestScenarioBase): - def setUp(self): - super(TestScenarioUpdate, self).setUp() - self.scenario = self.create_return_name(self.req_d) - self.scenario_2 = self.create_return_name(self.req_2) - self.update_url = '' - self.scenario_url = '/api/v1/scenarios/{}'.format(self.scenario) - self.installer = self.req_d['installers'][0]['installer'] - self.version = self.req_d['installers'][0]['versions'][0]['version'] - self.locate_project = 'installer={}&version={}&project={}'.format( - self.installer, - self.version, - 'functest') - - def update_url_fixture(item): - def _update_url_fixture(xstep): - def wrapper(self, *args, **kwargs): - self.update_url = '{}/{}'.format(self.scenario_url, item) - locator = None - if item in ['projects', 'owner']: - locator = 'installer={}&version={}'.format( - self.installer, - self.version) - elif item in ['versions']: - locator = 'installer={}'.format( - self.installer) - elif item in ['rename']: - self.update_url = self.scenario_url - - if locator: - self.update_url = '{}?{}'.format(self.update_url, locator) - - xstep(self, *args, **kwargs) - return wrapper - return _update_url_fixture - - def update_partial(operate, expected): - def _update_partial(set_update): - @functools.wraps(set_update) - def wrapper(self): - update = set_update(self) - code, body = getattr(self, operate)(update) - getattr(self, expected)(code) - return wrapper - return _update_partial - - @update_partial('_add', '_success') - def test_addScore(self): - add = models.ScenarioScore(date=str(datetime.now()), score='11/12') - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['scores'].append(add.format()) - self.update_url = '{}/scores?{}'.format(self.scenario_url, - self.locate_project) - - return add - - @update_partial('_add', '_success') - def test_addTrustIndicator(self): - add = models.ScenarioTI(date=str(datetime.now()), status='gold') - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['trust_indicators'].append(add.format()) - self.update_url = '{}/trust_indicators?{}'.format(self.scenario_url, - self.locate_project) - - return add - - @update_partial('_add', '_success') - def test_addCustoms(self): - adds = ['odl', 'parser', 'vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = list(set(functest['customs'] + adds)) - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - return adds - - @update_partial('_update', '_success') - def test_updateCustoms(self): - updates = ['odl', 'parser', 'vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = updates - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - - return updates - - @update_partial('_delete', '_success') - def test_deleteCustoms(self): - deletes = ['vping_ssh'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - functest = filter(lambda f: f['project'] == 'functest', projects)[0] - functest['customs'] = ['healthcheck'] - self.update_url = '{}/customs?{}'.format(self.scenario_url, - self.locate_project) - - return deletes - - @update_url_fixture('projects') - @update_partial('_add', '_success') - def test_addProjects_succ(self): - add = models.ScenarioProject(project='qtip').format() - self.req_d['installers'][0]['versions'][0]['projects'].append(add) - return [add] - - @update_url_fixture('projects') - @update_partial('_add', '_conflict') - def test_addProjects_already_exist(self): - add = models.ScenarioProject(project='functest').format() - return [add] - - @update_url_fixture('projects') - @update_partial('_add', '_bad_request') - def test_addProjects_bad_schema(self): - add = models.ScenarioProject(project='functest').format() - add['score'] = None - return [add] - - @update_url_fixture('projects') - @update_partial('_update', '_success') - def test_updateProjects_succ(self): - update = models.ScenarioProject(project='qtip').format() - self.req_d['installers'][0]['versions'][0]['projects'] = [update] - return [update] - - @update_url_fixture('projects') - @update_partial('_update', '_conflict') - def test_updateProjects_duplicated(self): - update = models.ScenarioProject(project='qtip').format() - return [update, update] - - @update_url_fixture('projects') - @update_partial('_update', '_bad_request') - def test_updateProjects_bad_schema(self): - update = models.ScenarioProject(project='functest').format() - update['score'] = None - return [update] - - @update_url_fixture('projects') - @update_partial('_delete', '_success') - def test_deleteProjects(self): - deletes = ['functest'] - projects = self.req_d['installers'][0]['versions'][0]['projects'] - self.req_d['installers'][0]['versions'][0]['projects'] = filter( - lambda f: f['project'] != 'functest', - projects) - return deletes - - @update_url_fixture('owner') - @update_partial('_update', '_success') - def test_changeOwner(self): - new_owner = 'new_owner' - update = models.ScenarioChangeOwnerRequest(new_owner).format() - self.req_d['installers'][0]['versions'][0]['owner'] = new_owner - return update - - @update_url_fixture('versions') - @update_partial('_add', '_success') - def test_addVersions_succ(self): - add = models.ScenarioVersion(version='Euphrates').format() - self.req_d['installers'][0]['versions'].append(add) - return [add] - - @update_url_fixture('versions') - @update_partial('_add', '_conflict') - def test_addVersions_already_exist(self): - add = models.ScenarioVersion(version='master').format() - return [add] - - @update_url_fixture('versions') - @update_partial('_add', '_bad_request') - def test_addVersions_bad_schema(self): - add = models.ScenarioVersion(version='euphrates').format() - add['notexist'] = None - return [add] - - @update_url_fixture('versions') - @update_partial('_update', '_success') - def test_updateVersions_succ(self): - update = models.ScenarioVersion(version='euphrates').format() - self.req_d['installers'][0]['versions'] = [update] - return [update] - - @update_url_fixture('versions') - @update_partial('_update', '_conflict') - def test_updateVersions_duplicated(self): - update = models.ScenarioVersion(version='euphrates').format() - return [update, update] - - @update_url_fixture('versions') - @update_partial('_update', '_bad_request') - def test_updateVersions_bad_schema(self): - update = models.ScenarioVersion(version='euphrates').format() - update['not_owner'] = 'Iam' - return [update] - - @update_url_fixture('versions') - @update_partial('_delete', '_success') - def test_deleteVersions(self): - deletes = ['master'] - versions = self.req_d['installers'][0]['versions'] - self.req_d['installers'][0]['versions'] = filter( - lambda f: f['version'] != 'master', - versions) - return deletes - - @update_url_fixture('installers') - @update_partial('_add', '_success') - def test_addInstallers_succ(self): - add = models.ScenarioInstaller(installer='daisy').format() - self.req_d['installers'].append(add) - return [add] - - @update_url_fixture('installers') - @update_partial('_add', '_conflict') - def test_addInstallers_already_exist(self): - add = models.ScenarioInstaller(installer='apex').format() - return [add] - - @update_url_fixture('installers') - @update_partial('_add', '_bad_request') - def test_addInstallers_bad_schema(self): - add = models.ScenarioInstaller(installer='daisy').format() - add['not_exist'] = 'not_exist' - return [add] - - @update_url_fixture('installers') - @update_partial('_update', '_success') - def test_updateInstallers_succ(self): - update = models.ScenarioInstaller(installer='daisy').format() - self.req_d['installers'] = [update] - return [update] - - @update_url_fixture('installers') - @update_partial('_update', '_conflict') - def test_updateInstallers_duplicated(self): - update = models.ScenarioInstaller(installer='daisy').format() - return [update, update] - - @update_url_fixture('installers') - @update_partial('_update', '_bad_request') - def test_updateInstallers_bad_schema(self): - update = models.ScenarioInstaller(installer='daisy').format() - update['not_exist'] = 'not_exist' - return [update] - - @update_url_fixture('installers') - @update_partial('_delete', '_success') - def test_deleteInstallers(self): - deletes = ['apex'] - installers = self.req_d['installers'] - self.req_d['installers'] = filter( - lambda f: f['installer'] != 'apex', - installers) - return deletes - - @update_url_fixture('rename') - @update_partial('_update', '_success') - def test_renameScenario(self): - new_name = 'new_scenario_name' - update = models.ScenarioUpdateRequest(name=new_name) - self.req_d['name'] = new_name - return update - - @update_url_fixture('rename') - @update_partial('_update', '_forbidden') - def test_renameScenario_exist(self): - new_name = self.req_d['name'] - update = models.ScenarioUpdateRequest(name=new_name) - return update - - def _add(self, update_req): - return self.post_direct_url(self.update_url, update_req) - - def _update(self, update_req): - return self.update_direct_url(self.update_url, update_req) - - def _delete(self, update_req): - return self.delete_direct_url(self.update_url, update_req) - - def _success(self, status): - self.assertEqual(status, httplib.OK) - self.get_and_assert(self.req_d['name']) - - def _forbidden(self, status): - self.assertEqual(status, httplib.FORBIDDEN) - - def _bad_request(self, status): - self.assertEqual(status, httplib.BAD_REQUEST) - - def _conflict(self, status): - self.assertEqual(status, httplib.CONFLICT) diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py b/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py deleted file mode 100644 index 4f2bc2a..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_testcase.py +++ /dev/null @@ -1,201 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import copy -import httplib -import unittest - -from opnfv_testapi.common import message -from opnfv_testapi.resources import project_models -from opnfv_testapi.resources import testcase_models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestCaseBase(base.TestBase): - def setUp(self): - super(TestCaseBase, self).setUp() - self.req_d = testcase_models.TestcaseCreateRequest('vping_1', - '/cases/vping_1', - 'vping-ssh test') - self.req_e = testcase_models.TestcaseCreateRequest('doctor_1', - '/cases/doctor_1', - 'create doctor') - self.update_d = testcase_models.TestcaseUpdateRequest('vping_1', - 'vping-ssh test', - 'functest') - self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1', - 'create doctor', - 'functest') - self.get_res = testcase_models.Testcase - self.list_res = testcase_models.Testcases - self.update_res = testcase_models.Testcase - self.basePath = '/api/v1/projects/%s/cases' - self.create_project() - - def assert_body(self, case, req=None): - if not req: - req = self.req_d - self.assertEqual(case.name, req.name) - self.assertEqual(case.description, req.description) - self.assertEqual(case.url, req.url) - self.assertIsNotNone(case._id) - self.assertIsNotNone(case.creation_date) - - def assert_update_body(self, old, new, req=None): - if not req: - req = self.req_d - self.assertEqual(new.name, req.name) - self.assertEqual(new.description, req.description) - self.assertEqual(new.url, old.url) - self.assertIsNotNone(new._id) - self.assertIsNotNone(new.creation_date) - - def create_project(self): - req_p = project_models.ProjectCreateRequest('functest', - 'vping-ssh test') - self.create_help('/api/v1/projects', req_p) - self.project = req_p.name - - def create_d(self): - return super(TestCaseBase, self).create_d(self.project) - - def create_e(self): - return super(TestCaseBase, self).create_e(self.project) - - def get(self, case=None): - return super(TestCaseBase, self).get(self.project, case) - - def create(self, req=None, *args): - return super(TestCaseBase, self).create(req, self.project) - - def update(self, new=None, case=None): - return super(TestCaseBase, self).update(new, self.project, case) - - def delete(self, case): - return super(TestCaseBase, self).delete(self.project, case) - - -class TestCaseCreate(TestCaseBase): - @executor.create(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None - - @executor.create(httplib.FORBIDDEN, message.not_found_base) - def test_noProject(self): - self.project = 'noProject' - return self.req_d - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_emptyName(self): - req_empty = testcase_models.TestcaseCreateRequest('') - return req_empty - - @executor.create(httplib.BAD_REQUEST, message.missing('name')) - def test_noneName(self): - req_none = testcase_models.TestcaseCreateRequest(None) - return req_none - - @executor.create(httplib.OK, '_assert_success') - def test_success(self): - return self.req_d - - def _assert_success(self, body): - self.assert_create_body(body, self.req_d, self.project) - - @executor.create(httplib.FORBIDDEN, message.exist_base) - def test_alreadyExist(self): - self.create_d() - return self.req_d - - -class TestCaseGet(TestCaseBase): - def setUp(self): - super(TestCaseGet, self).setUp() - self.create_d() - self.create_e() - - @executor.get(httplib.NOT_FOUND, message.not_found_base) - def test_notExist(self): - return 'notExist' - - @executor.get(httplib.OK, 'assert_body') - def test_getOne(self): - return self.req_d.name - - @executor.get(httplib.OK, '_list') - def test_list(self): - return None - - def _list(self, body): - for case in body.testcases: - if self.req_d.name == case.name: - self.assert_body(case) - else: - self.assert_body(case, self.req_e) - - -class TestCaseUpdate(TestCaseBase): - def setUp(self): - super(TestCaseUpdate, self).setUp() - self.create_d() - - @executor.update(httplib.BAD_REQUEST, message.no_body()) - def test_noBody(self): - return None, 'noBody' - - @executor.update(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return self.update_e, 'notFound' - - @executor.update(httplib.FORBIDDEN, message.exist_base) - def test_newNameExist(self): - self.create_e() - return self.update_e, self.req_d.name - - @executor.update(httplib.FORBIDDEN, message.no_update()) - def test_noUpdate(self): - return self.update_d, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_success(self): - return self.update_e, self.req_d.name - - @executor.update(httplib.OK, '_update_success') - def test_with_dollar(self): - update = copy.deepcopy(self.update_d) - update.description = {'2. change': 'dollar change'} - return update, self.req_d.name - - def _update_success(self, request, body): - self.assert_update_body(self.req_d, body, request) - _, new_body = self.get(request.name) - self.assert_update_body(self.req_d, new_body, request) - - -class TestCaseDelete(TestCaseBase): - def setUp(self): - super(TestCaseDelete, self).setUp() - self.create_d() - - @executor.delete(httplib.NOT_FOUND, message.not_found_base) - def test_notFound(self): - return 'notFound' - - @executor.delete(httplib.OK, '_delete_success') - def test_success(self): - return self.req_d.name - - def _delete_success(self, body): - self.assertEqual(body, '') - code, body = self.get(self.req_d.name) - self.assertEqual(code, httplib.NOT_FOUND) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_token.py b/testapi/opnfv_testapi/tests/unit/resources/test_token.py deleted file mode 100644 index bd64723..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_token.py +++ /dev/null @@ -1,52 +0,0 @@ -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import httplib -import unittest - -from tornado import web - -from opnfv_testapi.common import message -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit import fake_pymongo -from opnfv_testapi.tests.unit.resources import test_result - - -class TestTokenCreateResult(test_result.TestResultBase): - def get_app(self): - from opnfv_testapi.router import url_mappings - return web.Application( - url_mappings.mappings, - db=fake_pymongo, - debug=True, - auth=True - ) - - def setUp(self): - super(TestTokenCreateResult, self).setUp() - fake_pymongo.tokens.insert({"access_token": "12345"}) - - @executor.create(httplib.FORBIDDEN, message.invalid_token()) - def test_resultCreateTokenInvalid(self): - self.headers['X-Auth-Token'] = '1234' - return self.req_d - - @executor.create(httplib.UNAUTHORIZED, message.unauthorized()) - def test_resultCreateTokenUnauthorized(self): - if 'X-Auth-Token' in self.headers: - self.headers.pop('X-Auth-Token') - return self.req_d - - @executor.create(httplib.OK, '_create_success') - def test_resultCreateTokenSuccess(self): - self.headers['X-Auth-Token'] = '12345' - return self.req_d - - def _create_success(self, body): - self.assertIn('CreateResponse', str(type(body))) - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tests/unit/resources/test_version.py b/testapi/opnfv_testapi/tests/unit/resources/test_version.py deleted file mode 100644 index 51fed11..0000000 --- a/testapi/opnfv_testapi/tests/unit/resources/test_version.py +++ /dev/null @@ -1,36 +0,0 @@ -############################################################################## -# Copyright (c) 2016 ZTE Corporation -# feng.xiaowei@zte.com.cn -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -import httplib -import unittest - -from opnfv_testapi.resources import models -from opnfv_testapi.tests.unit import executor -from opnfv_testapi.tests.unit.resources import test_base as base - - -class TestVersionBase(base.TestBase): - def setUp(self): - super(TestVersionBase, self).setUp() - self.list_res = models.Versions - self.basePath = '/versions' - - -class TestVersion(TestVersionBase): - @executor.get(httplib.OK, '_get_success') - def test_success(self): - return None - - def _get_success(self, body): - self.assertEqual(len(body.versions), 1) - self.assertEqual(body.versions[0].version, 'v1.0') - self.assertEqual(body.versions[0].description, 'basics') - - -if __name__ == '__main__': - unittest.main() diff --git a/testapi/opnfv_testapi/tornado_swagger/handlers.py b/testapi/opnfv_testapi/tornado_swagger/handlers.py index e39a9f6..a04de07 100644 --- a/testapi/opnfv_testapi/tornado_swagger/handlers.py +++ b/testapi/opnfv_testapi/tornado_swagger/handlers.py @@ -26,7 +26,7 @@ def swagger_handlers(): settings.docs_settings, name=settings.API_DOCS_NAME), tornado.web.URLSpec( - _path(r'resources.json$'), + _path(r'models.json$'), views.SwaggerResourcesHandler, settings.docs_settings, name=settings.RESOURCE_LISTING_NAME), diff --git a/testapi/opnfv_testapi/ui/__init__.py b/testapi/opnfv_testapi/ui/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/testapi/opnfv_testapi/ui/auth/__init__.py b/testapi/opnfv_testapi/ui/auth/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/testapi/opnfv_testapi/ui/auth/sign.py b/testapi/opnfv_testapi/ui/auth/sign.py deleted file mode 100644 index 318473e..0000000 --- a/testapi/opnfv_testapi/ui/auth/sign.py +++ /dev/null @@ -1,59 +0,0 @@ -from cas import CASClient -from tornado import gen -from tornado import web - -from opnfv_testapi.common import constants -from opnfv_testapi.common.config import CONF -from opnfv_testapi.db import api as dbapi -from opnfv_testapi.resources import handlers - - -class SignBaseHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(SignBaseHandler, self).__init__(application, request, **kwargs) - self.table = 'users' - self.cas_client = CASClient(version='2', - server_url=CONF.lfid_cas_url, - service_url='{}/{}'.format( - CONF.ui_url, - CONF.lfid_signin_return)) - - -class SigninHandler(SignBaseHandler): - def get(self): - self.redirect(url=(self.cas_client.get_login_url())) - - -class SigninReturnHandler(SignBaseHandler): - - @web.asynchronous - @gen.coroutine - def get(self): - ticket = self.get_query_argument('ticket', default=None) - if ticket: - (user, attrs, _) = self.cas_client.verify_ticket(ticket=ticket) - login_user = { - 'user': user, - 'email': attrs.get('mail'), - 'fullname': attrs.get('field_lf_full_name'), - 'groups': constants.TESTAPI_USERS + attrs.get('group', []) - } - q_user = {'user': user} - db_user = yield dbapi.db_find_one(self.table, q_user) - if not db_user: - dbapi.db_save(self.table, login_user) - else: - dbapi.db_update(self.table, q_user, login_user) - - self.clear_cookie(constants.TESTAPI_ID) - self.set_secure_cookie(constants.TESTAPI_ID, user) - - self.redirect(url=CONF.ui_url) - - -class SignoutHandler(SignBaseHandler): - def get(self): - """Handle signout request.""" - self.clear_cookie(constants.TESTAPI_ID) - logout_url = self.cas_client.get_logout_url(redirect_url=CONF.ui_url) - self.redirect(url=logout_url) diff --git a/testapi/opnfv_testapi/ui/auth/user.py b/testapi/opnfv_testapi/ui/auth/user.py deleted file mode 100644 index ff2c2a9..0000000 --- a/testapi/opnfv_testapi/ui/auth/user.py +++ /dev/null @@ -1,33 +0,0 @@ -from opnfv_testapi.common import constants -from opnfv_testapi.common import raises -from opnfv_testapi.common.config import CONF -from opnfv_testapi.resources import handlers -from opnfv_testapi.resources import models - - -class User(models.ModelBase): - def __init__(self, user=None, email=None, fullname=None, groups=None): - self.user = user - self.email = email - self.fullname = fullname - self.groups = groups - - -class UserHandler(handlers.GenericApiHandler): - def __init__(self, application, request, **kwargs): - super(UserHandler, self).__init__(application, request, **kwargs) - self.table = 'users' - self.table_cls = User - - def get(self): - if CONF.api_authenticate: - username = self.get_secure_cookie(constants.TESTAPI_ID) - if username: - self._get_one(query={'user': username}) - else: - raises.Unauthorized('Unauthorized') - else: - self.finish_request(User('anonymous', - 'anonymous@linuxfoundation.com', - 'anonymous lf', - constants.TESTAPI_USERS).format()) diff --git a/testapi/opnfv_testapi/ui/root.py b/testapi/opnfv_testapi/ui/root.py deleted file mode 100644 index 286a6b0..0000000 --- a/testapi/opnfv_testapi/ui/root.py +++ /dev/null @@ -1,10 +0,0 @@ -from opnfv_testapi.common.config import CONF -from opnfv_testapi.resources import handlers - - -class RootHandler(handlers.GenericApiHandler): - def get_template_path(self): - return CONF.ui_static_path - - def get(self): - self.render('testapi-ui/index.html') -- cgit 1.2.3-korg