summaryrefslogtreecommitdiffstats
path: root/cvp/opnfv_testapi
diff options
context:
space:
mode:
authorxudan <xudan16@huawei.com>2018-07-05 22:37:35 -0400
committerGeorg Kunz <georg.kunz@ericsson.com>2018-07-25 09:17:09 +0000
commit58b91dd3baaaf72ab65062a4804403cd4a5935b2 (patch)
tree801b1f39b3bee25ecbaae387339955d55651e98b /cvp/opnfv_testapi
parent947f1bf0147c40971fdae36feecd477ab3caf3b8 (diff)
Move OVP web portal code to a separate repo
The new repo for web portal is https://gerrit.opnfv.org/gerrit/dovetail-webportal JIRA: DOVETAIL-671 Change-Id: Iac085abc3d175b9a091d70d0448af56c7a6845e9 Signed-off-by: xudan <xudan16@huawei.com>
Diffstat (limited to 'cvp/opnfv_testapi')
-rw-r--r--cvp/opnfv_testapi/__init__.py8
-rw-r--r--cvp/opnfv_testapi/cmd/__init__.py8
-rw-r--r--cvp/opnfv_testapi/cmd/server.py64
-rw-r--r--cvp/opnfv_testapi/common/__init__.py8
-rw-r--r--cvp/opnfv_testapi/common/check.py114
-rw-r--r--cvp/opnfv_testapi/common/config.py79
-rw-r--r--cvp/opnfv_testapi/common/message.py54
-rw-r--r--cvp/opnfv_testapi/common/raises.py39
-rw-r--r--cvp/opnfv_testapi/common/utils.py43
-rw-r--r--cvp/opnfv_testapi/db/__init__.py0
-rw-r--r--cvp/opnfv_testapi/db/api.py38
-rw-r--r--cvp/opnfv_testapi/resources/__init__.py8
-rw-r--r--cvp/opnfv_testapi/resources/application_handlers.py233
-rw-r--r--cvp/opnfv_testapi/resources/application_models.py39
-rw-r--r--cvp/opnfv_testapi/resources/handlers.py331
-rw-r--r--cvp/opnfv_testapi/resources/models.py115
-rw-r--r--cvp/opnfv_testapi/resources/pod_handlers.py78
-rw-r--r--cvp/opnfv_testapi/resources/pod_models.py52
-rw-r--r--cvp/opnfv_testapi/resources/project_handlers.py86
-rw-r--r--cvp/opnfv_testapi/resources/project_models.py48
-rw-r--r--cvp/opnfv_testapi/resources/result_handlers.py308
-rw-r--r--cvp/opnfv_testapi/resources/result_models.py133
-rw-r--r--cvp/opnfv_testapi/resources/scenario_handlers.py282
-rw-r--r--cvp/opnfv_testapi/resources/scenario_models.py204
-rw-r--r--cvp/opnfv_testapi/resources/sut_handlers.py112
-rw-r--r--cvp/opnfv_testapi/resources/sut_models.py31
-rw-r--r--cvp/opnfv_testapi/resources/test_handlers.py307
-rw-r--r--cvp/opnfv_testapi/resources/test_models.py90
-rw-r--r--cvp/opnfv_testapi/resources/testcase_handlers.py103
-rw-r--r--cvp/opnfv_testapi/resources/testcase_models.py95
-rw-r--r--cvp/opnfv_testapi/router/__init__.py9
-rw-r--r--cvp/opnfv_testapi/router/url_mappings.py45
-rw-r--r--cvp/opnfv_testapi/tests/__init__.py1
-rw-r--r--cvp/opnfv_testapi/tests/unit/__init__.py9
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/__init__.py0
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/noparam.ini16
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/normal.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/nosection.ini11
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/notboolean.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/notint.ini17
-rw-r--r--cvp/opnfv_testapi/tests/unit/common/test_config.py15
-rw-r--r--cvp/opnfv_testapi/tests/unit/conftest.py8
-rw-r--r--cvp/opnfv_testapi/tests/unit/executor.py97
-rw-r--r--cvp/opnfv_testapi/tests/unit/fake_pymongo.py284
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/__init__.py0
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json38
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json73
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_base.py161
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py123
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_pod.py90
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_project.py137
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_result.py410
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_scenario.py360
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_testcase.py201
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_token.py114
-rw-r--r--cvp/opnfv_testapi/tests/unit/resources/test_version.py36
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/README.md273
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/__init__.py8
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/handlers.py38
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/settings.py25
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/swagger.py291
-rw-r--r--cvp/opnfv_testapi/tornado_swagger/views.py134
-rw-r--r--cvp/opnfv_testapi/ui/__init__.py0
-rw-r--r--cvp/opnfv_testapi/ui/auth/__init__.py0
-rw-r--r--cvp/opnfv_testapi/ui/auth/base.py35
-rw-r--r--cvp/opnfv_testapi/ui/auth/constants.py18
-rw-r--r--cvp/opnfv_testapi/ui/auth/jira_util.py66
-rw-r--r--cvp/opnfv_testapi/ui/auth/rsa.pem27
-rw-r--r--cvp/opnfv_testapi/ui/auth/sign.py281
-rw-r--r--cvp/opnfv_testapi/ui/auth/user.py35
-rw-r--r--cvp/opnfv_testapi/ui/root.py10
71 files changed, 0 insertions, 6640 deletions
diff --git a/cvp/opnfv_testapi/__init__.py b/cvp/opnfv_testapi/__init__.py
deleted file mode 100644
index 363bc388..00000000
--- a/cvp/opnfv_testapi/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/cvp/opnfv_testapi/cmd/__init__.py b/cvp/opnfv_testapi/cmd/__init__.py
deleted file mode 100644
index 363bc388..00000000
--- a/cvp/opnfv_testapi/cmd/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/cvp/opnfv_testapi/cmd/server.py b/cvp/opnfv_testapi/cmd/server.py
deleted file mode 100644
index d503c8a1..00000000
--- a/cvp/opnfv_testapi/cmd/server.py
+++ /dev/null
@@ -1,64 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-"""
-Pre-requisites:
- pip install motor
- pip install tornado
-
-We can launch the API with this file
-
-TODOs :
- - logging
- - json args validation with schemes
- - POST/PUT/DELETE for PODs
- - POST/PUT/GET/DELETE for installers, platforms (enrich results info)
- - count cases for GET on projects
- - count results for GET on cases
- - include objects
- - swagger documentation
- - setup file
- - results pagination
- - unit tests
-
-"""
-
-import tornado.ioloop
-import logging
-
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.router import url_mappings
-from opnfv_testapi.tornado_swagger import swagger
-
-my_logger = logging.getLogger()
-handler = logging.handlers.RotatingFileHandler(
- CONF.api_log_file, maxBytes=20000000, backupCount=50)
-my_logger.setLevel(logging.DEBUG)
-my_logger.addHandler(handler)
-
-
-def make_app():
- swagger.docs(base_url=CONF.swagger_base_url,
- static_path=CONF.static_path)
- return swagger.Application(
- url_mappings.mappings,
- debug=CONF.api_debug,
- auth=CONF.api_authenticate,
- cookie_secret='opnfv-testapi',
- )
-
-
-def main():
- application = make_app()
- application.listen(CONF.api_port)
- tornado.ioloop.IOLoop.current().start()
-
-
-if __name__ == "__main__":
- main()
diff --git a/cvp/opnfv_testapi/common/__init__.py b/cvp/opnfv_testapi/common/__init__.py
deleted file mode 100644
index 05c0c939..00000000
--- a/cvp/opnfv_testapi/common/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/cvp/opnfv_testapi/common/check.py b/cvp/opnfv_testapi/common/check.py
deleted file mode 100644
index 24ba876a..00000000
--- a/cvp/opnfv_testapi/common/check.py
+++ /dev/null
@@ -1,114 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import functools
-
-from tornado import gen
-from tornado import web
-
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.db import api as dbapi
-
-
-def authenticate(method):
- @web.asynchronous
- @gen.coroutine
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- if self.auth:
- try:
- token = self.request.headers['X-Auth-Token']
- except KeyError:
- raises.Unauthorized(message.unauthorized())
- query = {'access_token': token}
- check = yield dbapi.db_find_one('tokens', query)
- if not check:
- raises.Forbidden(message.invalid_token())
- ret = yield gen.coroutine(method)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrapper
-
-
-def not_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- query = kwargs.get('query')
- data = yield dbapi.db_find_one(self.table, query)
- if not data:
- raises.NotFound(message.not_found(self.table, query))
- ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
- raise gen.Return(ret)
-
- return wrap
-
-
-def no_body(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- if self.json_args is None:
- raises.BadRequest(message.no_body())
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
-
- return wrap
-
-
-def miss_fields(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- fields = kwargs.pop('miss_fields', [])
- if fields:
- for miss in fields:
- miss_data = self.json_args.get(miss)
- if miss_data is None or miss_data == '':
- raises.BadRequest(message.missing(miss))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def carriers_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- carriers = kwargs.pop('carriers', {})
- if carriers:
- for table, query in carriers:
- exist = yield dbapi.db_find_one(table, query())
- if not exist:
- raises.Forbidden(message.not_found(table, query()))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def new_not_exists(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- query = kwargs.get('query')
- if query:
- to_data = yield dbapi.db_find_one(self.table, query())
- if to_data:
- raises.Forbidden(message.exist(self.table, query()))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def updated_one_not_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, data, *args, **kwargs):
- db_keys = kwargs.pop('db_keys', [])
- query = self._update_query(db_keys, data)
- if query:
- to_data = yield dbapi.db_find_one(self.table, query)
- if to_data:
- raises.Forbidden(message.exist(self.table, query))
- ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
diff --git a/cvp/opnfv_testapi/common/config.py b/cvp/opnfv_testapi/common/config.py
deleted file mode 100644
index 75dbc35d..00000000
--- a/cvp/opnfv_testapi/common/config.py
+++ /dev/null
@@ -1,79 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-# feng.xiaowei@zte.com.cn remove prepare_put_request 5-30-2016
-##############################################################################
-import ConfigParser
-import argparse
-import os
-import sys
-
-
-class Config(object):
-
- def __init__(self):
- self.config_file = None
- self._set_config_file()
- self._parse()
- self._parse_per_page()
- self.static_path = os.path.join(
- os.path.dirname(os.path.normpath(__file__)),
- os.pardir,
- 'static')
- self.base_path = "/home/testapi"
-
- def _parse(self):
- if not os.path.exists(self.config_file):
- raise Exception("%s not found" % self.config_file)
-
- config = ConfigParser.RawConfigParser()
- config.read(self.config_file)
- self._parse_section(config)
-
- def _parse_section(self, config):
- [self._parse_item(config, section) for section in (config.sections())]
-
- def _parse_item(self, config, section):
- [setattr(self, '{}_{}'.format(section, k), self._parse_value(v))
- for k, v in config.items(section)]
-
- def _parse_per_page(self):
- if not hasattr(self, 'api_results_per_page'):
- self.api_results_per_page = 20
-
- @staticmethod
- def _parse_value(value):
- try:
- value = int(value)
- except:
- if str(value).lower() == 'true':
- value = True
- elif str(value).lower() == 'false':
- value = False
- return value
-
- def _set_config_file(self):
- if not self._set_sys_config_file():
- self._set_default_config_file()
-
- def _set_sys_config_file(self):
- parser = argparse.ArgumentParser()
- parser.add_argument("-c", "--config-file", dest='config_file',
- help="Config file location", metavar="FILE")
- args, _ = parser.parse_known_args(sys.argv)
- try:
- self.config_file = args.config_file
- finally:
- return self.config_file is not None
-
- def _set_default_config_file(self):
- is_venv = os.getenv('VIRTUAL_ENV')
- self.config_file = os.path.join('/' if not is_venv else is_venv,
- 'etc/opnfv_testapi/config.ini')
-
-
-CONF = Config()
diff --git a/cvp/opnfv_testapi/common/message.py b/cvp/opnfv_testapi/common/message.py
deleted file mode 100644
index 61ce03dc..00000000
--- a/cvp/opnfv_testapi/common/message.py
+++ /dev/null
@@ -1,54 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-not_found_base = 'Could Not Found'
-exist_base = 'Already Exists'
-
-
-def key_error(key):
- return "KeyError: '{}'".format(key)
-
-
-def no_body():
- return 'No Body'
-
-
-def not_found(key, value):
- return '{} {} [{}]'.format(not_found_base, key, value)
-
-
-def missing(name):
- return '{} Missing'.format(name)
-
-
-def exist(key, value):
- return '{} [{}] {}'.format(key, value, exist_base)
-
-
-def bad_format(error):
- return 'Bad Format [{}]'.format(error)
-
-
-def unauthorized():
- return 'No Authentication Header'
-
-
-def invalid_token():
- return 'Invalid Token'
-
-
-def no_update():
- return 'Nothing to update'
-
-
-def must_int(name):
- return '{} must be int'.format(name)
-
-
-def no_auth():
- return 'No permission to operate. Please ask Administrator for details.'
diff --git a/cvp/opnfv_testapi/common/raises.py b/cvp/opnfv_testapi/common/raises.py
deleted file mode 100644
index ec6b8a56..00000000
--- a/cvp/opnfv_testapi/common/raises.py
+++ /dev/null
@@ -1,39 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import httplib
-
-from tornado import web
-
-
-class Raiser(object):
- code = httplib.OK
-
- def __init__(self, reason):
- raise web.HTTPError(self.code, reason)
-
-
-class BadRequest(Raiser):
- code = httplib.BAD_REQUEST
-
-
-class Forbidden(Raiser):
- code = httplib.FORBIDDEN
-
-
-class NotFound(Raiser):
- code = httplib.NOT_FOUND
-
-
-class Unauthorized(Raiser):
- code = httplib.UNAUTHORIZED
-
-
-class CodeTBD(object):
- def __init__(self, code, reason):
- raise web.HTTPError(code, reason)
diff --git a/cvp/opnfv_testapi/common/utils.py b/cvp/opnfv_testapi/common/utils.py
deleted file mode 100644
index 107c7099..00000000
--- a/cvp/opnfv_testapi/common/utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import logging
-import smtplib
-from email.mime.text import MIMEText
-
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-
-def send_email(subject, content):
- MAIL_LIST = ['cvp@opnfv.org']
- HOST = "smtp.gmail.com"
- USER = "opnfv.cvp"
- PASSWD = "opnfv@cvp"
-
- sender = 'cvp<{}@gmail.com>'.format(USER)
- msg = MIMEText(content, _subtype='plain')
- msg['Subject'] = subject
- msg['From'] = sender
- msg['To'] = ";".join(MAIL_LIST)
-
- _send_email(HOST, sender, USER, PASSWD, MAIL_LIST, msg)
-
-
-def _send_email(host,
- sender,
- user,
- passwd,
- receivers,
- msg):
-
- client = smtplib.SMTP()
- try:
- client.connect(host, 25)
- LOG.debug('Success to connect server')
- client.starttls()
- client.login(user, passwd)
- LOG.debug('Success to login')
- LOG.debug('Start to sending email')
- client.sendmail(sender, receivers, msg.as_string())
- client.close()
- except Exception:
- LOG.exception('Error when sending email')
- raise
diff --git a/cvp/opnfv_testapi/db/__init__.py b/cvp/opnfv_testapi/db/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cvp/opnfv_testapi/db/__init__.py
+++ /dev/null
diff --git a/cvp/opnfv_testapi/db/api.py b/cvp/opnfv_testapi/db/api.py
deleted file mode 100644
index c057480d..00000000
--- a/cvp/opnfv_testapi/db/api.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import motor
-
-from opnfv_testapi.common.config import CONF
-
-DB = motor.MotorClient(CONF.mongo_url)[CONF.mongo_dbname]
-
-
-def db_update(collection, query, update_req):
- return _eval_db(collection, 'update', query, update_req, check_keys=False)
-
-
-def db_delete(collection, query):
- return _eval_db(collection, 'remove', query)
-
-
-def db_aggregate(collection, pipelines):
- return _eval_db(collection, 'aggregate', pipelines, allowDiskUse=True)
-
-
-def db_list(collection, query):
- return _eval_db(collection, 'find', query)
-
-
-def db_save(collection, data):
- return _eval_db(collection, 'insert', data, check_keys=False)
-
-
-def db_find_one(collection, query):
- return _eval_db(collection, 'find_one', query)
-
-
-def _eval_db(collection, method, *args, **kwargs):
- exec_collection = DB.__getattr__(collection)
- return exec_collection.__getattribute__(method)(*args, **kwargs)
-
-
-def _eval_db_find_one(query, table=None):
- return _eval_db(table, 'find_one', query)
diff --git a/cvp/opnfv_testapi/resources/__init__.py b/cvp/opnfv_testapi/resources/__init__.py
deleted file mode 100644
index 05c0c939..00000000
--- a/cvp/opnfv_testapi/resources/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/cvp/opnfv_testapi/resources/application_handlers.py b/cvp/opnfv_testapi/resources/application_handlers.py
deleted file mode 100644
index 258c1aa2..00000000
--- a/cvp/opnfv_testapi/resources/application_handlers.py
+++ /dev/null
@@ -1,233 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import logging
-import json
-
-from tornado import web
-from tornado import gen
-from bson import objectid
-
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.common import utils
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import application_models
-from opnfv_testapi.tornado_swagger import swagger
-from opnfv_testapi.ui.auth import constants as auth_const
-
-
-class GenericApplicationHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericApplicationHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = "applications"
- self.table_cls = application_models.Application
-
-
-class ApplicationsLogoHandler(GenericApplicationHandler):
- @web.asynchronous
- @gen.coroutine
- def post(self):
- role = self.get_secure_cookie(auth_const.ROLE)
- if role.find('administrator') == -1:
- msg = 'Only administrator is allowed to upload logos'
- self.finish_request({'code': '-1', 'msg': msg})
- return
-
- fileinfo = self.request.files['file'][0]
- fname = fileinfo['filename']
- location = '3rd_party/static/testapi-ui/assets/img/'
- fh = open(location + fname, 'w')
- fh.write(fileinfo['body'])
- msg = 'Successfully uploaded logo: ' + fname
- resp = {'code': '1', 'msg': msg}
- self.finish_request(resp)
-
-
-class ApplicationsGetLogoHandler(GenericApplicationHandler):
- def get(self, filename):
- location = '3rd_party/static/testapi-ui/assets/img/' + filename
- self.set_header('Content-Type', 'application/force-download')
- self.set_header('Content-Disposition',
- 'attachment; filename=%s' % filename)
- try:
- with open(location, "rb") as f:
- try:
- while True:
- _buffer = f.read(4096)
- if _buffer:
- self.write(_buffer)
- else:
- f.close()
- self.finish()
- return
- except Exception:
- raise web.HTTPError(404)
- except Exception:
- raise web.HTTPError(500)
-
-
-class ApplicationsCLHandler(GenericApplicationHandler):
- @swagger.operation(nickname="queryApplications")
- @web.asynchronous
- @gen.coroutine
- def get(self):
- """
- @description: Retrieve result(s) for a application project
- on a specific pod.
- @notes: Retrieve application.
- Available filters for this request are :
- - id : Application id
- - period : x last days, incompatible with from/to
- - from : starting time in 2016-01-01 or 2016-01-01 00:01:23
- - to : ending time in 2016-01-01 or 2016-01-01 00:01:23
- - signed : get logined user result
-
- @return 200: all application results consist with query,
- empty list if no result is found
- @rtype: L{Applications}
- """
- def descend_limit():
- descend = self.get_query_argument('descend', 'true')
- return -1 if descend.lower() == 'true' else 1
-
- def last_limit():
- return self.get_int('last', self.get_query_argument('last', 0))
-
- def page_limit():
- return self.get_int('page', self.get_query_argument('page', 0))
-
- limitations = {
- 'sort': {'_id': descend_limit()},
- 'last': last_limit(),
- 'page': page_limit(),
- 'per_page': CONF.api_results_per_page
- }
-
- query = yield self.set_query()
- yield self._list(query=query, **limitations)
- logging.debug('list end')
-
- @swagger.operation(nickname="createApplication")
- @web.asynchronous
- def post(self):
- """
- @description: create a application
- @param body: application to be created
- @type body: L{ApplicationCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: application is created.
- @raise 404: pod/project/applicationcase not exist
- @raise 400: body/pod_name/project_name/case_name not provided
- """
- openid = self.get_secure_cookie(auth_const.OPENID)
- if openid:
- self.json_args['owner'] = openid
-
- self._post()
-
- @gen.coroutine
- def _post(self):
- miss_fields = []
- carriers = []
-
- role = self.get_secure_cookie(auth_const.ROLE)
- if role.find('administrator') == -1:
- self.finish_request({'code': '403', 'msg': 'Only administrator \
- is allowed to submit application.'})
- return
-
- query = {"openid": self.json_args['user_id']}
- table = "users"
- ret, msg = yield self._check_if_exists(table=table, query=query)
- logging.debug('ret:%s', ret)
- if not ret:
- self.finish_request({'code': '403', 'msg': msg})
- return
- self._create(miss_fields=miss_fields, carriers=carriers)
-
- self._send_email()
-
- def _send_email(self):
-
- data = self.table_cls.from_dict(self.json_args)
- subject = "[OPNFV CVP]New OPNFV CVP Application Submission"
- content = """Hi CVP Reviewer,
-
-This is a new application:
-
- Organization Name: {},
- Organization Website: {},
- Product Name: {},
- Product Specifications: {},
- Product Documentation: {},
- Product Categories: {},
- Primary Name: {},
- Primary Email: {},
- Primary Address: {},
- Primary Phone: {},
- User ID Type: {},
- User ID: {}
-
-Best Regards,
-CVP Team
- """.format(data.organization_name,
- data.organization_web,
- data.product_name,
- data.product_spec,
- data.product_documentation,
- data.product_categories,
- data.prim_name,
- data.prim_email,
- data.prim_address,
- data.prim_phone,
- data.id_type,
- data.user_id)
-
- utils.send_email(subject, content)
-
-
-class ApplicationsGURHandler(GenericApplicationHandler):
- @swagger.operation(nickname="deleteAppById")
- def delete(self, id):
- query = {'_id': objectid.ObjectId(id)}
- self._delete(query=query)
-
- @swagger.operation(nickname="updateApplicationById")
- def put(self, application_id):
- """
- @description: update a single application by id
- @param body: fields to be updated
- @type body: L{ApplicationUpdateRequest}
- @in body: body
- @rtype: L{Application}
- @return 200: update success
- @raise 404: Application not exist
- @raise 403: nothing to update
- """
- data = json.loads(self.request.body)
- item = data.get('item')
- value = data.get(item)
- logging.debug('%s:%s', item, value)
- try:
- self.update(application_id, item, value)
- except Exception as e:
- logging.error('except:%s', e)
- return
-
- @web.asynchronous
- @gen.coroutine
- def update(self, application_id, item, value):
- self.json_args = {}
- self.json_args[item] = value
- query = {'_id': application_id, 'owner':
- self.get_secure_cookie(auth_const.OPENID)}
- db_keys = ['_id', 'owner']
- self._update(query=query, db_keys=db_keys)
diff --git a/cvp/opnfv_testapi/resources/application_models.py b/cvp/opnfv_testapi/resources/application_models.py
deleted file mode 100644
index e2bb652d..00000000
--- a/cvp/opnfv_testapi/resources/application_models.py
+++ /dev/null
@@ -1,39 +0,0 @@
-##############################################################################
-# Copyright (c) 2015
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-from datetime import datetime
-
-
-@swagger.model()
-class Application(models.ModelBase):
- """
- @property trust_indicator: used for long duration test case
- @ptype trust_indicator: L{TI}
- """
- def __init__(self, _id=None, owner=None, status="created",
- creation_date=[], trust_indicator=None):
- self._id = _id
- self.owner = owner
- self.creation_date = datetime.now()
- self.status = status
-
-
-@swagger.model()
-class Applications(models.ModelBase):
- """
- @property applications:
- @ptype tests: C{list} of L{Application}
- """
- def __init__(self):
- self.applications = list()
-
- @staticmethod
- def attr_parser():
- return {'applications': Application}
diff --git a/cvp/opnfv_testapi/resources/handlers.py b/cvp/opnfv_testapi/resources/handlers.py
deleted file mode 100644
index 9b156e17..00000000
--- a/cvp/opnfv_testapi/resources/handlers.py
+++ /dev/null
@@ -1,331 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-# feng.xiaowei@zte.com.cn refactor db.pod to db.pods 5-19-2016
-# feng.xiaowei@zte.com.cn refactor test_project to project 5-19-2016
-# feng.xiaowei@zte.com.cn refactor response body 5-19-2016
-# feng.xiaowei@zte.com.cn refactor pod/project response info 5-19-2016
-# feng.xiaowei@zte.com.cn refactor testcase related handler 5-20-2016
-# feng.xiaowei@zte.com.cn refactor result related handler 5-23-2016
-# feng.xiaowei@zte.com.cn refactor dashboard related handler 5-24-2016
-# feng.xiaowei@zte.com.cn add methods to GenericApiHandler 5-26-2016
-# feng.xiaowei@zte.com.cn remove PodHandler 5-26-2016
-# feng.xiaowei@zte.com.cn remove ProjectHandler 5-26-2016
-# feng.xiaowei@zte.com.cn remove TestcaseHandler 5-27-2016
-# feng.xiaowei@zte.com.cn remove ResultHandler 5-29-2016
-# feng.xiaowei@zte.com.cn remove DashboardHandler 5-30-2016
-##############################################################################
-
-import json
-from datetime import datetime
-from datetime import timedelta
-
-import logging
-from tornado import gen
-from tornado import web
-
-from opnfv_testapi.common import check
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.db import api as dbapi
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-from opnfv_testapi.ui.auth import constants as auth_const
-
-DEFAULT_REPRESENTATION = "application/json"
-
-
-class GenericApiHandler(web.RequestHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericApiHandler, self).__init__(application, request, **kwargs)
- self.json_args = None
- self.table = None
- self.table_cls = None
- self.db_projects = 'projects'
- self.db_pods = 'pods'
- self.db_testcases = 'testcases'
- self.db_results = 'results'
- self.db_scenarios = 'scenarios'
- self.auth = self.settings["auth"]
-
- def get_int(self, key, value):
- try:
- value = int(value)
- except:
- raises.BadRequest(message.must_int(key))
- return value
-
- @gen.coroutine
- def set_query(self):
- query = dict()
- date_range = dict()
- for k in self.request.query_arguments.keys():
- v = self.get_query_argument(k)
- if k == 'period':
- v = self.get_int(k, v)
- if v > 0:
- period = datetime.now() - timedelta(days=v)
- obj = {"$gte": str(period)}
- query['start_date'] = obj
- elif k == 'from':
- date_range.update({'$gte': str(v)})
- elif k == 'to':
- date_range.update({'$lt': str(v)})
- elif k == 'signed':
- openid = self.get_secure_cookie(auth_const.OPENID)
- user = yield dbapi.db_find_one("users", {'openid': openid})
- role = self.get_secure_cookie(auth_const.ROLE)
- logging.info('role:%s', role)
- if role:
- query['$or'] = [
- {
- "shared": {
- "$elemMatch": {"$eq": openid}
- }
- },
- {"owner": openid},
- {
- "shared": {
- "$elemMatch": {"$eq": user.get("email")}
- }
- }
- ]
-
- if role.find("reviewer") != -1:
- query['$or'].append({"status": {"$ne": "private"}})
- elif k not in ['last', 'page', 'descend', 'per_page']:
- query[k] = v
- if date_range:
- query['start_date'] = date_range
-
- # if $lt is not provided,
- # empty/None/null/'' start_date will also be returned
- if 'start_date' in query and '$lt' not in query['start_date']:
- query['start_date'].update({'$lt': str(datetime.now())})
-
- logging.debug("query:%s", query)
- raise gen.Return((query))
-
- def prepare(self):
- if self.request.method != "GET" and self.request.method != "DELETE":
- if self.request.headers.get("Content-Type") is not None:
- if self.request.headers["Content-Type"].startswith(
- DEFAULT_REPRESENTATION):
- try:
- self.json_args = json.loads(self.request.body)
- except (ValueError, KeyError, TypeError) as error:
- raises.BadRequest(message.bad_format(str(error)))
-
- def finish_request(self, json_object=None):
- if json_object:
- self.write(json.dumps(json_object))
- self.set_header("Content-Type", DEFAULT_REPRESENTATION)
- self.finish()
-
- def _create_response(self, resource):
- href = self.request.full_url() + '/' + str(resource)
- return models.CreateResponse(href=href).format()
-
- def format_data(self, data):
- cls_data = self.table_cls.from_dict(data)
- return cls_data.format_http()
-
- @gen.coroutine
- @check.no_body
- @check.miss_fields
- @check.carriers_exist
- @check.new_not_exists
- def _inner_create(self, **kwargs):
- data = self.table_cls.from_dict(self.json_args)
- for k, v in kwargs.iteritems():
- if k != 'query':
- data.__setattr__(k, v)
-
- if self.table != 'results':
- data.creation_date = datetime.now()
- _id = yield dbapi.db_save(self.table, data.format())
- logging.warning("_id:%s", _id)
- raise gen.Return(_id)
-
- def _create_only(self, **kwargs):
- resource = self._inner_create(**kwargs)
- logging.warning("resource:%s", resource)
-
- @check.authenticate
- @check.no_body
- @check.miss_fields
- @check.carriers_exist
- @check.new_not_exists
- def _create(self, **kwargs):
- # resource = self._inner_create(**kwargs)
- data = self.table_cls.from_dict(self.json_args)
- for k, v in kwargs.iteritems():
- if k != 'query':
- data.__setattr__(k, v)
-
- if self.table != 'results':
- data.creation_date = datetime.now()
- _id = yield dbapi.db_save(self.table, data.format())
- if 'name' in self.json_args:
- resource = data.name
- else:
- resource = _id
-
- self.finish_request(self._create_response(resource))
-
- @gen.coroutine
- def _check_if_exists(self, *args, **kwargs):
- query = kwargs['query']
- table = kwargs['table']
- if query and table:
- data = yield dbapi.db_find_one(table, query)
- if data:
- raise gen.Return((True, 'Data alreay exists. %s' % (query)))
- raise gen.Return((False, 'Data does not exist. %s' % (query)))
-
- # @web.asynchronous
- @gen.coroutine
- def _list(self, query=None, res_op=None, *args, **kwargs):
- logging.debug("_list query:%s", query)
- sort = kwargs.get('sort')
- page = kwargs.get('page', 0)
- last = kwargs.get('last', 0)
- per_page = kwargs.get('per_page', 0)
- if query is None:
- query = {}
-
- total_pages = 0
- if page > 0:
- cursor = dbapi.db_list(self.table, query)
- records_count = yield cursor.count()
- total_pages = self._calc_total_pages(records_count,
- last,
- page,
- per_page)
- pipelines = self._set_pipelines(query, sort, last, page, per_page)
- cursor = dbapi.db_aggregate(self.table, pipelines)
- data = list()
- while (yield cursor.fetch_next):
- data.append(self.format_data(cursor.next_object()))
- if res_op is None:
- res = {self.table: data}
- else:
- res = res_op(data, *args)
- if page > 0:
- res.update({
- 'pagination': {
- 'current_page': kwargs.get('page'),
- 'total_pages': total_pages
- }
- })
- self.finish_request(res)
- logging.debug('_list end')
-
- @staticmethod
- def _calc_total_pages(records_count, last, page, per_page):
- logging.debug("totalItems:%d per_page:%d", records_count, per_page)
- records_nr = records_count
- if (records_count > last) and (last > 0):
- records_nr = last
-
- total_pages, remainder = divmod(records_nr, per_page)
- if remainder > 0:
- total_pages += 1
- if page > 1 and page > total_pages:
- raises.BadRequest(
- 'Request page > total_pages [{}]'.format(total_pages))
- return total_pages
-
- @staticmethod
- def _set_pipelines(query, sort, last, page, per_page):
- pipelines = list()
- if query:
- pipelines.append({'$match': query})
- if sort:
- pipelines.append({'$sort': sort})
-
- if page > 0:
- pipelines.append({'$skip': (page - 1) * per_page})
- pipelines.append({'$limit': per_page})
- elif last > 0:
- pipelines.append({'$limit': last})
-
- return pipelines
-
- @web.asynchronous
- @gen.coroutine
- @check.not_exist
- def _get_one(self, data, query=None):
- self.finish_request(self.format_data(data))
-
- @check.authenticate
- @check.not_exist
- def _delete(self, data, query=None):
- yield dbapi.db_delete(self.table, query)
- self.finish_request()
-
- @check.authenticate
- @check.no_body
- @check.not_exist
- @check.updated_one_not_exist
- def _update(self, data, query=None, **kwargs):
- logging.debug("_update")
- data = self.table_cls.from_dict(data)
- update_req = self._update_requests(data)
- yield dbapi.db_update(self.table, query, update_req)
- update_req['_id'] = str(data._id)
- self.finish_request(update_req)
-
- def _update_requests(self, data):
- request = dict()
- for k, v in self.json_args.iteritems():
- request = self._update_request(request, k, v,
- data.__getattribute__(k))
- if not request:
- raises.Forbidden(message.no_update())
-
- edit_request = data.format()
- edit_request.update(request)
- return edit_request
-
- @staticmethod
- def _update_request(edit_request, key, new_value, old_value):
- """
- This function serves to prepare the elements in the update request.
- We try to avoid replace the exact values in the db
- edit_request should be a dict in which we add an entry (key) after
- comparing values
- """
- if not (new_value is None):
- if new_value != old_value:
- edit_request[key] = new_value
-
- return edit_request
-
- def _update_query(self, keys, data):
- query = dict()
- equal = True
- for key in keys:
- new = self.json_args.get(key)
- old = data.get(key)
- if new is None:
- new = old
- elif new != old:
- equal = False
- query[key] = new
- return query if not equal else dict()
-
-
-class VersionHandler(GenericApiHandler):
- @swagger.operation(nickname='listAllVersions')
- def get(self):
- """
- @description: list all supported versions
- @rtype: L{Versions}
- """
- versions = [{'version': 'api.cvp.0.7.0', 'description': 'basics'}]
- self.finish_request({'versions': versions})
diff --git a/cvp/opnfv_testapi/resources/models.py b/cvp/opnfv_testapi/resources/models.py
deleted file mode 100644
index e8fc532b..00000000
--- a/cvp/opnfv_testapi/resources/models.py
+++ /dev/null
@@ -1,115 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-# feng.xiaowei@zte.com.cn mv Pod to pod_models.py 5-18-2016
-# feng.xiaowei@zte.com.cn add MetaCreateResponse/MetaGetResponse 5-18-2016
-# feng.xiaowei@zte.com.cn mv TestProject to project_models.py 5-19-2016
-# feng.xiaowei@zte.com.cn delete meta class 5-19-2016
-# feng.xiaowei@zte.com.cn add CreateResponse 5-19-2016
-# feng.xiaowei@zte.com.cn mv TestCase to testcase_models.py 5-20-2016
-# feng.xiaowei@zte.com.cn mv TestResut to result_models.py 5-23-2016
-# feng.xiaowei@zte.com.cn add ModelBase 12-20-2016
-##############################################################################
-import ast
-import copy
-
-from opnfv_testapi.tornado_swagger import swagger
-
-
-class ModelBase(object):
-
- def format(self):
- return self._format(['_id'])
-
- def format_http(self):
- return self._format([])
-
- @classmethod
- def from_dict(cls, a_dict):
- if a_dict is None:
- return None
-
- attr_parser = cls.attr_parser()
- t = cls()
- for k, v in a_dict.iteritems():
- value = v
- if isinstance(v, dict) and k in attr_parser:
- value = attr_parser[k].from_dict(v)
- elif isinstance(v, list) and k in attr_parser:
- value = []
- for item in v:
- value.append(attr_parser[k].from_dict(item))
-
- t.__setattr__(k, value)
-
- return t
-
- @staticmethod
- def attr_parser():
- return {}
-
- def _format(self, excludes):
- new_obj = copy.deepcopy(self)
- dicts = new_obj.__dict__
- for k in dicts.keys():
- if k in excludes:
- del dicts[k]
- elif dicts[k]:
- dicts[k] = self._obj_format(dicts[k])
- return dicts
-
- def _obj_format(self, obj):
- if self._has_format(obj):
- obj = obj.format()
- elif isinstance(obj, unicode):
- try:
- obj = self._obj_format(ast.literal_eval(obj))
- except:
- try:
- obj = str(obj)
- except:
- obj = obj
- elif isinstance(obj, list):
- hs = list()
- for h in obj:
- hs.append(self._obj_format(h))
- obj = hs
- elif not isinstance(obj, (str, int, float, dict)):
- obj = str(obj)
- return obj
-
- @staticmethod
- def _has_format(obj):
- return not isinstance(obj, (str, unicode)) and hasattr(obj, 'format')
-
-
-@swagger.model()
-class CreateResponse(ModelBase):
- def __init__(self, href=''):
- self.href = href
-
-
-@swagger.model()
-class Versions(ModelBase):
- """
- @property versions:
- @ptype versions: C{list} of L{Version}
- """
-
- def __init__(self):
- self.versions = list()
-
- @staticmethod
- def attr_parser():
- return {'versions': Version}
-
-
-@swagger.model()
-class Version(ModelBase):
- def __init__(self, version=None, description=None):
- self.version = version
- self.description = description
diff --git a/cvp/opnfv_testapi/resources/pod_handlers.py b/cvp/opnfv_testapi/resources/pod_handlers.py
deleted file mode 100644
index 50298875..00000000
--- a/cvp/opnfv_testapi/resources/pod_handlers.py
+++ /dev/null
@@ -1,78 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import handlers
-from opnfv_testapi.resources import pod_models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-class GenericPodHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericPodHandler, self).__init__(application, request, **kwargs)
- self.table = 'pods'
- self.table_cls = pod_models.Pod
-
-
-class PodCLHandler(GenericPodHandler):
- @swagger.operation(nickname='listAllPods')
- def get(self):
- """
- @description: list all pods
- @return 200: list all pods, empty list is no pod exist
- @rtype: L{Pods}
- """
- self._list()
-
- @swagger.operation(nickname='createPod')
- def post(self):
- """
- @description: create a pod
- @param body: pod to be created
- @type body: L{PodCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: pod is created.
- @raise 403: pod already exists
- @raise 400: body or name not provided
- """
- def query():
- return {'name': self.json_args.get('name')}
- miss_fields = ['name']
- self._create(miss_fields=miss_fields, query=query)
-
-
-class PodGURHandler(GenericPodHandler):
- @swagger.operation(nickname='getPodByName')
- def get(self, pod_name):
- """
- @description: get a single pod by pod_name
- @rtype: L{Pod}
- @return 200: pod exist
- @raise 404: pod not exist
- """
- self._get_one(query={'name': pod_name})
-
- def delete(self, pod_name):
- """ Remove a POD
-
- # check for an existing pod to be deleted
- mongo_dict = yield self.db.pods.find_one(
- {'name': pod_name})
- pod = TestProject.pod(mongo_dict)
- if pod is None:
- raise HTTPError(HTTP_NOT_FOUND,
- "{} could not be found as a pod to be deleted"
- .format(pod_name))
-
- # just delete it, or maybe save it elsewhere in a future
- res = yield self.db.projects.remove(
- {'name': pod_name})
-
- self.finish_request(answer)
- """
- pass
diff --git a/cvp/opnfv_testapi/resources/pod_models.py b/cvp/opnfv_testapi/resources/pod_models.py
deleted file mode 100644
index 2c3ea978..00000000
--- a/cvp/opnfv_testapi/resources/pod_models.py
+++ /dev/null
@@ -1,52 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-# name: name of the POD e.g. zte-1
-# mode: metal or virtual
-# details: any detail
-# role: ci-pod or community-pod or single-node
-
-
-@swagger.model()
-class PodCreateRequest(models.ModelBase):
- def __init__(self, name, mode='', details='', role=""):
- self.name = name
- self.mode = mode
- self.details = details
- self.role = role
-
-
-@swagger.model()
-class Pod(models.ModelBase):
- def __init__(self,
- name='', mode='', details='',
- role="", _id='', create_date=''):
- self.name = name
- self.mode = mode
- self.details = details
- self.role = role
- self._id = _id
- self.creation_date = create_date
-
-
-@swagger.model()
-class Pods(models.ModelBase):
- """
- @property pods:
- @ptype pods: C{list} of L{Pod}
- """
- def __init__(self):
- self.pods = list()
-
- @staticmethod
- def attr_parser():
- return {'pods': Pod}
diff --git a/cvp/opnfv_testapi/resources/project_handlers.py b/cvp/opnfv_testapi/resources/project_handlers.py
deleted file mode 100644
index be295070..00000000
--- a/cvp/opnfv_testapi/resources/project_handlers.py
+++ /dev/null
@@ -1,86 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import project_models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-class GenericProjectHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericProjectHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = 'projects'
- self.table_cls = project_models.Project
-
-
-class ProjectCLHandler(GenericProjectHandler):
- @swagger.operation(nickname="listAllProjects")
- def get(self):
- """
- @description: list all projects
- @return 200: return all projects, empty list is no project exist
- @rtype: L{Projects}
- """
- self._list()
-
- @swagger.operation(nickname="createProject")
- def post(self):
- """
- @description: create a project
- @param body: project to be created
- @type body: L{ProjectCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: project is created.
- @raise 403: project already exists
- @raise 400: body or name not provided
- """
- def query():
- return {'name': self.json_args.get('name')}
- miss_fields = ['name']
- self._create(miss_fields=miss_fields, query=query)
-
-
-class ProjectGURHandler(GenericProjectHandler):
- @swagger.operation(nickname='getProjectByName')
- def get(self, project_name):
- """
- @description: get a single project by project_name
- @rtype: L{Project}
- @return 200: project exist
- @raise 404: project not exist
- """
- self._get_one(query={'name': project_name})
-
- @swagger.operation(nickname="updateProjectByName")
- def put(self, project_name):
- """
- @description: update a single project by project_name
- @param body: project to be updated
- @type body: L{ProjectUpdateRequest}
- @in body: body
- @rtype: L{Project}
- @return 200: update success
- @raise 404: project not exist
- @raise 403: new project name already exist or nothing to update
- """
- query = {'name': project_name}
- db_keys = ['name']
- self._update(query=query, db_keys=db_keys)
-
- @swagger.operation(nickname='deleteProjectByName')
- def delete(self, project_name):
- """
- @description: delete a project by project_name
- @return 200: delete success
- @raise 404: project not exist
- """
- self._delete(query={'name': project_name})
diff --git a/cvp/opnfv_testapi/resources/project_models.py b/cvp/opnfv_testapi/resources/project_models.py
deleted file mode 100644
index 3243882b..00000000
--- a/cvp/opnfv_testapi/resources/project_models.py
+++ /dev/null
@@ -1,48 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-@swagger.model()
-class ProjectCreateRequest(models.ModelBase):
- def __init__(self, name, description=''):
- self.name = name
- self.description = description
-
-
-@swagger.model()
-class ProjectUpdateRequest(models.ModelBase):
- def __init__(self, name='', description=''):
- self.name = name
- self.description = description
-
-
-@swagger.model()
-class Project(models.ModelBase):
- def __init__(self,
- name=None, _id=None, description=None, create_date=None):
- self._id = _id
- self.name = name
- self.description = description
- self.creation_date = create_date
-
-
-@swagger.model()
-class Projects(models.ModelBase):
- """
- @property projects:
- @ptype projects: C{list} of L{Project}
- """
- def __init__(self):
- self.projects = list()
-
- @staticmethod
- def attr_parser():
- return {'projects': Project}
diff --git a/cvp/opnfv_testapi/resources/result_handlers.py b/cvp/opnfv_testapi/resources/result_handlers.py
deleted file mode 100644
index 2e65ba44..00000000
--- a/cvp/opnfv_testapi/resources/result_handlers.py
+++ /dev/null
@@ -1,308 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import logging
-from datetime import datetime
-from datetime import timedelta
-import json
-import tarfile
-import io
-
-from tornado import gen
-from tornado import web
-from bson import objectid
-
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import result_models
-from opnfv_testapi.tornado_swagger import swagger
-from opnfv_testapi.ui.auth import constants as auth_const
-
-
-class GenericResultHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericResultHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = self.db_results
- self.table_cls = result_models.TestResult
-
- def get_int(self, key, value):
- try:
- value = int(value)
- except:
- raises.BadRequest(message.must_int(key))
- return value
-
- def set_query(self):
- query = dict()
- date_range = dict()
-
- query['public'] = {'$not': {'$eq': 'false'}}
- for k in self.request.query_arguments.keys():
- v = self.get_query_argument(k)
- if k == 'project' or k == 'pod' or k == 'case':
- query[k + '_name'] = v
- elif k == 'period':
- v = self.get_int(k, v)
- if v > 0:
- period = datetime.now() - timedelta(days=v)
- obj = {"$gte": str(period)}
- query['start_date'] = obj
- elif k == 'trust_indicator':
- query[k + '.current'] = float(v)
- elif k == 'from':
- date_range.update({'$gte': str(v)})
- elif k == 'to':
- date_range.update({'$lt': str(v)})
- elif k == 'signed':
- openid = self.get_secure_cookie(auth_const.OPENID)
- role = self.get_secure_cookie(auth_const.ROLE)
- logging.info('role:%s', role)
- if role:
- del query['public']
- query['user'] = openid
- if role.find("reviewer") != -1:
- del query['user']
- query['review'] = 'true'
- elif k not in ['last', 'page', 'descend']:
- query[k] = v
- if date_range:
- query['start_date'] = date_range
-
- # if $lt is not provided,
- # empty/None/null/'' start_date will also be returned
- if 'start_date' in query and '$lt' not in query['start_date']:
- query['start_date'].update({'$lt': str(datetime.now())})
-
- return query
-
-
-class ResultsCLHandler(GenericResultHandler):
- @swagger.operation(nickname="queryTestResults")
- def get(self):
- """
- @description: Retrieve result(s) for a test project
- on a specific pod.
- @notes: Retrieve result(s) for a test project on a specific pod.
- Available filters for this request are :
- - project : project name
- - case : case name
- - pod : pod name
- - version : platform version (Arno-R1, ...)
- - installer : fuel/apex/compass/joid/daisy
- - build_tag : Jenkins build tag name
- - period : x last days, incompatible with from/to
- - from : starting time in 2016-01-01 or 2016-01-01 00:01:23
- - to : ending time in 2016-01-01 or 2016-01-01 00:01:23
- - scenario : the test scenario (previously version)
- - criteria : the global criteria status passed or failed
- - trust_indicator : evaluate the stability of the test case
- to avoid running systematically long and stable test case
- - signed : get logined user result
-
- GET /results/project=functest&case=vPing&version=Arno-R1 \
- &pod=pod_name&period=15&signed
- @return 200: all test results consist with query,
- empty list if no result is found
- @rtype: L{TestResults}
- @param pod: pod name
- @type pod: L{string}
- @in pod: query
- @required pod: False
- @param project: project name
- @type project: L{string}
- @in project: query
- @required project: False
- @param case: case name
- @type case: L{string}
- @in case: query
- @required case: False
- @param version: i.e. Colorado
- @type version: L{string}
- @in version: query
- @required version: False
- @param installer: fuel/apex/joid/compass
- @type installer: L{string}
- @in installer: query
- @required installer: False
- @param build_tag: i.e. v3.0
- @type build_tag: L{string}
- @in build_tag: query
- @required build_tag: False
- @param scenario: i.e. odl
- @type scenario: L{string}
- @in scenario: query
- @required scenario: False
- @param criteria: i.e. passed
- @type criteria: L{string}
- @in criteria: query
- @required criteria: False
- @param period: last days
- @type period: L{string}
- @in period: query
- @required period: False
- @param from: i.e. 2016-01-01 or 2016-01-01 00:01:23
- @type from: L{string}
- @in from: query
- @required from: False
- @param to: i.e. 2016-01-01 or 2016-01-01 00:01:23
- @type to: L{string}
- @in to: query
- @required to: False
- @param last: last records stored until now
- @type last: L{string}
- @in last: query
- @required last: False
- @param page: which page to list
- @type page: L{int}
- @in page: query
- @required page: False
- @param trust_indicator: must be float
- @type trust_indicator: L{float}
- @in trust_indicator: query
- @required trust_indicator: False
- @param signed: user results or all results
- @type signed: L{string}
- @in signed: query
- @required signed: False
- @param descend: true, newest2oldest; false, oldest2newest
- @type descend: L{string}
- @in descend: query
- @required descend: False
- """
- def descend_limit():
- descend = self.get_query_argument('descend', 'true')
- return -1 if descend.lower() == 'true' else 1
-
- def last_limit():
- return self.get_int('last', self.get_query_argument('last', 0))
-
- def page_limit():
- return self.get_int('page', self.get_query_argument('page', 0))
-
- limitations = {
- 'sort': {'_id': descend_limit()},
- 'last': last_limit(),
- 'page': page_limit(),
- 'per_page': CONF.api_results_per_page
- }
-
- self._list(query=self.set_query(), **limitations)
-
- @swagger.operation(nickname="createTestResult")
- def post(self):
- """
- @description: create a test result
- @param body: result to be created
- @type body: L{ResultCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: result is created.
- @raise 404: pod/project/testcase not exist
- @raise 400: body/pod_name/project_name/case_name not provided
- """
- self._post()
-
- def _post(self):
- def pod_query():
- return {'name': self.json_args.get('pod_name')}
-
- def project_query():
- return {'name': self.json_args.get('project_name')}
-
- def testcase_query():
- return {'project_name': self.json_args.get('project_name'),
- 'name': self.json_args.get('case_name')}
-
- miss_fields = ['pod_name', 'project_name', 'case_name']
- carriers = [('pods', pod_query),
- ('projects', project_query),
- ('testcases', testcase_query)]
-
- self._create(miss_fields=miss_fields, carriers=carriers)
-
-
-class ResultsUploadHandler(ResultsCLHandler):
- @swagger.operation(nickname="uploadTestResult")
- @web.asynchronous
- @gen.coroutine
- def post(self):
- """
- @description: upload and create a test result
- @param body: result to be created
- @type body: L{ResultCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: result is created.
- @raise 404: pod/project/testcase not exist
- @raise 400: body/pod_name/project_name/case_name not provided
- """
- fileinfo = self.request.files['file'][0]
- tar_in = tarfile.open(fileobj=io.BytesIO(fileinfo['body']),
- mode="r:gz")
- try:
- results = tar_in.extractfile('results/results.json').read()
- except KeyError:
- msg = 'Uploaded results must contain at least one passing test.'
- self.finish_request({'code': 403, 'msg': msg})
- return
- results = results.split('\n')
- result_ids = []
- for result in results:
- if result == '':
- continue
- self.json_args = json.loads(result).copy()
- build_tag = self.json_args['build_tag']
- _id = yield self._inner_create()
- result_ids.append(str(_id))
- test_id = build_tag[13:49]
- log_path = '/home/testapi/logs/%s' % (test_id)
- tar_in.extractall(log_path)
- log_filename = "/home/testapi/logs/log_%s.tar.gz" % (test_id)
- with open(log_filename, "wb") as tar_out:
- tar_out.write(fileinfo['body'])
- resp = {'id': test_id, 'results': result_ids}
- self.finish_request(resp)
-
-
-class ResultsGURHandler(GenericResultHandler):
- @swagger.operation(nickname='DeleteTestResultById')
- def delete(self, result_id):
- query = {'_id': objectid.ObjectId(result_id)}
- self._delete(query=query)
-
- @swagger.operation(nickname='getTestResultById')
- def get(self, result_id):
- """
- @description: get a single result by result_id
- @rtype: L{TestResult}
- @return 200: test result exist
- @raise 404: test result not exist
- """
- query = dict()
- query["_id"] = objectid.ObjectId(result_id)
- self._get_one(query=query)
-
- @swagger.operation(nickname="updateTestResultById")
- def put(self, result_id):
- """
- @description: update a single result by _id
- @param body: fields to be updated
- @type body: L{ResultUpdateRequest}
- @in body: body
- @rtype: L{Result}
- @return 200: update success
- @raise 404: result not exist
- @raise 403: nothing to update
- """
- query = {'_id': objectid.ObjectId(result_id)}
- db_keys = []
- self._update(query=query, db_keys=db_keys)
diff --git a/cvp/opnfv_testapi/resources/result_models.py b/cvp/opnfv_testapi/resources/result_models.py
deleted file mode 100644
index 698f4981..00000000
--- a/cvp/opnfv_testapi/resources/result_models.py
+++ /dev/null
@@ -1,133 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-@swagger.model()
-class TIHistory(models.ModelBase):
- """
- @ptype step: L{float}
- """
- def __init__(self, date=None, step=0):
- self.date = date
- self.step = step
-
-
-@swagger.model()
-class TI(models.ModelBase):
- """
- @property histories: trust_indicator update histories
- @ptype histories: C{list} of L{TIHistory}
- @ptype current: L{float}
- """
- def __init__(self, current=0):
- self.current = current
- self.histories = list()
-
- @staticmethod
- def attr_parser():
- return {'histories': TIHistory}
-
-
-@swagger.model()
-class ResultCreateRequest(models.ModelBase):
- """
- @property trust_indicator:
- @ptype trust_indicator: L{TI}
- """
- def __init__(self,
- pod_name=None,
- project_name=None,
- case_name=None,
- installer=None,
- version=None,
- start_date=None,
- stop_date=None,
- details=None,
- build_tag=None,
- scenario=None,
- criteria=None,
- user=None,
- public="true",
- review="false",
- trust_indicator=None):
- self.pod_name = pod_name
- self.project_name = project_name
- self.case_name = case_name
- self.installer = installer
- self.version = version
- self.start_date = start_date
- self.stop_date = stop_date
- self.details = details
- self.build_tag = build_tag
- self.scenario = scenario
- self.criteria = criteria
- self.user = user
- self.public = public
- self.review = review
- self.trust_indicator = trust_indicator if trust_indicator else TI(0)
-
-
-@swagger.model()
-class ResultUpdateRequest(models.ModelBase):
- """
- @property trust_indicator:
- @ptype trust_indicator: L{TI}
- """
- def __init__(self, trust_indicator=None):
- self.trust_indicator = trust_indicator
-
-
-@swagger.model()
-class TestResult(models.ModelBase):
- """
- @property trust_indicator: used for long duration test case
- @ptype trust_indicator: L{TI}
- """
- def __init__(self, _id=None, case_name=None, project_name=None,
- pod_name=None, installer=None, version=None,
- start_date=None, stop_date=None, details=None,
- build_tag=None, scenario=None, criteria=None,
- user=None, public="true", review="false",
- trust_indicator=None):
- self._id = _id
- self.case_name = case_name
- self.project_name = project_name
- self.pod_name = pod_name
- self.installer = installer
- self.version = version
- self.start_date = start_date
- self.stop_date = stop_date
- self.details = details
- self.build_tag = build_tag
- self.scenario = scenario
- self.criteria = criteria
- self.user = user
- self.public = public
- self.review = review
- self.trust_indicator = trust_indicator
-
- @staticmethod
- def attr_parser():
- return {'trust_indicator': TI}
-
-
-@swagger.model()
-class TestResults(models.ModelBase):
- """
- @property results:
- @ptype results: C{list} of L{TestResult}
- """
- def __init__(self):
- self.results = list()
-
- @staticmethod
- def attr_parser():
- return {'results': TestResult}
diff --git a/cvp/opnfv_testapi/resources/scenario_handlers.py b/cvp/opnfv_testapi/resources/scenario_handlers.py
deleted file mode 100644
index 5d420a56..00000000
--- a/cvp/opnfv_testapi/resources/scenario_handlers.py
+++ /dev/null
@@ -1,282 +0,0 @@
-import functools
-
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.resources import handlers
-import opnfv_testapi.resources.scenario_models as models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-class GenericScenarioHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericScenarioHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = self.db_scenarios
- self.table_cls = models.Scenario
-
-
-class ScenariosCLHandler(GenericScenarioHandler):
- @swagger.operation(nickname="queryScenarios")
- def get(self):
- """
- @description: Retrieve scenario(s).
- @notes: Retrieve scenario(s)
- Available filters for this request are :
- - name : scenario name
-
- GET /scenarios?name=scenario_1
- @param name: scenario name
- @type name: L{string}
- @in name: query
- @required name: False
- @param installer: installer type
- @type installer: L{string}
- @in installer: query
- @required installer: False
- @param version: version
- @type version: L{string}
- @in version: query
- @required version: False
- @param project: project name
- @type project: L{string}
- @in project: query
- @required project: False
- @return 200: all scenarios satisfy queries,
- empty list if no scenario is found
- @rtype: L{Scenarios}
- """
-
- def _set_query():
- query = dict()
- elem_query = dict()
- for k in self.request.query_arguments.keys():
- v = self.get_query_argument(k)
- if k == 'installer':
- elem_query["installer"] = v
- elif k == 'version':
- elem_query["versions.version"] = v
- elif k == 'project':
- elem_query["versions.projects.project"] = v
- else:
- query[k] = v
- if elem_query:
- query['installers'] = {'$elemMatch': elem_query}
- return query
-
- self._list(query=_set_query())
-
- @swagger.operation(nickname="createScenario")
- def post(self):
- """
- @description: create a new scenario by name
- @param body: scenario to be created
- @type body: L{ScenarioCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: scenario is created.
- @raise 403: scenario already exists
- @raise 400: body or name not provided
- """
- def query():
- return {'name': self.json_args.get('name')}
- miss_fields = ['name']
- self._create(miss_fields=miss_fields, query=query)
-
-
-class ScenarioGURHandler(GenericScenarioHandler):
- @swagger.operation(nickname='getScenarioByName')
- def get(self, name):
- """
- @description: get a single scenario by name
- @rtype: L{Scenario}
- @return 200: scenario exist
- @raise 404: scenario not exist
- """
- self._get_one(query={'name': name})
- pass
-
- @swagger.operation(nickname="updateScenarioByName")
- def put(self, name):
- """
- @description: update a single scenario by name
- @param body: fields to be updated
- @type body: L{ScenarioUpdateRequest}
- @in body: body
- @rtype: L{Scenario}
- @return 200: update success
- @raise 404: scenario not exist
- @raise 403: nothing to update
- """
- query = {'name': name}
- db_keys = ['name']
- self._update(query=query, db_keys=db_keys)
-
- @swagger.operation(nickname="deleteScenarioByName")
- def delete(self, name):
- """
- @description: delete a scenario by name
- @return 200: delete success
- @raise 404: scenario not exist:
- """
-
- self._delete(query={'name': name})
-
- def _update_query(self, keys, data):
- query = dict()
- if self._is_rename():
- new = self._term.get('name')
- if data.get('name') != new:
- query['name'] = new
-
- return query
-
- def _update_requests(self, data):
- updates = {
- ('name', 'update'): self._update_requests_rename,
- ('installer', 'add'): self._update_requests_add_installer,
- ('installer', 'delete'): self._update_requests_delete_installer,
- ('version', 'add'): self._update_requests_add_version,
- ('version', 'delete'): self._update_requests_delete_version,
- ('owner', 'update'): self._update_requests_change_owner,
- ('project', 'add'): self._update_requests_add_project,
- ('project', 'delete'): self._update_requests_delete_project,
- ('customs', 'add'): self._update_requests_add_customs,
- ('customs', 'delete'): self._update_requests_delete_customs,
- ('score', 'add'): self._update_requests_add_score,
- ('trust_indicator', 'add'): self._update_requests_add_ti,
- }
-
- updates[(self._field, self._op)](data)
-
- return data.format()
-
- def _iter_installers(xstep):
- @functools.wraps(xstep)
- def magic(self, data):
- [xstep(self, installer)
- for installer in self._filter_installers(data.installers)]
- return magic
-
- def _iter_versions(xstep):
- @functools.wraps(xstep)
- def magic(self, installer):
- [xstep(self, version)
- for version in (self._filter_versions(installer.versions))]
- return magic
-
- def _iter_projects(xstep):
- @functools.wraps(xstep)
- def magic(self, version):
- [xstep(self, project)
- for project in (self._filter_projects(version.projects))]
- return magic
-
- def _update_requests_rename(self, data):
- data.name = self._term.get('name')
- if not data.name:
- raises.BadRequest(message.missing('name'))
-
- def _update_requests_add_installer(self, data):
- data.installers.append(models.ScenarioInstaller.from_dict(self._term))
-
- def _update_requests_delete_installer(self, data):
- data.installers = self._remove_installers(data.installers)
-
- @_iter_installers
- def _update_requests_add_version(self, installer):
- installer.versions.append(models.ScenarioVersion.from_dict(self._term))
-
- @_iter_installers
- def _update_requests_delete_version(self, installer):
- installer.versions = self._remove_versions(installer.versions)
-
- @_iter_installers
- @_iter_versions
- def _update_requests_change_owner(self, version):
- version.owner = self._term.get('owner')
-
- @_iter_installers
- @_iter_versions
- def _update_requests_add_project(self, version):
- version.projects.append(models.ScenarioProject.from_dict(self._term))
-
- @_iter_installers
- @_iter_versions
- def _update_requests_delete_project(self, version):
- version.projects = self._remove_projects(version.projects)
-
- @_iter_installers
- @_iter_versions
- @_iter_projects
- def _update_requests_add_customs(self, project):
- project.customs = list(set(project.customs + self._term))
-
- @_iter_installers
- @_iter_versions
- @_iter_projects
- def _update_requests_delete_customs(self, project):
- project.customs = filter(
- lambda f: f not in self._term,
- project.customs)
-
- @_iter_installers
- @_iter_versions
- @_iter_projects
- def _update_requests_add_score(self, project):
- project.scores.append(
- models.ScenarioScore.from_dict(self._term))
-
- @_iter_installers
- @_iter_versions
- @_iter_projects
- def _update_requests_add_ti(self, project):
- project.trust_indicators.append(
- models.ScenarioTI.from_dict(self._term))
-
- def _is_rename(self):
- return self._field == 'name' and self._op == 'update'
-
- def _remove_installers(self, installers):
- return self._remove('installer', installers)
-
- def _filter_installers(self, installers):
- return self._filter('installer', installers)
-
- def _remove_versions(self, versions):
- return self._remove('version', versions)
-
- def _filter_versions(self, versions):
- return self._filter('version', versions)
-
- def _remove_projects(self, projects):
- return self._remove('project', projects)
-
- def _filter_projects(self, projects):
- return self._filter('project', projects)
-
- def _remove(self, field, fields):
- return filter(
- lambda f: getattr(f, field) != self._locate.get(field),
- fields)
-
- def _filter(self, field, fields):
- return filter(
- lambda f: getattr(f, field) == self._locate.get(field),
- fields)
-
- @property
- def _field(self):
- return self.json_args.get('field')
-
- @property
- def _op(self):
- return self.json_args.get('op')
-
- @property
- def _locate(self):
- return self.json_args.get('locate')
-
- @property
- def _term(self):
- return self.json_args.get('term')
diff --git a/cvp/opnfv_testapi/resources/scenario_models.py b/cvp/opnfv_testapi/resources/scenario_models.py
deleted file mode 100644
index 467cff24..00000000
--- a/cvp/opnfv_testapi/resources/scenario_models.py
+++ /dev/null
@@ -1,204 +0,0 @@
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-def list_default(value):
- return value if value else list()
-
-
-def dict_default(value):
- return value if value else dict()
-
-
-@swagger.model()
-class ScenarioTI(models.ModelBase):
- def __init__(self, date=None, status='silver'):
- self.date = date
- self.status = status
-
-
-@swagger.model()
-class ScenarioScore(models.ModelBase):
- def __init__(self, date=None, score='0'):
- self.date = date
- self.score = score
-
-
-@swagger.model()
-class ScenarioProject(models.ModelBase):
- """
- @property customs:
- @ptype customs: C{list} of L{string}
- @property scores:
- @ptype scores: C{list} of L{ScenarioScore}
- @property trust_indicators:
- @ptype trust_indicators: C{list} of L{ScenarioTI}
- """
- def __init__(self,
- project='',
- customs=None,
- scores=None,
- trust_indicators=None):
- self.project = project
- self.customs = list_default(customs)
- self.scores = list_default(scores)
- self.trust_indicators = list_default(trust_indicators)
-
- @staticmethod
- def attr_parser():
- return {'scores': ScenarioScore,
- 'trust_indicators': ScenarioTI}
-
- def __eq__(self, other):
- return [self.project == other.project and
- self._customs_eq(other) and
- self._scores_eq(other) and
- self._ti_eq(other)]
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def _customs_eq(self, other):
- return set(self.customs) == set(other.customs)
-
- def _scores_eq(self, other):
- return set(self.scores) == set(other.scores)
-
- def _ti_eq(self, other):
- return set(self.trust_indicators) == set(other.trust_indicators)
-
-
-@swagger.model()
-class ScenarioVersion(models.ModelBase):
- """
- @property projects:
- @ptype projects: C{list} of L{ScenarioProject}
- """
- def __init__(self, version=None, projects=None):
- self.version = version
- self.projects = list_default(projects)
-
- @staticmethod
- def attr_parser():
- return {'projects': ScenarioProject}
-
- def __eq__(self, other):
- return [self.version == other.version and self._projects_eq(other)]
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def _projects_eq(self, other):
- for s_project in self.projects:
- for o_project in other.projects:
- if s_project.project == o_project.project:
- if s_project != o_project:
- return False
-
- return True
-
-
-@swagger.model()
-class ScenarioInstaller(models.ModelBase):
- """
- @property versions:
- @ptype versions: C{list} of L{ScenarioVersion}
- """
- def __init__(self, installer=None, versions=None):
- self.installer = installer
- self.versions = list_default(versions)
-
- @staticmethod
- def attr_parser():
- return {'versions': ScenarioVersion}
-
- def __eq__(self, other):
- return [self.installer == other.installer and self._versions_eq(other)]
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def _versions_eq(self, other):
- for s_version in self.versions:
- for o_version in other.versions:
- if s_version.version == o_version.version:
- if s_version != o_version:
- return False
-
- return True
-
-
-@swagger.model()
-class ScenarioCreateRequest(models.ModelBase):
- """
- @property installers:
- @ptype installers: C{list} of L{ScenarioInstaller}
- """
- def __init__(self, name='', installers=None):
- self.name = name
- self.installers = list_default(installers)
-
- @staticmethod
- def attr_parser():
- return {'installers': ScenarioInstaller}
-
-
-@swagger.model()
-class ScenarioUpdateRequest(models.ModelBase):
- """
- @property field: update field
- @property op: add/delete/update
- @property locate: information used to locate the field
- @property term: new value
- """
- def __init__(self, field=None, op=None, locate=None, term=None):
- self.field = field
- self.op = op
- self.locate = dict_default(locate)
- self.term = dict_default(term)
-
-
-@swagger.model()
-class Scenario(models.ModelBase):
- """
- @property installers:
- @ptype installers: C{list} of L{ScenarioInstaller}
- """
- def __init__(self, name='', create_date='', _id='', installers=None):
- self.name = name
- self._id = _id
- self.creation_date = create_date
- self.installers = list_default(installers)
-
- @staticmethod
- def attr_parser():
- return {'installers': ScenarioInstaller}
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __eq__(self, other):
- return [self.name == other.name and self._installers_eq(other)]
-
- def _installers_eq(self, other):
- for s_install in self.installers:
- for o_install in other.installers:
- if s_install.installer == o_install.installer:
- if s_install != o_install:
- return False
-
- return True
-
-
-@swagger.model()
-class Scenarios(models.ModelBase):
- """
- @property scenarios:
- @ptype scenarios: C{list} of L{Scenario}
- """
- def __init__(self):
- self.scenarios = list()
-
- @staticmethod
- def attr_parser():
- return {'scenarios': Scenario}
diff --git a/cvp/opnfv_testapi/resources/sut_handlers.py b/cvp/opnfv_testapi/resources/sut_handlers.py
deleted file mode 100644
index 16c50b83..00000000
--- a/cvp/opnfv_testapi/resources/sut_handlers.py
+++ /dev/null
@@ -1,112 +0,0 @@
-##############################################################################
-# Copyright (c) 2017
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import logging
-import json
-import os
-
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import sut_models
-from opnfv_testapi.tornado_swagger import swagger
-
-LOG = logging.getLogger(__name__)
-LOG.setLevel(logging.DEBUG)
-
-RESULT_PATH = '/home/testapi/logs/{}/results'
-
-
-class GenericSutHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericSutHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = "suts"
- self.table_cls = sut_models.Sut
-
-
-class HardwareHandler(GenericSutHandler):
- @swagger.operation(nickname="getHardwareById")
- def get(self, id):
- endpoint_info = self._read_endpoint_info(id)
- LOG.debug('Endpoint info: %s', endpoint_info)
-
- all_info = self._read_sut_info(id)
- LOG.debug('All SUT info: %s', all_info)
-
- hardware_info = {k: self._get_single_host_info(v)
- for k, v in all_info.items()}
- LOG.debug('SUT info: %s', hardware_info)
-
- data = {
- 'endpoint_info': endpoint_info,
- 'hardware_info': hardware_info
- }
-
- self.write(data)
-
- def _read_endpoint_info(self, id):
- path = os.path.join(RESULT_PATH.format(id), 'endpoint_info.json')
- try:
- with open(path) as f:
- endpoint_info = json.load(f)
- except Exception:
- endpoint_info = []
-
- return endpoint_info
-
- def _read_sut_info(self, id):
- path = os.path.join(RESULT_PATH.format(id), 'all_hosts_info.json')
- try:
- with open(path) as f:
- all_info = json.load(f)
- except Exception:
- all_info = {}
- return all_info
-
- def _get_single_host_info(self, single_info):
- info = []
- facts = single_info.get('ansible_facts', {})
-
- info.append(['hostname', facts.get('ansible_hostname')])
-
- info.append(['product_name', facts.get('ansible_product_name')])
- info.append(['product_version', facts.get('ansible_product_version')])
-
- processors = facts.get('ansible_processor', [])
- try:
- processor_type = '{} {}'.format(processors[0], processors[1])
- except IndexError:
- LOG.exception('No Processor in SUT data')
- processor_type = None
- info.append(['processor_type', processor_type])
- info.append(['architecture', facts.get('ansible_architecture')])
- info.append(['processor_cores', facts.get('ansible_processor_cores')])
- info.append(['processor_vcpus', facts.get('ansible_processor_vcpus')])
-
- memory = facts.get('ansible_memtotal_mb')
- memory = round(memory * 1.0 / 1024, 2) if memory else None
- info.append(['memory', '{} GB'.format(memory)])
-
- devices = facts.get('ansible_devices', {})
- info.extend([self._get_device_info(k, v) for k, v in devices.items()])
-
- lsb_description = facts.get('ansible_lsb', {}).get('description')
- info.append(['OS', lsb_description])
-
- interfaces = facts.get('ansible_interfaces')
- info.append(['interfaces', interfaces])
- info.extend([self._get_interface_info(facts, i) for i in interfaces])
- info = [i for i in info if i]
-
- return info
-
- def _get_interface_info(self, facts, name):
- mac = facts.get('ansible_{}'.format(name), {}).get('macaddress')
- return [name, mac] if mac else []
-
- def _get_device_info(self, name, info):
- return ['disk_{}'.format(name), info.get('size')]
diff --git a/cvp/opnfv_testapi/resources/sut_models.py b/cvp/opnfv_testapi/resources/sut_models.py
deleted file mode 100644
index b4a869b6..00000000
--- a/cvp/opnfv_testapi/resources/sut_models.py
+++ /dev/null
@@ -1,31 +0,0 @@
-##############################################################################
-# Copyright (c) 2017
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-@swagger.model()
-class Sut(models.ModelBase):
- """
- """
- def __init__(self):
- pass
-
-
-@swagger.model()
-class Suts(models.ModelBase):
- """
- @property suts:
- @ptype tests: C{list} of L{Sut}
- """
- def __init__(self):
- self.suts = list()
-
- @staticmethod
- def attr_parser():
- return {'suts': Sut}
diff --git a/cvp/opnfv_testapi/resources/test_handlers.py b/cvp/opnfv_testapi/resources/test_handlers.py
deleted file mode 100644
index 82cf9ae6..00000000
--- a/cvp/opnfv_testapi/resources/test_handlers.py
+++ /dev/null
@@ -1,307 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import logging
-import os
-import json
-
-from tornado import web
-from tornado import gen
-from bson import objectid
-
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import test_models
-from opnfv_testapi.tornado_swagger import swagger
-from opnfv_testapi.ui.auth import constants as auth_const
-from opnfv_testapi.db import api as dbapi
-
-DOVETAIL_LOG_PATH = '/home/testapi/logs/{}/results/dovetail.log'
-
-
-class GenericTestHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericTestHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = "tests"
- self.table_cls = test_models.Test
-
-
-class TestsCLHandler(GenericTestHandler):
- @swagger.operation(nickname="queryTests")
- @web.asynchronous
- @gen.coroutine
- def get(self):
- """
- @description: Retrieve result(s) for a test project
- on a specific pod.
- @notes: Retrieve result(s) for a test project on a specific pod.
- Available filters for this request are :
- - id : Test id
- - period : x last days, incompatible with from/to
- - from : starting time in 2016-01-01 or 2016-01-01 00:01:23
- - to : ending time in 2016-01-01 or 2016-01-01 00:01:23
- - signed : get logined user result
-
- GET /results/project=functest&case=vPing&version=Arno-R1 \
- &pod=pod_name&period=15&signed
- @return 200: all test results consist with query,
- empty list if no result is found
- @rtype: L{Tests}
- """
- def descend_limit():
- descend = self.get_query_argument('descend', 'true')
- return -1 if descend.lower() == 'true' else 1
-
- def last_limit():
- return self.get_int('last', self.get_query_argument('last', 0))
-
- def page_limit():
- return self.get_int('page', self.get_query_argument('page', 0))
-
- limitations = {
- 'sort': {'_id': descend_limit()},
- 'last': last_limit(),
- 'page': page_limit(),
- 'per_page': CONF.api_results_per_page
- }
-
- query = yield self.set_query()
- yield self._list(query=query, **limitations)
- logging.debug('list end')
-
- @swagger.operation(nickname="createTest")
- @web.asynchronous
- def post(self):
- """
- @description: create a test
- @param body: test to be created
- @type body: L{TestCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: test is created.
- @raise 404: pod/project/testcase not exist
- @raise 400: body/pod_name/project_name/case_name not provided
- """
- openid = self.get_secure_cookie(auth_const.OPENID)
- if openid:
- self.json_args['owner'] = openid
-
- self._post()
-
- @gen.coroutine
- def _post(self):
- miss_fields = []
- carriers = []
- query = {'owner': self.json_args['owner'], 'id': self.json_args['id']}
- ret, msg = yield self._check_if_exists(table="tests", query=query)
- if ret:
- self.finish_request({'code': '403', 'msg': msg})
- return
-
- self._create(miss_fields=miss_fields, carriers=carriers)
-
-
-class TestsGURHandler(GenericTestHandler):
-
- @swagger.operation(nickname="getTestById")
- @web.asynchronous
- @gen.coroutine
- def get(self, test_id):
- query = dict()
- query["_id"] = objectid.ObjectId(test_id)
-
- data = yield dbapi.db_find_one(self.table, query)
- if not data:
- raises.NotFound(message.not_found(self.table, query))
-
- validation = yield self._check_api_response_validation(data['id'])
-
- data.update({'validation': validation})
-
- self.finish_request(self.format_data(data))
-
- @gen.coroutine
- def _check_api_response_validation(self, test_id):
- log_path = DOVETAIL_LOG_PATH.format(test_id)
- if not os.path.exists(log_path):
- raises.Forbidden('dovetail.log not found, please check')
-
- with open(log_path) as f:
- log_content = f.read()
-
- warning_keyword = 'Strict API response validation DISABLED'
- if warning_keyword in log_content:
- raise gen.Return('API response validation disabled')
- else:
- raise gen.Return('API response validation enabled')
-
- @swagger.operation(nickname="deleteTestById")
- def delete(self, test_id):
- query = {'_id': objectid.ObjectId(test_id)}
- self._delete(query=query)
-
- @swagger.operation(nickname="updateTestById")
- @web.asynchronous
- def put(self, _id):
- """
- @description: update a single test by id
- @param body: fields to be updated
- @type body: L{TestUpdateRequest}
- @in body: body
- @rtype: L{Test}
- @return 200: update success
- @raise 404: Test not exist
- @raise 403: nothing to update
- """
- logging.debug('put')
- data = json.loads(self.request.body)
- item = data.get('item')
- value = data.get(item)
- logging.debug('%s:%s', item, value)
- try:
- self.update(_id, item, value)
- except Exception as e:
- logging.error('except:%s', e)
- return
-
- @gen.coroutine
- def _convert_to_id(self, email):
- query = {"email": email}
- table = "users"
- if query and table:
- data = yield dbapi.db_find_one(table, query)
- if data:
- raise gen.Return((True, 'Data alreay exists. %s' % (query),
- data.get("openid")))
- raise gen.Return((False, 'Data does not exist. %s' % (query), None))
-
- @gen.coroutine
- def update(self, _id, item, value):
- logging.debug("update")
- if item == "shared":
- new_list = []
- for user in value:
- ret, msg, user_id = yield self._convert_to_id(user)
- if ret:
- user = user_id
- new_list.append(user)
- query = {"$or": [{"openid": user}, {"email": user}]}
- table = "users"
- ret, msg = yield self._check_if_exists(table=table,
- query=query)
- logging.debug('ret:%s', ret)
- if not ret:
- self.finish_request({'code': '403', 'msg': msg})
- return
-
- if len(new_list) != len(set(new_list)):
- msg = "Already shared with this user"
- self.finish_request({'code': '403', 'msg': msg})
- return
-
- logging.debug("before _update")
- self.json_args = {}
- self.json_args[item] = value
- ret, msg = yield self.check_auth(item, value)
- if not ret:
- self.finish_request({'code': '404', 'msg': msg})
- return
-
- query = {'_id': objectid.ObjectId(_id)}
- db_keys = ['_id', ]
-
- test = yield dbapi.db_find_one("tests", query)
- if not test:
- msg = 'Record does not exist'
- self.finish_request({'code': 404, 'msg': msg})
- return
-
- curr_user = self.get_secure_cookie(auth_const.OPENID)
- if item in {"shared", "label", "sut_label"}:
- query['owner'] = curr_user
- db_keys.append('owner')
-
- if item == 'sut_label':
- if test['status'] != 'private' and not value:
- msg = 'SUT version cannot be changed to None after submitting.'
- self.finish_request({'code': 403, 'msg': msg})
- return
-
- if item == "status":
- if value in {'approved', 'not approved'}:
- if test['status'] == 'private':
- msg = 'Not allowed to approve/not approve'
- self.finish_request({'code': 403, 'msg': msg})
- return
-
- user = yield dbapi.db_find_one("users", {'openid': curr_user})
- if 'administrator' not in user['role']:
- msg = 'No permission to operate'
- self.finish_request({'code': 403, 'msg': msg})
- return
- elif value == 'review':
- if test['status'] != 'private':
- msg = 'Not allowed to submit to review'
- self.finish_request({'code': 403, 'msg': msg})
- return
-
- if not test['sut_label']:
- msg = 'Please fill out SUT version before submission'
- self.finish_request({'code': 403, 'msg': msg})
- return
-
- query['owner'] = curr_user
- db_keys.append('owner')
-
- test_query = {
- 'id': test['id'],
- '$or': [
- {'status': 'review'},
- {'status': 'approved'},
- {'status': 'not approved'}
- ]
- }
- record = yield dbapi.db_find_one("tests", test_query)
- if record:
- msg = ('{} has already submitted one record with the same '
- 'Test ID: {}'.format(record['owner'], test['id']))
- self.finish_request({'code': 403, 'msg': msg})
- return
- else:
- query['owner'] = curr_user
- db_keys.append('owner')
-
- logging.debug("before _update 2")
- self._update(query=query, db_keys=db_keys)
-
- @gen.coroutine
- def check_auth(self, item, value):
- logging.debug('check_auth')
- user = self.get_secure_cookie(auth_const.OPENID)
- query = {}
- if item == "status":
- if value == "private" or value == "review":
- logging.debug('check review')
- query['user_id'] = user
- data = yield dbapi.db_find_one('applications', query)
- if not data:
- logging.debug('not found')
- raise gen.Return((False, message.no_auth()))
- if value == "approve" or value == "not approved":
- logging.debug('check approve')
- query['role'] = {"$regex": ".*reviewer.*"}
- query['openid'] = user
- data = yield dbapi.db_find_one('users', query)
- if not data:
- logging.debug('not found')
- raise gen.Return((False, message.no_auth()))
- raise gen.Return((True, {}))
diff --git a/cvp/opnfv_testapi/resources/test_models.py b/cvp/opnfv_testapi/resources/test_models.py
deleted file mode 100644
index 3829cd6d..00000000
--- a/cvp/opnfv_testapi/resources/test_models.py
+++ /dev/null
@@ -1,90 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-from datetime import datetime
-
-
-@swagger.model()
-class TestCreateRequest(models.ModelBase):
- """
- @property trust_indicator:
- @ptype trust_indicator: L{TI}
- """
- def __init__(self,
- _id=None,
- owner=None,
- results=[],
- public="false",
- review="false",
- status="private",
- shared=[]):
- self._id = _id
- self.owner = owner
- self.results = results.copy()
- self.public = public
- self.review = review
- self.upload_date = datetime.now()
- self.status = status
- self.shared = shared
-
-
-class ResultUpdateRequest(models.ModelBase):
- """
- @property trust_indicator:
- @ptype trust_indicator: L{TI}
- """
- def __init__(self, trust_indicator=None):
- self.trust_indicator = trust_indicator
-
-
-@swagger.model()
-class Test(models.ModelBase):
- """
- @property trust_indicator: used for long duration test case
- @ptype trust_indicator: L{TI}
- """
- def __init__(self,
- _id=None,
- owner=None,
- results=[],
- public="false",
- review="false",
- status="private",
- shared=[],
- filename="",
- label="",
- sut_label="",
- trust_indicator=None):
- self._id = _id
- self.owner = owner
- self.results = results
- self.public = public
- self.review = review
- self.upload_date = datetime.now()
- self.status = status
- self.shared = shared
- self.filename = filename
- self.label = label
- self.sut_label = sut_label
-
-
-@swagger.model()
-class Tests(models.ModelBase):
- """
- @property tests:
- @ptype tests: C{list} of L{Test}
- """
- def __init__(self):
- self.tests = list()
-
- @staticmethod
- def attr_parser():
- return {'tests': Test}
diff --git a/cvp/opnfv_testapi/resources/testcase_handlers.py b/cvp/opnfv_testapi/resources/testcase_handlers.py
deleted file mode 100644
index 9399326f..00000000
--- a/cvp/opnfv_testapi/resources/testcase_handlers.py
+++ /dev/null
@@ -1,103 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import testcase_models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-class GenericTestcaseHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(GenericTestcaseHandler, self).__init__(application,
- request,
- **kwargs)
- self.table = self.db_testcases
- self.table_cls = testcase_models.Testcase
-
-
-class TestcaseCLHandler(GenericTestcaseHandler):
- @swagger.operation(nickname="listAllTestCases")
- def get(self, project_name):
- """
- @description: list all testcases of a project by project_name
- @return 200: return all testcases of this project,
- empty list is no testcase exist in this project
- @rtype: L{TestCases}
- """
- self._list(query={'project_name': project_name})
-
- @swagger.operation(nickname="createTestCase")
- def post(self, project_name):
- """
- @description: create a testcase of a project by project_name
- @param body: testcase to be created
- @type body: L{TestcaseCreateRequest}
- @in body: body
- @rtype: L{CreateResponse}
- @return 200: testcase is created in this project.
- @raise 403: project not exist
- or testcase already exists in this project
- @raise 400: body or name not provided
- """
- def project_query():
- return {'name': project_name}
-
- def testcase_query():
- return {'project_name': project_name,
- 'name': self.json_args.get('name')}
- miss_fields = ['name']
- carriers = [(self.db_projects, project_query)]
- self._create(miss_fields=miss_fields,
- carriers=carriers,
- query=testcase_query,
- project_name=project_name)
-
-
-class TestcaseGURHandler(GenericTestcaseHandler):
- @swagger.operation(nickname='getTestCaseByName')
- def get(self, project_name, case_name):
- """
- @description: get a single testcase
- by case_name and project_name
- @rtype: L{Testcase}
- @return 200: testcase exist
- @raise 404: testcase not exist
- """
- query = dict()
- query['project_name'] = project_name
- query["name"] = case_name
- self._get_one(query=query)
-
- @swagger.operation(nickname="updateTestCaseByName")
- def put(self, project_name, case_name):
- """
- @description: update a single testcase
- by project_name and case_name
- @param body: testcase to be updated
- @type body: L{TestcaseUpdateRequest}
- @in body: body
- @rtype: L{Project}
- @return 200: update success
- @raise 404: testcase or project not exist
- @raise 403: new testcase name already exist in project
- or nothing to update
- """
- query = {'project_name': project_name, 'name': case_name}
- db_keys = ['name', 'project_name']
- self._update(query=query, db_keys=db_keys)
-
- @swagger.operation(nickname='deleteTestCaseByName')
- def delete(self, project_name, case_name):
- """
- @description: delete a testcase by project_name and case_name
- @return 200: delete success
- @raise 404: testcase not exist
- """
- query = {'project_name': project_name, 'name': case_name}
- self._delete(query=query)
diff --git a/cvp/opnfv_testapi/resources/testcase_models.py b/cvp/opnfv_testapi/resources/testcase_models.py
deleted file mode 100644
index 2379dfc4..00000000
--- a/cvp/opnfv_testapi/resources/testcase_models.py
+++ /dev/null
@@ -1,95 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from opnfv_testapi.resources import models
-from opnfv_testapi.tornado_swagger import swagger
-
-
-@swagger.model()
-class TestcaseCreateRequest(models.ModelBase):
- def __init__(self, name, url=None, description=None,
- catalog_description=None, tier=None, ci_loop=None,
- criteria=None, blocking=None, dependencies=None, run=None,
- domains=None, tags=None, version=None):
- self.name = name
- self.url = url
- self.description = description
- self.catalog_description = catalog_description
- self.tier = tier
- self.ci_loop = ci_loop
- self.criteria = criteria
- self.blocking = blocking
- self.dependencies = dependencies
- self.run = run
- self.domains = domains
- self.tags = tags
- self.version = version
- self.trust = "Silver"
-
-
-@swagger.model()
-class TestcaseUpdateRequest(models.ModelBase):
- def __init__(self, name=None, description=None, project_name=None,
- catalog_description=None, tier=None, ci_loop=None,
- criteria=None, blocking=None, dependencies=None, run=None,
- domains=None, tags=None, version=None, trust=None):
- self.name = name
- self.description = description
- self.catalog_description = catalog_description
- self.project_name = project_name
- self.tier = tier
- self.ci_loop = ci_loop
- self.criteria = criteria
- self.blocking = blocking
- self.dependencies = dependencies
- self.run = run
- self.domains = domains
- self.tags = tags
- self.version = version
- self.trust = trust
-
-
-@swagger.model()
-class Testcase(models.ModelBase):
- def __init__(self, _id=None, name=None, project_name=None,
- description=None, url=None, creation_date=None,
- catalog_description=None, tier=None, ci_loop=None,
- criteria=None, blocking=None, dependencies=None, run=None,
- domains=None, tags=None, version=None,
- trust=None):
- self._id = None
- self.name = None
- self.project_name = None
- self.description = None
- self.catalog_description = None
- self.url = None
- self.creation_date = None
- self.tier = None
- self.ci_loop = None
- self.criteria = None
- self.blocking = None
- self.dependencies = None
- self.run = None
- self.domains = None
- self.tags = None
- self.version = None
- self.trust = None
-
-
-@swagger.model()
-class Testcases(models.ModelBase):
- """
- @property testcases:
- @ptype testcases: C{list} of L{Testcase}
- """
- def __init__(self):
- self.testcases = list()
-
- @staticmethod
- def attr_parser():
- return {'testcases': Testcase}
diff --git a/cvp/opnfv_testapi/router/__init__.py b/cvp/opnfv_testapi/router/__init__.py
deleted file mode 100644
index 3fc79f1d..00000000
--- a/cvp/opnfv_testapi/router/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-__author__ = 'serena'
diff --git a/cvp/opnfv_testapi/router/url_mappings.py b/cvp/opnfv_testapi/router/url_mappings.py
deleted file mode 100644
index e1d4c181..00000000
--- a/cvp/opnfv_testapi/router/url_mappings.py
+++ /dev/null
@@ -1,45 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-from opnfv_testapi.resources import handlers
-from opnfv_testapi.resources import result_handlers
-from opnfv_testapi.resources import test_handlers
-from opnfv_testapi.resources import application_handlers
-from opnfv_testapi.resources import sut_handlers
-from opnfv_testapi.ui.auth import sign
-from opnfv_testapi.ui.auth import user
-
-mappings = [
- (r"/versions", handlers.VersionHandler),
-
- (r"/api/v1/results/upload", result_handlers.ResultsUploadHandler),
- (r"/api/v1/results/([^/]+)", result_handlers.ResultsGURHandler),
-
- (r"/api/v1/tests", test_handlers.TestsCLHandler),
- (r"/api/v1/tests/([^/]+)", test_handlers.TestsGURHandler),
-
- (r"/api/v1/cvp/applications/getlogo/([^/]+)",
- application_handlers.ApplicationsGetLogoHandler),
- (r"/api/v1/cvp/applications/uploadlogo",
- application_handlers.ApplicationsLogoHandler),
- (r"/api/v1/cvp/applications", application_handlers.ApplicationsCLHandler),
- (r"/api/v1/cvp/applications/([^/]+)",
- application_handlers.ApplicationsGURHandler),
-
- (r"/api/v1/suts/hardware/([^/]+)", sut_handlers.HardwareHandler),
-
-
- (r'/api/v1/auth/signin', sign.SigninHandler),
- (r'/api/v1/auth/signin_return', sign.SigninReturnHandler),
- (r'/api/v1/auth/signin_return_jira', sign.SigninReturnJiraHandler),
- (r'/api/v1/auth/signin_return_cas', sign.SigninReturnCasHandler),
- (r'/api/v1/auth/signout', sign.SignoutHandler),
- (r'/api/v1/profile', user.ProfileHandler),
-
-]
diff --git a/cvp/opnfv_testapi/tests/__init__.py b/cvp/opnfv_testapi/tests/__init__.py
deleted file mode 100644
index 9f28b0bf..00000000
--- a/cvp/opnfv_testapi/tests/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-__author__ = 'serena'
diff --git a/cvp/opnfv_testapi/tests/unit/__init__.py b/cvp/opnfv_testapi/tests/unit/__init__.py
deleted file mode 100644
index 3fc79f1d..00000000
--- a/cvp/opnfv_testapi/tests/unit/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-__author__ = 'serena'
diff --git a/cvp/opnfv_testapi/tests/unit/common/__init__.py b/cvp/opnfv_testapi/tests/unit/common/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/__init__.py
+++ /dev/null
diff --git a/cvp/opnfv_testapi/tests/unit/common/noparam.ini b/cvp/opnfv_testapi/tests/unit/common/noparam.ini
deleted file mode 100644
index fda2a09e..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/noparam.ini
+++ /dev/null
@@ -1,16 +0,0 @@
-# to add a new parameter in the config file,
-# the CONF object in config.ini must be updated
-[mongo]
-# URL of the mongo DB
-# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
-url = mongodb://127.0.0.1:27017/
-
-[api]
-# Listening port
-port = 8000
-# With debug_on set to true, error traces will be shown in HTTP responses
-debug = True
-authenticate = False
-
-[swagger]
-base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/normal.ini b/cvp/opnfv_testapi/tests/unit/common/normal.ini
deleted file mode 100644
index 77cc6c6e..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/normal.ini
+++ /dev/null
@@ -1,17 +0,0 @@
-# to add a new parameter in the config file,
-# the CONF object in config.ini must be updated
-[mongo]
-# URL of the mongo DB
-# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
-url = mongodb://127.0.0.1:27017/
-dbname = test_results_collection
-
-[api]
-# Listening port
-port = 8000
-# With debug_on set to true, error traces will be shown in HTTP responses
-debug = True
-authenticate = False
-
-[swagger]
-base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/nosection.ini b/cvp/opnfv_testapi/tests/unit/common/nosection.ini
deleted file mode 100644
index 9988fc0a..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/nosection.ini
+++ /dev/null
@@ -1,11 +0,0 @@
-# to add a new parameter in the config file,
-# the CONF object in config.ini must be updated
-[api]
-# Listening port
-port = 8000
-# With debug_on set to true, error traces will be shown in HTTP responses
-debug = True
-authenticate = False
-
-[swagger]
-base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/notboolean.ini b/cvp/opnfv_testapi/tests/unit/common/notboolean.ini
deleted file mode 100644
index b3f32767..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/notboolean.ini
+++ /dev/null
@@ -1,17 +0,0 @@
-# to add a new parameter in the config file,
-# the CONF object in config.ini must be updated
-[mongo]
-# URL of the mongo DB
-# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
-url = mongodb://127.0.0.1:27017/
-dbname = test_results_collection
-
-[api]
-# Listening port
-port = 8000
-# With debug_on set to true, error traces will be shown in HTTP responses
-debug = True
-authenticate = notboolean
-
-[swagger]
-base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/notint.ini b/cvp/opnfv_testapi/tests/unit/common/notint.ini
deleted file mode 100644
index d1b752a3..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/notint.ini
+++ /dev/null
@@ -1,17 +0,0 @@
-# to add a new parameter in the config file,
-# the CONF object in config.ini must be updated
-[mongo]
-# URL of the mongo DB
-# Mongo auth url => mongodb://user1:pwd1@host1/?authSource=db1
-url = mongodb://127.0.0.1:27017/
-dbname = test_results_collection
-
-[api]
-# Listening port
-port = notint
-# With debug_on set to true, error traces will be shown in HTTP responses
-debug = True
-authenticate = False
-
-[swagger]
-base_url = http://localhost:8000
diff --git a/cvp/opnfv_testapi/tests/unit/common/test_config.py b/cvp/opnfv_testapi/tests/unit/common/test_config.py
deleted file mode 100644
index cc8743ca..00000000
--- a/cvp/opnfv_testapi/tests/unit/common/test_config.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import argparse
-
-
-def test_config_normal(mocker, config_normal):
- mocker.patch(
- 'argparse.ArgumentParser.parse_known_args',
- return_value=(argparse.Namespace(config_file=config_normal), None))
- from opnfv_testapi.common import config
- CONF = config.Config()
- assert CONF.mongo_url == 'mongodb://127.0.0.1:27017/'
- assert CONF.mongo_dbname == 'test_results_collection'
- assert CONF.api_port == 8000
- assert CONF.api_debug is True
- assert CONF.api_authenticate is False
- assert CONF.swagger_base_url == 'http://localhost:8000'
diff --git a/cvp/opnfv_testapi/tests/unit/conftest.py b/cvp/opnfv_testapi/tests/unit/conftest.py
deleted file mode 100644
index feff1daa..00000000
--- a/cvp/opnfv_testapi/tests/unit/conftest.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from os import path
-
-import pytest
-
-
-@pytest.fixture
-def config_normal():
- return path.join(path.dirname(__file__), 'common/normal.ini')
diff --git a/cvp/opnfv_testapi/tests/unit/executor.py b/cvp/opnfv_testapi/tests/unit/executor.py
deleted file mode 100644
index b8f696ca..00000000
--- a/cvp/opnfv_testapi/tests/unit/executor.py
+++ /dev/null
@@ -1,97 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import functools
-import httplib
-
-
-def upload(excepted_status, excepted_response):
- def _upload(create_request):
- @functools.wraps(create_request)
- def wrap(self):
- request = create_request(self)
- status, body = self.upload(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _upload
-
-
-def create(excepted_status, excepted_response):
- def _create(create_request):
- @functools.wraps(create_request)
- def wrap(self):
- request = create_request(self)
- status, body = self.create(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _create
-
-
-def get(excepted_status, excepted_response):
- def _get(get_request):
- @functools.wraps(get_request)
- def wrap(self):
- request = get_request(self)
- status, body = self.get(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _get
-
-
-def update(excepted_status, excepted_response):
- def _update(update_request):
- @functools.wraps(update_request)
- def wrap(self):
- request, resource = update_request(self)
- status, body = self.update(request, resource)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(request, body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _update
-
-
-def delete(excepted_status, excepted_response):
- def _delete(delete_request):
- @functools.wraps(delete_request)
- def wrap(self):
- request = delete_request(self)
- if isinstance(request, tuple):
- status, body = self.delete(request[0], *(request[1]))
- else:
- status, body = self.delete(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _delete
-
-
-def query(excepted_status, excepted_response, number=0):
- def _query(get_request):
- @functools.wraps(get_request)
- def wrap(self):
- request = get_request(self)
- status, body = self.query(request)
- if excepted_status == httplib.OK:
- getattr(self, excepted_response)(body, number)
- else:
- self.assertIn(excepted_response, body)
- return wrap
- return _query
diff --git a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
deleted file mode 100644
index 0ca83df6..00000000
--- a/cvp/opnfv_testapi/tests/unit/fake_pymongo.py
+++ /dev/null
@@ -1,284 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from operator import itemgetter
-
-from bson.objectid import ObjectId
-from concurrent.futures import ThreadPoolExecutor
-
-
-def thread_execute(method, *args, **kwargs):
- with ThreadPoolExecutor(max_workers=2) as executor:
- result = executor.submit(method, *args, **kwargs)
- return result
-
-
-class MemCursor(object):
- def __init__(self, collection):
- self.collection = collection
- self.length = len(self.collection)
- self.sorted = []
-
- def _is_next_exist(self):
- return self.length != 0
-
- @property
- def fetch_next(self):
- return thread_execute(self._is_next_exist)
-
- def next_object(self):
- self.length -= 1
- return self.collection.pop()
-
- def sort(self, key_or_list):
- for k, v in key_or_list.iteritems():
- if v == -1:
- reverse = True
- else:
- reverse = False
-
- self.collection = sorted(self.collection,
- key=itemgetter(k), reverse=reverse)
- return self
-
- def limit(self, limit):
- if limit != 0 and limit < len(self.collection):
- self.collection = self.collection[0: limit]
- self.length = limit
- return self
-
- def skip(self, skip):
- if skip < self.length and (skip > 0):
- self.collection = self.collection[self.length - skip: -1]
- self.length -= skip
- elif skip >= self.length:
- self.collection = []
- self.length = 0
- return self
-
- def _count(self):
- return self.length
-
- def count(self):
- return thread_execute(self._count)
-
-
-class MemDb(object):
-
- def __init__(self, name):
- self.name = name
- self.contents = []
- pass
-
- def _find_one(self, spec_or_id=None, *args):
- if spec_or_id is not None and not isinstance(spec_or_id, dict):
- spec_or_id = {"_id": spec_or_id}
- if '_id' in spec_or_id:
- spec_or_id['_id'] = str(spec_or_id['_id'])
- cursor = self._find(spec_or_id, *args)
- for result in cursor:
- return result
- return None
-
- def find_one(self, spec_or_id=None, *args):
- return thread_execute(self._find_one, spec_or_id, *args)
-
- def _insert(self, doc_or_docs, check_keys=True):
-
- docs = doc_or_docs
- return_one = False
- if isinstance(docs, dict):
- return_one = True
- docs = [docs]
-
- if check_keys:
- for doc in docs:
- self._check_keys(doc)
-
- ids = []
- for doc in docs:
- if '_id' not in doc:
- doc['_id'] = str(ObjectId())
- if not self._find_one(doc['_id']):
- ids.append(doc['_id'])
- self.contents.append(doc_or_docs)
-
- if len(ids) == 0:
- return None
- if return_one:
- return ids[0]
- else:
- return ids
-
- def insert(self, doc_or_docs, check_keys=True):
- return thread_execute(self._insert, doc_or_docs, check_keys)
-
- @staticmethod
- def _compare_date(spec, value):
- gte = True
- lt = False
- for k, v in spec.iteritems():
- if k == '$gte' and value < v:
- gte = False
- elif k == '$lt' and value < v:
- lt = True
- return gte and lt
-
- def _in(self, content, *args):
- if self.name == 'scenarios':
- return self._in_scenarios(content, *args)
- else:
- return self._in_others(content, *args)
-
- def _in_scenarios_installer(self, installer, content):
- hit = False
- for s_installer in content['installers']:
- if installer == s_installer['installer']:
- hit = True
-
- return hit
-
- def _in_scenarios_version(self, version, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- if version == s_version['version']:
- hit = True
- return hit
-
- def _in_scenarios_project(self, project, content):
- hit = False
- for s_installer in content['installers']:
- for s_version in s_installer['versions']:
- for s_project in s_version['projects']:
- if project == s_project['project']:
- hit = True
-
- return hit
-
- def _in_scenarios(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'installers':
- for inner in v.values():
- for i_k, i_v in inner.iteritems():
- if i_k == 'installer':
- return self._in_scenarios_installer(i_v,
- content)
- elif i_k == 'versions.version':
- return self._in_scenarios_version(i_v,
- content)
- elif i_k == 'versions.projects.project':
- return self._in_scenarios_project(i_v,
- content)
- elif content.get(k, None) != v:
- return False
-
- return True
-
- def _in_others(self, content, *args):
- for arg in args:
- for k, v in arg.iteritems():
- if k == 'start_date':
- if not MemDb._compare_date(v, content.get(k)):
- return False
- elif k == 'trust_indicator.current':
- if content.get('trust_indicator').get('current') != v:
- return False
- elif not isinstance(v, dict) and content.get(k, None) != v:
- return False
- return True
-
- def _find(self, *args):
- res = []
- for content in self.contents:
- if self._in(content, *args):
- res.append(content)
-
- return res
-
- def find(self, *args):
- return MemCursor(self._find(*args))
-
- def _aggregate(self, *args, **kwargs):
- res = self.contents
- print args
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$match':
- res = self._find(v)
- cursor = MemCursor(res)
- for arg in args[0]:
- for k, v in arg.iteritems():
- if k == '$sort':
- cursor = cursor.sort(v)
- elif k == '$skip':
- cursor = cursor.skip(v)
- elif k == '$limit':
- cursor = cursor.limit(v)
- return cursor
-
- def aggregate(self, *args, **kwargs):
- return self._aggregate(*args, **kwargs)
-
- def _update(self, spec, document, check_keys=True):
- updated = False
-
- if check_keys:
- self._check_keys(document)
-
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec):
- for k, v in document.iteritems():
- updated = True
- content[k] = v
- self.contents[index] = content
- return updated
-
- def update(self, spec, document, check_keys=True):
- return thread_execute(self._update, spec, document, check_keys)
-
- def _remove(self, spec_or_id=None):
- if spec_or_id is None:
- self.contents = []
- if not isinstance(spec_or_id, dict):
- spec_or_id = {'_id': spec_or_id}
- for index in range(len(self.contents)):
- content = self.contents[index]
- if self._in(content, spec_or_id):
- del self.contents[index]
- return True
- return False
-
- def remove(self, spec_or_id=None):
- return thread_execute(self._remove, spec_or_id)
-
- def clear(self):
- self._remove()
-
- def _check_keys(self, doc):
- for key in doc.keys():
- if '.' in key:
- raise NameError('key {} must not contain .'.format(key))
- if key.startswith('$'):
- raise NameError('key {} must not start with $'.format(key))
- if isinstance(doc.get(key), dict):
- self._check_keys(doc.get(key))
-
-
-def __getattr__(name):
- return globals()[name]
-
-
-pods = MemDb('pods')
-projects = MemDb('projects')
-testcases = MemDb('testcases')
-results = MemDb('results')
-scenarios = MemDb('scenarios')
-tokens = MemDb('tokens')
diff --git a/cvp/opnfv_testapi/tests/unit/resources/__init__.py b/cvp/opnfv_testapi/tests/unit/resources/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/__init__.py
+++ /dev/null
diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json
deleted file mode 100644
index 18780221..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/scenario-c1.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
- "name": "nosdn-nofeature-ha",
- "installers":
- [
- {
- "installer": "apex",
- "versions":
- [
- {
- "owner": "Luke",
- "version": "master",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores":
- [
- {
- "date": "2017-01-08 22:46:44",
- "score": "12/14"
- }
-
- ],
- "trust_indicators": []
- },
- {
- "project": "yardstick",
- "customs": [],
- "scores": [],
- "trust_indicators": []
- }
- ]
- }
- ]
- }
- ]
-}
diff --git a/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json b/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json
deleted file mode 100644
index b6a3b83a..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/scenario-c2.json
+++ /dev/null
@@ -1,73 +0,0 @@
-{
- "name": "odl_2-nofeature-ha",
- "installers":
- [
- {
- "installer": "fuel",
- "versions":
- [
- {
- "owner": "Lucky",
- "version": "colorado",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores": [],
- "trust_indicators": [
- {
- "date": "2017-01-18 22:46:44",
- "status": "silver"
- }
-
- ]
- },
- {
- "project": "yardstick",
- "customs": ["suite-a"],
- "scores": [
- {
- "date": "2017-01-08 22:46:44",
- "score": "0"
- }
- ],
- "trust_indicators": [
- {
- "date": "2017-01-18 22:46:44",
- "status": "gold"
- }
- ]
- }
- ]
- },
- {
- "owner": "Luke",
- "version": "colorado",
- "projects":
- [
- {
- "project": "functest",
- "customs": [ "healthcheck", "vping_ssh"],
- "scores":
- [
- {
- "date": "2017-01-09 22:46:44",
- "score": "11/14"
- }
-
- ],
- "trust_indicators": []
- },
- {
- "project": "yardstick",
- "customs": [],
- "scores": [],
- "trust_indicators": []
- }
- ]
- }
- ]
- }
- ]
-}
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_base.py b/cvp/opnfv_testapi/tests/unit/resources/test_base.py
deleted file mode 100644
index dcec4e95..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_base.py
+++ /dev/null
@@ -1,161 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import json
-from os import path
-
-import mock
-from tornado import testing
-
-from opnfv_testapi.resources import models
-from opnfv_testapi.tests.unit import fake_pymongo
-
-
-class TestBase(testing.AsyncHTTPTestCase):
- headers = {'Content-Type': 'application/json; charset=UTF-8'}
-
- def setUp(self):
- self._patch_server()
- self.basePath = ''
- self.create_res = models.CreateResponse
- self.get_res = None
- self.list_res = None
- self.update_res = None
- self.req_d = None
- self.req_e = None
- self.addCleanup(self._clear)
- super(TestBase, self).setUp()
-
- def tearDown(self):
- self.db_patcher.stop()
- self.config_patcher.stop()
-
- def _patch_server(self):
- import argparse
- config = path.join(path.dirname(__file__), '../common/normal.ini')
- self.config_patcher = mock.patch(
- 'argparse.ArgumentParser.parse_known_args',
- return_value=(argparse.Namespace(config_file=config), None))
- self.db_patcher = mock.patch('opnfv_testapi.db.api.DB',
- fake_pymongo)
- self.config_patcher.start()
- self.db_patcher.start()
-
- def set_config_file(self):
- self.config_file = 'normal.ini'
-
- def get_app(self):
- from opnfv_testapi.cmd import server
- return server.make_app()
-
- def create_d(self, *args):
- return self.create(self.req_d, *args)
-
- def create_e(self, *args):
- return self.create(self.req_e, *args)
-
- def create(self, req=None, *args):
- return self.create_help(self.basePath, req, *args)
-
- def create_help(self, uri, req, *args):
- if req and not isinstance(req, str) and hasattr(req, 'format'):
- req = req.format()
- res = self.fetch(self._update_uri(uri, *args),
- method='POST',
- body=json.dumps(req),
- headers=self.headers)
-
- return self._get_return(res, self.create_res)
-
- def get(self, *args):
- res = self.fetch(self._get_uri(*args),
- method='GET',
- headers=self.headers)
-
- def inner():
- new_args, num = self._get_valid_args(*args)
- return self.get_res \
- if num != self._need_arg_num(self.basePath) else self.list_res
- return self._get_return(res, inner())
-
- def query(self, query):
- res = self.fetch(self._get_query_uri(query),
- method='GET',
- headers=self.headers)
- return self._get_return(res, self.list_res)
-
- def update(self, new=None, *args):
- if new:
- new = new.format()
- res = self.fetch(self._get_uri(*args),
- method='PUT',
- body=json.dumps(new),
- headers=self.headers)
- return self._get_return(res, self.update_res)
-
- def delete(self, *args):
- res = self.fetch(self._get_uri(*args),
- method='DELETE',
- headers=self.headers)
- return res.code, res.body
-
- @staticmethod
- def _get_valid_args(*args):
- new_args = tuple(['%s' % arg for arg in args if arg is not None])
- return new_args, len(new_args)
-
- def _need_arg_num(self, uri):
- return uri.count('%s')
-
- def _get_query_uri(self, query):
- return self.basePath + '?' + query if query else self.basePath
-
- def _get_uri(self, *args):
- return self._update_uri(self.basePath, *args)
-
- def _update_uri(self, uri, *args):
- r_uri = uri
- new_args, num = self._get_valid_args(*args)
- if num != self._need_arg_num(uri):
- r_uri += '/%s'
-
- return r_uri % tuple(['%s' % arg for arg in new_args])
-
- def _get_return(self, res, cls):
- code = res.code
- body = res.body
- return code, self._get_return_body(code, body, cls)
-
- @staticmethod
- def _get_return_body(code, body, cls):
- return cls.from_dict(json.loads(body)) if code < 300 and cls else body
-
- def assert_href(self, body):
- self.assertIn(self.basePath, body.href)
-
- def assert_create_body(self, body, req=None, *args):
- import inspect
- if not req:
- req = self.req_d
- resource_name = ''
- if inspect.isclass(req):
- resource_name = req.name
- elif isinstance(req, dict):
- resource_name = req['name']
- elif isinstance(req, str):
- resource_name = json.loads(req)['name']
- new_args = args + tuple([resource_name])
- self.assertIn(self._get_uri(*new_args), body.href)
-
- @staticmethod
- def _clear():
- fake_pymongo.pods.clear()
- fake_pymongo.projects.clear()
- fake_pymongo.testcases.clear()
- fake_pymongo.results.clear()
- fake_pymongo.scenarios.clear()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py b/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
deleted file mode 100644
index 1ebc96f3..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_fake_pymongo.py
+++ /dev/null
@@ -1,123 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import unittest
-
-from tornado import gen
-from tornado import testing
-from tornado import web
-
-from opnfv_testapi.tests.unit import fake_pymongo
-
-
-class MyTest(testing.AsyncHTTPTestCase):
- def setUp(self):
- super(MyTest, self).setUp()
- self.db = fake_pymongo
- self.addCleanup(self._clear)
- self.io_loop.run_sync(self.fixture_setup)
-
- def get_app(self):
- return web.Application()
-
- @gen.coroutine
- def fixture_setup(self):
- self.test1 = {'_id': '1', 'name': 'test1'}
- self.test2 = {'name': 'test2'}
- yield self.db.pods.insert({'_id': '1', 'name': 'test1'})
- yield self.db.pods.insert({'name': 'test2'})
-
- @testing.gen_test
- def test_find_one(self):
- user = yield self.db.pods.find_one({'name': 'test1'})
- self.assertEqual(user, self.test1)
- self.db.pods.remove()
-
- @testing.gen_test
- def test_find(self):
- cursor = self.db.pods.find()
- names = []
- while (yield cursor.fetch_next):
- ob = cursor.next_object()
- names.append(ob.get('name'))
- self.assertItemsEqual(names, ['test1', 'test2'])
-
- @testing.gen_test
- def test_update(self):
- yield self.db.pods.update({'_id': '1'}, {'name': 'new_test1'})
- user = yield self.db.pods.find_one({'_id': '1'})
- self.assertEqual(user.get('name', None), 'new_test1')
-
- def test_update_dot_error(self):
- self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
- 'key 1. name must not contain .')
-
- def test_update_dot_no_error(self):
- self._update_assert({'_id': '1', 'name': {'1. name': 'test1'}},
- None,
- check_keys=False)
-
- def test_update_dollar_error(self):
- self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
- 'key $name must not start with $')
-
- def test_update_dollar_no_error(self):
- self._update_assert({'_id': '1', 'name': {'$name': 'test1'}},
- None,
- check_keys=False)
-
- @testing.gen_test
- def test_remove(self):
- yield self.db.pods.remove({'_id': '1'})
- user = yield self.db.pods.find_one({'_id': '1'})
- self.assertIsNone(user)
-
- def test_insert_dot_error(self):
- self._insert_assert({'_id': '1', '2. name': 'test1'},
- 'key 2. name must not contain .')
-
- def test_insert_dot_no_error(self):
- self._insert_assert({'_id': '1', '2. name': 'test1'},
- None,
- check_keys=False)
-
- def test_insert_dollar_error(self):
- self._insert_assert({'_id': '1', '$name': 'test1'},
- 'key $name must not start with $')
-
- def test_insert_dollar_no_error(self):
- self._insert_assert({'_id': '1', '$name': 'test1'},
- None,
- check_keys=False)
-
- def _clear(self):
- self.db.pods.clear()
-
- def _update_assert(self, docs, error=None, **kwargs):
- self._db_assert('update', error, {'_id': '1'}, docs, **kwargs)
-
- def _insert_assert(self, docs, error=None, **kwargs):
- self._db_assert('insert', error, docs, **kwargs)
-
- @testing.gen_test
- def _db_assert(self, method, error, *args, **kwargs):
- name_error = None
- try:
- yield self._eval_pods_db(method, *args, **kwargs)
- except NameError as err:
- name_error = err.args[0]
- finally:
- self.assertEqual(name_error, error)
-
- def _eval_pods_db(self, method, *args, **kwargs):
- table_obj = vars(self.db)['pods']
- return table_obj.__getattribute__(method)(*args, **kwargs)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_pod.py b/cvp/opnfv_testapi/tests/unit/resources/test_pod.py
deleted file mode 100644
index cb4f1d92..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_pod.py
+++ /dev/null
@@ -1,90 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.resources import pod_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestPodBase(base.TestBase):
- def setUp(self):
- super(TestPodBase, self).setUp()
- self.req_d = pod_models.PodCreateRequest('zte-1', 'virtual',
- 'zte pod 1', 'ci-pod')
- self.req_e = pod_models.PodCreateRequest('zte-2', 'metal', 'zte pod 2')
- self.get_res = pod_models.Pod
- self.list_res = pod_models.Pods
- self.basePath = '/api/v1/pods'
-
- def assert_get_body(self, pod, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(pod.name, req.name)
- self.assertEqual(pod.mode, req.mode)
- self.assertEqual(pod.details, req.details)
- self.assertEqual(pod.role, req.role)
- self.assertIsNotNone(pod.creation_date)
- self.assertIsNotNone(pod._id)
-
-
-class TestPodCreate(TestPodBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- return pod_models.PodCreateRequest('')
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- return pod_models.PodCreateRequest(None)
-
- @executor.create(httplib.OK, 'assert_create_body')
- def test_success(self):
- return self.req_d
-
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- self.create_d()
- return self.req_d
-
-
-class TestPodGet(TestPodBase):
- def setUp(self):
- super(TestPodGet, self).setUp()
- self.create_d()
- self.create_e()
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_get_body')
- def test_getOne(self):
- return self.req_d.name
-
- @executor.get(httplib.OK, '_assert_list')
- def test_list(self):
- return None
-
- def _assert_list(self, body):
- self.assertEqual(len(body.pods), 2)
- for pod in body.pods:
- if self.req_d.name == pod.name:
- self.assert_get_body(pod)
- else:
- self.assert_get_body(pod, self.req_e)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_project.py b/cvp/opnfv_testapi/tests/unit/resources/test_project.py
deleted file mode 100644
index 0622ba8d..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_project.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.resources import project_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestProjectBase(base.TestBase):
- def setUp(self):
- super(TestProjectBase, self).setUp()
- self.req_d = project_models.ProjectCreateRequest('vping',
- 'vping-ssh test')
- self.req_e = project_models.ProjectCreateRequest('doctor',
- 'doctor test')
- self.get_res = project_models.Project
- self.list_res = project_models.Projects
- self.update_res = project_models.Project
- self.basePath = '/api/v1/projects'
-
- def assert_body(self, project, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(project.name, req.name)
- self.assertEqual(project.description, req.description)
- self.assertIsNotNone(project._id)
- self.assertIsNotNone(project.creation_date)
-
-
-class TestProjectCreate(TestProjectBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- return project_models.ProjectCreateRequest('')
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- return project_models.ProjectCreateRequest(None)
-
- @executor.create(httplib.OK, 'assert_create_body')
- def test_success(self):
- return self.req_d
-
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- self.create_d()
- return self.req_d
-
-
-class TestProjectGet(TestProjectBase):
- def setUp(self):
- super(TestProjectGet, self).setUp()
- self.create_d()
- self.create_e()
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_body')
- def test_getOne(self):
- return self.req_d.name
-
- @executor.get(httplib.OK, '_assert_list')
- def test_list(self):
- return None
-
- def _assert_list(self, body):
- for project in body.projects:
- if self.req_d.name == project.name:
- self.assert_body(project)
- else:
- self.assert_body(project, self.req_e)
-
-
-class TestProjectUpdate(TestProjectBase):
- def setUp(self):
- super(TestProjectUpdate, self).setUp()
- _, d_body = self.create_d()
- _, get_res = self.get(self.req_d.name)
- self.index_d = get_res._id
- self.create_e()
-
- @executor.update(httplib.BAD_REQUEST, message.no_body())
- def test_withoutBody(self):
- return None, 'noBody'
-
- @executor.update(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return self.req_e, 'notFound'
-
- @executor.update(httplib.FORBIDDEN, message.exist_base)
- def test_newNameExist(self):
- return self.req_e, self.req_d.name
-
- @executor.update(httplib.FORBIDDEN, message.no_update())
- def test_noUpdate(self):
- return self.req_d, self.req_d.name
-
- @executor.update(httplib.OK, '_assert_update')
- def test_success(self):
- req = project_models.ProjectUpdateRequest('newName', 'new description')
- return req, self.req_d.name
-
- def _assert_update(self, req, body):
- self.assertEqual(self.index_d, body._id)
- self.assert_body(body, req)
- _, new_body = self.get(req.name)
- self.assertEqual(self.index_d, new_body._id)
- self.assert_body(new_body, req)
-
-
-class TestProjectDelete(TestProjectBase):
- def setUp(self):
- super(TestProjectDelete, self).setUp()
- self.create_d()
-
- @executor.delete(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return 'notFound'
-
- @executor.delete(httplib.OK, '_assert_delete')
- def test_success(self):
- return self.req_d.name
-
- def _assert_delete(self, body):
- self.assertEqual(body, '')
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_result.py b/cvp/opnfv_testapi/tests/unit/resources/test_result.py
deleted file mode 100644
index 1e83ed30..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_result.py
+++ /dev/null
@@ -1,410 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import copy
-import httplib
-import unittest
-from datetime import datetime, timedelta
-import json
-
-from opnfv_testapi.common import message
-from opnfv_testapi.resources import pod_models
-from opnfv_testapi.resources import project_models
-from opnfv_testapi.resources import result_models
-from opnfv_testapi.resources import testcase_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class Details(object):
- def __init__(self, timestart=None, duration=None, status=None):
- self.timestart = timestart
- self.duration = duration
- self.status = status
- self.items = [{'item1': 1}, {'item2': 2}]
-
- def format(self):
- return {
- "timestart": self.timestart,
- "duration": self.duration,
- "status": self.status,
- 'items': [{'item1': 1}, {'item2': 2}]
- }
-
- @staticmethod
- def from_dict(a_dict):
-
- if a_dict is None:
- return None
-
- t = Details()
- t.timestart = a_dict.get('timestart')
- t.duration = a_dict.get('duration')
- t.status = a_dict.get('status')
- t.items = a_dict.get('items')
- return t
-
-
-class TestResultBase(base.TestBase):
- def setUp(self):
- self.pod = 'zte-pod1'
- self.project = 'functest'
- self.case = 'vPing'
- self.installer = 'fuel'
- self.version = 'C'
- self.build_tag = 'v3.0'
- self.scenario = 'odl-l2'
- self.criteria = 'passed'
- self.trust_indicator = result_models.TI(0.7)
- self.start_date = str(datetime.now())
- self.stop_date = str(datetime.now() + timedelta(minutes=1))
- self.update_date = str(datetime.now() + timedelta(days=1))
- self.update_step = -0.05
- super(TestResultBase, self).setUp()
- self.details = Details(timestart='0', duration='9s', status='OK')
- self.req_d = result_models.ResultCreateRequest(
- pod_name=self.pod,
- project_name=self.project,
- case_name=self.case,
- installer=self.installer,
- version=self.version,
- start_date=self.start_date,
- stop_date=self.stop_date,
- details=self.details.format(),
- build_tag=self.build_tag,
- scenario=self.scenario,
- criteria=self.criteria,
- trust_indicator=self.trust_indicator)
- self.get_res = result_models.TestResult
- self.list_res = result_models.TestResults
- self.update_res = result_models.TestResult
- self.basePath = '/api/v1/results'
- self.req_pod = pod_models.PodCreateRequest(
- self.pod,
- 'metal',
- 'zte pod 1')
- self.req_project = project_models.ProjectCreateRequest(
- self.project,
- 'vping test')
- self.req_testcase = testcase_models.TestcaseCreateRequest(
- self.case,
- '/cases/vping',
- 'vping-ssh test')
- self.create_help('/api/v1/pods', self.req_pod)
- self.create_help('/api/v1/projects', self.req_project)
- self.create_help('/api/v1/projects/%s/cases',
- self.req_testcase,
- self.project)
-
- def assert_res(self, result, req=None):
- if req is None:
- req = self.req_d
- self.assertEqual(result.pod_name, req.pod_name)
- self.assertEqual(result.project_name, req.project_name)
- self.assertEqual(result.case_name, req.case_name)
- self.assertEqual(result.installer, req.installer)
- self.assertEqual(result.version, req.version)
- details_req = Details.from_dict(req.details)
- details_res = Details.from_dict(result.details)
- self.assertEqual(details_res.duration, details_req.duration)
- self.assertEqual(details_res.timestart, details_req.timestart)
- self.assertEqual(details_res.status, details_req.status)
- self.assertEqual(details_res.items, details_req.items)
- self.assertEqual(result.build_tag, req.build_tag)
- self.assertEqual(result.scenario, req.scenario)
- self.assertEqual(result.criteria, req.criteria)
- self.assertEqual(result.start_date, req.start_date)
- self.assertEqual(result.stop_date, req.stop_date)
- self.assertIsNotNone(result._id)
- ti = result.trust_indicator
- self.assertEqual(ti.current, req.trust_indicator.current)
- if ti.histories:
- history = ti.histories[0]
- self.assertEqual(history.date, self.update_date)
- self.assertEqual(history.step, self.update_step)
-
- def _create_d(self):
- _, res = self.create_d()
- return res.href.split('/')[-1]
-
- def upload(self, req):
- if req and not isinstance(req, str) and hasattr(req, 'format'):
- req = req.format()
- res = self.fetch(self.basePath + '/upload',
- method='POST',
- body=json.dumps(req),
- headers=self.headers)
-
- return self._get_return(res, self.create_res)
-
-
-class TestResultUpload(TestResultBase):
- @executor.upload(httplib.BAD_REQUEST, message.key_error('file'))
- def test_filenotfind(self):
- return None
-
-
-class TestResultCreate(TestResultBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_nobody(self):
- return None
-
- @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
- def test_podNotProvided(self):
- req = self.req_d
- req.pod_name = None
- return req
-
- @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
- def test_projectNotProvided(self):
- req = self.req_d
- req.project_name = None
- return req
-
- @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
- def test_testcaseNotProvided(self):
- req = self.req_d
- req.case_name = None
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noPod(self):
- req = self.req_d
- req.pod_name = 'notExistPod'
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noProject(self):
- req = self.req_d
- req.project_name = 'notExistProject'
- return req
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noTestcase(self):
- req = self.req_d
- req.case_name = 'notExistTestcase'
- return req
-
- @executor.create(httplib.OK, 'assert_href')
- def test_success(self):
- return self.req_d
-
- @executor.create(httplib.OK, 'assert_href')
- def test_key_with_doc(self):
- req = copy.deepcopy(self.req_d)
- req.details = {'1.name': 'dot_name'}
- return req
-
- @executor.create(httplib.OK, '_assert_no_ti')
- def test_no_ti(self):
- req = result_models.ResultCreateRequest(pod_name=self.pod,
- project_name=self.project,
- case_name=self.case,
- installer=self.installer,
- version=self.version,
- start_date=self.start_date,
- stop_date=self.stop_date,
- details=self.details.format(),
- build_tag=self.build_tag,
- scenario=self.scenario,
- criteria=self.criteria)
- self.actual_req = req
- return req
-
- def _assert_no_ti(self, body):
- _id = body.href.split('/')[-1]
- code, body = self.get(_id)
- self.assert_res(body, self.actual_req)
-
-
-class TestResultGet(TestResultBase):
- def setUp(self):
- super(TestResultGet, self).setUp()
- self.req_10d_before = self._create_changed_date(days=-10)
- self.req_d_id = self._create_d()
- self.req_10d_later = self._create_changed_date(days=10)
-
- @executor.get(httplib.OK, 'assert_res')
- def test_getOne(self):
- return self.req_d_id
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryPod(self):
- return self._set_query('pod')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryProject(self):
- return self._set_query('project')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryTestcase(self):
- return self._set_query('case')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryVersion(self):
- return self._set_query('version')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryInstaller(self):
- return self._set_query('installer')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryBuildTag(self):
- return self._set_query('build_tag')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryScenario(self):
- return self._set_query('scenario')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryTrustIndicator(self):
- return self._set_query('trust_indicator')
-
- @executor.query(httplib.OK, '_query_success', 3)
- def test_queryCriteria(self):
- return self._set_query('criteria')
-
- @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
- def test_queryPeriodNotInt(self):
- return self._set_query('period=a')
-
- @executor.query(httplib.OK, '_query_period_one', 1)
- def test_queryPeriodSuccess(self):
- return self._set_query('period=5')
-
- @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
- def test_queryLastNotInt(self):
- return self._set_query('last=a')
-
- @executor.query(httplib.OK, '_query_last_one', 1)
- def test_queryLast(self):
- return self._set_query('last=1')
-
- @executor.query(httplib.OK, '_query_success', 4)
- def test_queryPublic(self):
- self._create_public_data()
- return self._set_query('')
-
- @executor.query(httplib.OK, '_query_success', 1)
- def test_queryPrivate(self):
- self._create_private_data()
- return self._set_query('public=false')
-
- @executor.query(httplib.OK, '_query_period_one', 1)
- def test_combination(self):
- return self._set_query('pod',
- 'project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- 'period=5')
-
- @executor.query(httplib.OK, '_query_success', 0)
- def test_notFound(self):
- return self._set_query('pod=notExistPod',
- 'project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- 'period=1')
-
- @executor.query(httplib.OK, '_query_success', 1)
- def test_filterErrorStartdate(self):
- self._create_error_start_date(None)
- self._create_error_start_date('None')
- self._create_error_start_date('null')
- self._create_error_start_date('')
- return self._set_query('period=5')
-
- def _query_success(self, body, number):
- self.assertEqual(number, len(body.results))
-
- def _query_last_one(self, body, number):
- self.assertEqual(number, len(body.results))
- self.assert_res(body.results[0], self.req_10d_later)
-
- def _query_period_one(self, body, number):
- self.assertEqual(number, len(body.results))
- self.assert_res(body.results[0], self.req_d)
-
- def _create_error_start_date(self, start_date):
- req = copy.deepcopy(self.req_d)
- req.start_date = start_date
- self.create(req)
- return req
-
- def _create_changed_date(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.start_date = datetime.now() + timedelta(**kwargs)
- req.stop_date = str(req.start_date + timedelta(minutes=10))
- req.start_date = str(req.start_date)
- self.create(req)
- return req
-
- def _create_public_data(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.public = 'true'
- self.create(req)
- return req
-
- def _create_private_data(self, **kwargs):
- req = copy.deepcopy(self.req_d)
- req.public = 'false'
- self.create(req)
- return req
-
- def _set_query(self, *args):
- def get_value(arg):
- return self.__getattribute__(arg) \
- if arg != 'trust_indicator' else self.trust_indicator.current
- uri = ''
- for arg in args:
- if arg:
- if '=' in arg:
- uri += arg + '&'
- else:
- uri += '{}={}&'.format(arg, get_value(arg))
- return uri[0: -1]
-
-
-class TestResultUpdate(TestResultBase):
- def setUp(self):
- super(TestResultUpdate, self).setUp()
- self.req_d_id = self._create_d()
-
- @executor.update(httplib.OK, '_assert_update_ti')
- def test_success(self):
- new_ti = copy.deepcopy(self.trust_indicator)
- new_ti.current += self.update_step
- new_ti.histories.append(
- result_models.TIHistory(self.update_date, self.update_step))
- new_data = copy.deepcopy(self.req_d)
- new_data.trust_indicator = new_ti
- update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
- self.update_req = new_data
- return update, self.req_d_id
-
- def _assert_update_ti(self, request, body):
- ti = body.trust_indicator
- self.assertEqual(ti.current, request.trust_indicator.current)
- if ti.histories:
- history = ti.histories[0]
- self.assertEqual(history.date, self.update_date)
- self.assertEqual(history.step, self.update_step)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py b/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py
deleted file mode 100644
index bd720671..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_scenario.py
+++ /dev/null
@@ -1,360 +0,0 @@
-import functools
-import httplib
-import json
-import os
-from copy import deepcopy
-from datetime import datetime
-
-import opnfv_testapi.resources.scenario_models as models
-from opnfv_testapi.common import message
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestScenarioBase(base.TestBase):
- def setUp(self):
- super(TestScenarioBase, self).setUp()
- self.get_res = models.Scenario
- self.list_res = models.Scenarios
- self.basePath = '/api/v1/scenarios'
- self.req_d = self._load_request('scenario-c1.json')
- self.req_2 = self._load_request('scenario-c2.json')
-
- def tearDown(self):
- pass
-
- def assert_body(self, project, req=None):
- pass
-
- @staticmethod
- def _load_request(f_req):
- abs_file = os.path.join(os.path.dirname(__file__), f_req)
- with open(abs_file, 'r') as f:
- loader = json.load(f)
- f.close()
- return loader
-
- def create_return_name(self, req):
- _, res = self.create(req)
- return res.href.split('/')[-1]
-
- def assert_res(self, code, scenario, req=None):
- self.assertEqual(code, httplib.OK)
- if req is None:
- req = self.req_d
- self.assertIsNotNone(scenario._id)
- self.assertIsNotNone(scenario.creation_date)
-
- scenario == models.Scenario.from_dict(req)
-
- @staticmethod
- def _set_query(*args):
- uri = ''
- for arg in args:
- uri += arg + '&'
- return uri[0: -1]
-
- def _get_and_assert(self, name, req=None):
- code, body = self.get(name)
- self.assert_res(code, body, req)
-
-
-class TestScenarioCreate(TestScenarioBase):
- def test_withoutBody(self):
- (code, body) = self.create()
- self.assertEqual(code, httplib.BAD_REQUEST)
-
- def test_emptyName(self):
- req_empty = models.ScenarioCreateRequest('')
- (code, body) = self.create(req_empty)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
-
- def test_noneName(self):
- req_none = models.ScenarioCreateRequest(None)
- (code, body) = self.create(req_none)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
-
- def test_success(self):
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_create_body(body)
-
- def test_alreadyExist(self):
- self.create_d()
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
-
-
-class TestScenarioGet(TestScenarioBase):
- def setUp(self):
- super(TestScenarioGet, self).setUp()
- self.scenario_1 = self.create_return_name(self.req_d)
- self.scenario_2 = self.create_return_name(self.req_2)
-
- def test_getByName(self):
- self._get_and_assert(self.scenario_1, self.req_d)
-
- def test_getAll(self):
- self._query_and_assert(query=None, reqs=[self.req_d, self.req_2])
-
- def test_queryName(self):
- query = self._set_query('name=nosdn-nofeature-ha')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryInstaller(self):
- query = self._set_query('installer=apex')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryVersion(self):
- query = self._set_query('version=master')
- self._query_and_assert(query, reqs=[self.req_d])
-
- def test_queryProject(self):
- query = self._set_query('project=functest')
- self._query_and_assert(query, reqs=[self.req_d, self.req_2])
-
- def test_queryCombination(self):
- query = self._set_query('name=nosdn-nofeature-ha',
- 'installer=apex',
- 'version=master',
- 'project=functest')
-
- self._query_and_assert(query, reqs=[self.req_d])
-
- def _query_and_assert(self, query, found=True, reqs=None):
- code, body = self.query(query)
- if not found:
- self.assertEqual(code, httplib.OK)
- self.assertEqual(0, len(body.scenarios))
- else:
- self.assertEqual(len(reqs), len(body.scenarios))
- for req in reqs:
- for scenario in body.scenarios:
- if req['name'] == scenario.name:
- self.assert_res(code, scenario, req)
-
-
-class TestScenarioUpdate(TestScenarioBase):
- def setUp(self):
- super(TestScenarioUpdate, self).setUp()
- self.scenario = self.create_return_name(self.req_d)
- self.scenario_2 = self.create_return_name(self.req_2)
-
- def _execute(set_update):
- @functools.wraps(set_update)
- def magic(self):
- update, scenario = set_update(self, deepcopy(self.req_d))
- self._update_and_assert(update, scenario)
- return magic
-
- def _update(expected):
- def _update(set_update):
- @functools.wraps(set_update)
- def wrap(self):
- update, scenario = set_update(self, deepcopy(self.req_d))
- code, body = self.update(update, self.scenario)
- getattr(self, expected)(code, scenario)
- return wrap
- return _update
-
- @_update('_success')
- def test_renameScenario(self, scenario):
- new_name = 'nosdn-nofeature-noha'
- scenario['name'] = new_name
- update_req = models.ScenarioUpdateRequest(field='name',
- op='update',
- locate={},
- term={'name': new_name})
- return update_req, scenario
-
- @_update('_forbidden')
- def test_renameScenario_exist(self, scenario):
- new_name = self.scenario_2
- scenario['name'] = new_name
- update_req = models.ScenarioUpdateRequest(field='name',
- op='update',
- locate={},
- term={'name': new_name})
- return update_req, scenario
-
- @_update('_bad_request')
- def test_renameScenario_noName(self, scenario):
- new_name = self.scenario_2
- scenario['name'] = new_name
- update_req = models.ScenarioUpdateRequest(field='name',
- op='update',
- locate={},
- term={})
- return update_req, scenario
-
- @_execute
- def test_addInstaller(self, scenario):
- add = models.ScenarioInstaller(installer='daisy', versions=list())
- scenario['installers'].append(add.format())
- update = models.ScenarioUpdateRequest(field='installer',
- op='add',
- locate={},
- term=add.format())
- return update, scenario
-
- @_execute
- def test_deleteInstaller(self, scenario):
- scenario['installers'] = filter(lambda f: f['installer'] != 'apex',
- scenario['installers'])
-
- update = models.ScenarioUpdateRequest(field='installer',
- op='delete',
- locate={'installer': 'apex'})
- return update, scenario
-
- @_execute
- def test_addVersion(self, scenario):
- add = models.ScenarioVersion(version='danube', projects=list())
- scenario['installers'][0]['versions'].append(add.format())
- update = models.ScenarioUpdateRequest(field='version',
- op='add',
- locate={'installer': 'apex'},
- term=add.format())
- return update, scenario
-
- @_execute
- def test_deleteVersion(self, scenario):
- scenario['installers'][0]['versions'] = filter(
- lambda f: f['version'] != 'master',
- scenario['installers'][0]['versions'])
-
- update = models.ScenarioUpdateRequest(field='version',
- op='delete',
- locate={'installer': 'apex',
- 'version': 'master'})
- return update, scenario
-
- @_execute
- def test_changeOwner(self, scenario):
- scenario['installers'][0]['versions'][0]['owner'] = 'lucy'
-
- update = models.ScenarioUpdateRequest(field='owner',
- op='update',
- locate={'installer': 'apex',
- 'version': 'master'},
- term={'owner': 'lucy'})
- return update, scenario
-
- @_execute
- def test_addProject(self, scenario):
- add = models.ScenarioProject(project='qtip').format()
- scenario['installers'][0]['versions'][0]['projects'].append(add)
- update = models.ScenarioUpdateRequest(field='project',
- op='add',
- locate={'installer': 'apex',
- 'version': 'master'},
- term=add)
- return update, scenario
-
- @_execute
- def test_deleteProject(self, scenario):
- scenario['installers'][0]['versions'][0]['projects'] = filter(
- lambda f: f['project'] != 'functest',
- scenario['installers'][0]['versions'][0]['projects'])
-
- update = models.ScenarioUpdateRequest(field='project',
- op='delete',
- locate={
- 'installer': 'apex',
- 'version': 'master',
- 'project': 'functest'})
- return update, scenario
-
- @_execute
- def test_addCustoms(self, scenario):
- add = ['odl', 'parser', 'vping_ssh']
- projects = scenario['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = ['healthcheck', 'odl', 'parser', 'vping_ssh']
- update = models.ScenarioUpdateRequest(field='customs',
- op='add',
- locate={
- 'installer': 'apex',
- 'version': 'master',
- 'project': 'functest'},
- term=add)
- return update, scenario
-
- @_execute
- def test_deleteCustoms(self, scenario):
- projects = scenario['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['customs'] = ['healthcheck']
- update = models.ScenarioUpdateRequest(field='customs',
- op='delete',
- locate={
- 'installer': 'apex',
- 'version': 'master',
- 'project': 'functest'},
- term=['vping_ssh'])
- return update, scenario
-
- @_execute
- def test_addScore(self, scenario):
- add = models.ScenarioScore(date=str(datetime.now()), score='11/12')
- projects = scenario['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['scores'].append(add.format())
- update = models.ScenarioUpdateRequest(field='score',
- op='add',
- locate={
- 'installer': 'apex',
- 'version': 'master',
- 'project': 'functest'},
- term=add.format())
- return update, scenario
-
- @_execute
- def test_addTi(self, scenario):
- add = models.ScenarioTI(date=str(datetime.now()), status='gold')
- projects = scenario['installers'][0]['versions'][0]['projects']
- functest = filter(lambda f: f['project'] == 'functest', projects)[0]
- functest['trust_indicators'].append(add.format())
- update = models.ScenarioUpdateRequest(field='trust_indicator',
- op='add',
- locate={
- 'installer': 'apex',
- 'version': 'master',
- 'project': 'functest'},
- term=add.format())
- return update, scenario
-
- def _update_and_assert(self, update_req, new_scenario, name=None):
- code, _ = self.update(update_req, self.scenario)
- self.assertEqual(code, httplib.OK)
- self._get_and_assert(_none_default(name, self.scenario),
- new_scenario)
-
- def _success(self, status, new_scenario):
- self.assertEqual(status, httplib.OK)
- self._get_and_assert(new_scenario.get('name'), new_scenario)
-
- def _forbidden(self, status, new_scenario):
- self.assertEqual(status, httplib.FORBIDDEN)
-
- def _bad_request(self, status, new_scenario):
- self.assertEqual(status, httplib.BAD_REQUEST)
-
-
-class TestScenarioDelete(TestScenarioBase):
- def test_notFound(self):
- code, body = self.delete('notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
-
- def test_success(self):
- scenario = self.create_return_name(self.req_d)
- code, _ = self.delete(scenario)
- self.assertEqual(code, httplib.OK)
- code, _ = self.get(scenario)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-def _none_default(check, default):
- return check if check else default
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py b/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py
deleted file mode 100644
index 4f2bc2ad..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_testcase.py
+++ /dev/null
@@ -1,201 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import copy
-import httplib
-import unittest
-
-from opnfv_testapi.common import message
-from opnfv_testapi.resources import project_models
-from opnfv_testapi.resources import testcase_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestCaseBase(base.TestBase):
- def setUp(self):
- super(TestCaseBase, self).setUp()
- self.req_d = testcase_models.TestcaseCreateRequest('vping_1',
- '/cases/vping_1',
- 'vping-ssh test')
- self.req_e = testcase_models.TestcaseCreateRequest('doctor_1',
- '/cases/doctor_1',
- 'create doctor')
- self.update_d = testcase_models.TestcaseUpdateRequest('vping_1',
- 'vping-ssh test',
- 'functest')
- self.update_e = testcase_models.TestcaseUpdateRequest('doctor_1',
- 'create doctor',
- 'functest')
- self.get_res = testcase_models.Testcase
- self.list_res = testcase_models.Testcases
- self.update_res = testcase_models.Testcase
- self.basePath = '/api/v1/projects/%s/cases'
- self.create_project()
-
- def assert_body(self, case, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(case.name, req.name)
- self.assertEqual(case.description, req.description)
- self.assertEqual(case.url, req.url)
- self.assertIsNotNone(case._id)
- self.assertIsNotNone(case.creation_date)
-
- def assert_update_body(self, old, new, req=None):
- if not req:
- req = self.req_d
- self.assertEqual(new.name, req.name)
- self.assertEqual(new.description, req.description)
- self.assertEqual(new.url, old.url)
- self.assertIsNotNone(new._id)
- self.assertIsNotNone(new.creation_date)
-
- def create_project(self):
- req_p = project_models.ProjectCreateRequest('functest',
- 'vping-ssh test')
- self.create_help('/api/v1/projects', req_p)
- self.project = req_p.name
-
- def create_d(self):
- return super(TestCaseBase, self).create_d(self.project)
-
- def create_e(self):
- return super(TestCaseBase, self).create_e(self.project)
-
- def get(self, case=None):
- return super(TestCaseBase, self).get(self.project, case)
-
- def create(self, req=None, *args):
- return super(TestCaseBase, self).create(req, self.project)
-
- def update(self, new=None, case=None):
- return super(TestCaseBase, self).update(new, self.project, case)
-
- def delete(self, case):
- return super(TestCaseBase, self).delete(self.project, case)
-
-
-class TestCaseCreate(TestCaseBase):
- @executor.create(httplib.BAD_REQUEST, message.no_body())
- def test_noBody(self):
- return None
-
- @executor.create(httplib.FORBIDDEN, message.not_found_base)
- def test_noProject(self):
- self.project = 'noProject'
- return self.req_d
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_emptyName(self):
- req_empty = testcase_models.TestcaseCreateRequest('')
- return req_empty
-
- @executor.create(httplib.BAD_REQUEST, message.missing('name'))
- def test_noneName(self):
- req_none = testcase_models.TestcaseCreateRequest(None)
- return req_none
-
- @executor.create(httplib.OK, '_assert_success')
- def test_success(self):
- return self.req_d
-
- def _assert_success(self, body):
- self.assert_create_body(body, self.req_d, self.project)
-
- @executor.create(httplib.FORBIDDEN, message.exist_base)
- def test_alreadyExist(self):
- self.create_d()
- return self.req_d
-
-
-class TestCaseGet(TestCaseBase):
- def setUp(self):
- super(TestCaseGet, self).setUp()
- self.create_d()
- self.create_e()
-
- @executor.get(httplib.NOT_FOUND, message.not_found_base)
- def test_notExist(self):
- return 'notExist'
-
- @executor.get(httplib.OK, 'assert_body')
- def test_getOne(self):
- return self.req_d.name
-
- @executor.get(httplib.OK, '_list')
- def test_list(self):
- return None
-
- def _list(self, body):
- for case in body.testcases:
- if self.req_d.name == case.name:
- self.assert_body(case)
- else:
- self.assert_body(case, self.req_e)
-
-
-class TestCaseUpdate(TestCaseBase):
- def setUp(self):
- super(TestCaseUpdate, self).setUp()
- self.create_d()
-
- @executor.update(httplib.BAD_REQUEST, message.no_body())
- def test_noBody(self):
- return None, 'noBody'
-
- @executor.update(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return self.update_e, 'notFound'
-
- @executor.update(httplib.FORBIDDEN, message.exist_base)
- def test_newNameExist(self):
- self.create_e()
- return self.update_e, self.req_d.name
-
- @executor.update(httplib.FORBIDDEN, message.no_update())
- def test_noUpdate(self):
- return self.update_d, self.req_d.name
-
- @executor.update(httplib.OK, '_update_success')
- def test_success(self):
- return self.update_e, self.req_d.name
-
- @executor.update(httplib.OK, '_update_success')
- def test_with_dollar(self):
- update = copy.deepcopy(self.update_d)
- update.description = {'2. change': 'dollar change'}
- return update, self.req_d.name
-
- def _update_success(self, request, body):
- self.assert_update_body(self.req_d, body, request)
- _, new_body = self.get(request.name)
- self.assert_update_body(self.req_d, new_body, request)
-
-
-class TestCaseDelete(TestCaseBase):
- def setUp(self):
- super(TestCaseDelete, self).setUp()
- self.create_d()
-
- @executor.delete(httplib.NOT_FOUND, message.not_found_base)
- def test_notFound(self):
- return 'notFound'
-
- @executor.delete(httplib.OK, '_delete_success')
- def test_success(self):
- return self.req_d.name
-
- def _delete_success(self, body):
- self.assertEqual(body, '')
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.NOT_FOUND)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_token.py b/cvp/opnfv_testapi/tests/unit/resources/test_token.py
deleted file mode 100644
index 940e256c..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_token.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-import httplib
-import unittest
-
-from tornado import web
-
-from opnfv_testapi.common import message
-from opnfv_testapi.resources import project_models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit import fake_pymongo
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestToken(base.TestBase):
- def get_app(self):
- from opnfv_testapi.router import url_mappings
- return web.Application(
- url_mappings.mappings,
- db=fake_pymongo,
- debug=True,
- auth=True
- )
-
-
-class TestTokenCreateProject(TestToken):
- def setUp(self):
- super(TestTokenCreateProject, self).setUp()
- self.req_d = project_models.ProjectCreateRequest('vping')
- fake_pymongo.tokens.insert({"access_token": "12345"})
- self.basePath = '/api/v1/projects'
-
- @executor.create(httplib.FORBIDDEN, message.invalid_token())
- def test_projectCreateTokenInvalid(self):
- self.headers['X-Auth-Token'] = '1234'
- return self.req_d
-
- @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
- def test_projectCreateTokenUnauthorized(self):
- if 'X-Auth-Token' in self.headers:
- self.headers.pop('X-Auth-Token')
- return self.req_d
-
- @executor.create(httplib.OK, '_create_success')
- def test_projectCreateTokenSuccess(self):
- self.headers['X-Auth-Token'] = '12345'
- return self.req_d
-
- def _create_success(self, body):
- self.assertIn('CreateResponse', str(type(body)))
-
-
-class TestTokenDeleteProject(TestToken):
- def setUp(self):
- super(TestTokenDeleteProject, self).setUp()
- self.req_d = project_models.ProjectCreateRequest('vping')
- fake_pymongo.tokens.insert({"access_token": "12345"})
- self.basePath = '/api/v1/projects'
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
-
- @executor.delete(httplib.FORBIDDEN, message.invalid_token())
- def test_projectDeleteTokenIvalid(self):
- self.headers['X-Auth-Token'] = '1234'
- return self.req_d.name
-
- @executor.delete(httplib.UNAUTHORIZED, message.unauthorized())
- def test_projectDeleteTokenUnauthorized(self):
- self.headers.pop('X-Auth-Token')
- return self.req_d.name
-
- @executor.delete(httplib.OK, '_delete_success')
- def test_projectDeleteTokenSuccess(self):
- return self.req_d.name
-
- def _delete_success(self, body):
- self.assertEqual('', body)
-
-
-class TestTokenUpdateProject(TestToken):
- def setUp(self):
- super(TestTokenUpdateProject, self).setUp()
- self.req_d = project_models.ProjectCreateRequest('vping')
- fake_pymongo.tokens.insert({"access_token": "12345"})
- self.basePath = '/api/v1/projects'
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
-
- @executor.update(httplib.FORBIDDEN, message.invalid_token())
- def test_projectUpdateTokenIvalid(self):
- self.headers['X-Auth-Token'] = '1234'
- req = project_models.ProjectUpdateRequest('newName', 'new description')
- return req, self.req_d.name
-
- @executor.update(httplib.UNAUTHORIZED, message.unauthorized())
- def test_projectUpdateTokenUnauthorized(self):
- self.headers.pop('X-Auth-Token')
- req = project_models.ProjectUpdateRequest('newName', 'new description')
- return req, self.req_d.name
-
- @executor.update(httplib.OK, '_update_success')
- def test_projectUpdateTokenSuccess(self):
- req = project_models.ProjectUpdateRequest('newName', 'new description')
- return req, self.req_d.name
-
- def _update_success(self, request, body):
- self.assertIn(request.name, body)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tests/unit/resources/test_version.py b/cvp/opnfv_testapi/tests/unit/resources/test_version.py
deleted file mode 100644
index 51fed11e..00000000
--- a/cvp/opnfv_testapi/tests/unit/resources/test_version.py
+++ /dev/null
@@ -1,36 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import httplib
-import unittest
-
-from opnfv_testapi.resources import models
-from opnfv_testapi.tests.unit import executor
-from opnfv_testapi.tests.unit.resources import test_base as base
-
-
-class TestVersionBase(base.TestBase):
- def setUp(self):
- super(TestVersionBase, self).setUp()
- self.list_res = models.Versions
- self.basePath = '/versions'
-
-
-class TestVersion(TestVersionBase):
- @executor.get(httplib.OK, '_get_success')
- def test_success(self):
- return None
-
- def _get_success(self, body):
- self.assertEqual(len(body.versions), 1)
- self.assertEqual(body.versions[0].version, 'v1.0')
- self.assertEqual(body.versions[0].description, 'basics')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/cvp/opnfv_testapi/tornado_swagger/README.md b/cvp/opnfv_testapi/tornado_swagger/README.md
deleted file mode 100644
index d815f216..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/README.md
+++ /dev/null
@@ -1,273 +0,0 @@
-# tornado-swagger
-
-## What is tornado-swagger?
-tornado is a wrapper for tornado which enables swagger-ui support.
-
-In essense, you just need to wrap the Api instance and add a few python decorators to
-get full swagger support.http://swagger.io/
-
-
-## How to use:
-
-
-```python
-from tornado.web import RequestHandler, HTTPError
-from tornado_swagger import swagger
-
-swagger.docs()
-
-# You may decorate your operation with @swagger.operation and use docs to inform information
-class ItemNoParamHandler(GenericApiHandler):
- @swagger.operation(nickname='create')
- def post(self):
- """
- @param body: create test results for a item.
- @type body: L{Item}
- @return 200: item is created.
- @raise 400: invalid input
- """
-
-# Operations not decorated with @swagger.operation do not get added to the swagger docs
-
-class ItemNoParamHandler(GenericApiHandler):
- def options(self):
- """
- I'm not visible in the swagger docs
- """
- pass
-
-
-# Then you use swagger.Application instead of tornado.web.Application
-# and do other operations as usual
-
-def make_app():
- return swagger.Application([
- (r"/items", ItemNoParamHandler),
- (r"/items/([^/]+)", ItemHandler),
- (r"/items/([^/]+)/cases/([^/]+)", ItemOptionParamHandler),
- ])
-
-# You define models like this:
-@swagger.model
-class Item:
- """
- @descriptin:
- This is an example of a model class that has parameters in its constructor
- and the fields in the swagger spec are derived from the parameters to __init__.
- @notes:
- In this case we would have property1, property2 as required parameters
- and property3 as optional parameter.
- @property property3: Item decription
- @ptype property3: L{PropertySubclass}
- """
- def __init__(self, property1, property2=None):
- self.property1 = property1
- self.property2 = property2
-
-# Swagger json:
- "models": {
- "Item": {
- "description": "A description...",
- "id": "Item",
- "required": [
- "property1",
- ],
- "properties": [
- "property1": {
- "type": "string"
- },
- "property2": {
- "type": "string"
- "default": null
- }
- ]
- }
- }
-
-# If you declare an __init__ method with meaningful arguments
-# then those args could be used to deduce the swagger model fields.
-# just as shown above
-
-# if you declare an @property in docs, this property property2 will also be used
-# to deduce the swagger model fields
-class Item:
- """
- @property property3: Item description
- """
- def __init__(self, property1, property2):
- self.property1 = property1
- self.property2 = property2
-
-# Swagger json:
- "models": {
- "Item": {
- "description": "A description...",
- "id": "Item",
- "required": [
- "property1",
- ],
- "properties": [
- "property1": {
- "type": "string"
- },
- "property2": {
- "type": "string"
- }
- "property3": {
- "type": "string"
- }
- ]
- }
- }
-
-# if you declare an argument with @ptype, the type of this argument will be specified
-# rather than the default 'string'
-class Item:
- """
- @ptype property3: L{PropertySubclass}
- """
- def __init__(self, property1, property2, property3=None):
- self.property1 = property1
- self.property2 = property2
- self.property3 = property3
-
-# Swagger json:
- "models": {
- "Item": {
- "description": "A description...",
- "id": "Item",
- "required": [
- "property1",
- ],
- "properties": [
- "property1": {
- "type": "string"
- },
- "property2": {
- "type": "string"
- },
- "property3": {
- "type": "PropertySubclass"
- "default": null
- }
- ]
- }
- }
-
-# if you want to declare an list property, you can do it like this:
-class Item:
- """
- @ptype property3: L{PropertySubclass}
- @ptype property4: C{list} of L{PropertySubclass}
- """
- def __init__(self, property1, property2, property3, property4=None):
- self.property1 = property1
- self.property2 = property2
- self.property3 = property3
- self.property4 = property4
-
-# Swagger json:
- "models": {
- "Item": {
- "description": "A description...",
- "id": "Item",
- "required": [
- "property1",
- ],
- "properties": [
- "property1": {
- "type": "string"
- },
- "property2": {
- "type": "string"
- },
- "property3": {
- "type": "PropertySubclass"
- "default": null
- },
- "property4": {
- "default": null,
- "items": {
- "type": "PropertySubclass"},
- "type": "array"
- }
- }
- ]
- }
- }
-
-# if it is a query:
-class ItemQueryHandler(GenericApiHandler):
- @swagger.operation(nickname='query')
- def get(self):
- """
- @param property1:
- @type property1: L{string}
- @in property1: query
- @required property1: False
-
- @param property2:
- @type property2: L{string}
- @in property2: query
- @required property2: True
- @rtype: L{Item}
-
- @notes: GET /item?property1=1&property2=1
- """
-
-# Swagger json:
- "apis": [
- {
- "operations": [
- {
- "parameters": [
- {
- "name": "property1",
- "dataType": "string",
- "paramType": "query",
- "description": ""
- },
- {
- "name": "property2",
- "dataType": "string",
- "paramType": "query",
- "required": true,
- "description": ""
- }
- ],
- "responseClass": "Item",
- "notes": null,
- "responseMessages": [],
- "summary": null,
- "httpMethod": "GET",
- "nickname": "query"
- }
- ],
- "path": "/item",
- "description": null
- },
- ....
- ]
-```
-
-# Running and testing
-
-Now run your tornado app
-
-```
-python main.py
-```
-
-And visit:
-
-```
-curl http://ip:port/swagger/spec
-```
-
-access to web
-```
-http://ip:port/swagger/spec.html
-```
-
-# Passing more metadata to swagger
-customized arguments used in creating the 'swagger.docs' object will be supported later
diff --git a/cvp/opnfv_testapi/tornado_swagger/__init__.py b/cvp/opnfv_testapi/tornado_swagger/__init__.py
deleted file mode 100644
index 363bc388..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/cvp/opnfv_testapi/tornado_swagger/handlers.py b/cvp/opnfv_testapi/tornado_swagger/handlers.py
deleted file mode 100644
index e39a9f63..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/handlers.py
+++ /dev/null
@@ -1,38 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import tornado.web
-
-from opnfv_testapi.tornado_swagger import settings
-from opnfv_testapi.tornado_swagger import views
-
-
-def swagger_handlers():
- prefix = settings.docs_settings.get('swagger_prefix', '/swagger')
- if prefix[-1] != '/':
- prefix += '/'
-
- def _path(suffix):
- return prefix + suffix
- return [
- tornado.web.URLSpec(
- _path(r'spec.html$'),
- views.SwaggerUIHandler,
- settings.docs_settings,
- name=settings.API_DOCS_NAME),
- tornado.web.URLSpec(
- _path(r'resources.json$'),
- views.SwaggerResourcesHandler,
- settings.docs_settings,
- name=settings.RESOURCE_LISTING_NAME),
- tornado.web.URLSpec(
- _path(r'APIs$'),
- views.SwaggerApiHandler,
- settings.docs_settings,
- name=settings.API_DECLARATION_NAME),
- ]
diff --git a/cvp/opnfv_testapi/tornado_swagger/settings.py b/cvp/opnfv_testapi/tornado_swagger/settings.py
deleted file mode 100644
index 28422611..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/settings.py
+++ /dev/null
@@ -1,25 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-API_DOCS_NAME = 'swagger-api-docs'
-RESOURCE_LISTING_NAME = 'swagger-resource-listing'
-API_DECLARATION_NAME = 'swagger-api-declaration'
-
-docs_settings = {
- 'base_url': '',
- 'static_path': '',
- 'swagger_prefix': '/swagger',
- 'api_version': 'v1.0',
- 'swagger_version': '1.2',
- 'api_key': '',
- 'enabled_methods': ['get', 'post', 'put', 'patch', 'delete'],
- 'exclude_namespaces': [],
-}
-
-models = []
diff --git a/cvp/opnfv_testapi/tornado_swagger/swagger.py b/cvp/opnfv_testapi/tornado_swagger/swagger.py
deleted file mode 100644
index 83f389a6..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/swagger.py
+++ /dev/null
@@ -1,291 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from HTMLParser import HTMLParser
-from functools import wraps
-import inspect
-
-import epydoc.markup
-import tornado.web
-
-from opnfv_testapi.tornado_swagger import handlers
-from opnfv_testapi.tornado_swagger import settings
-
-
-class EpytextParser(HTMLParser):
- a_text = False
-
- def __init__(self, tag):
- HTMLParser.__init__(self)
- self.tag = tag
- self.data = None
-
- def handle_starttag(self, tag, attr):
- if tag == self.tag:
- self.a_text = True
-
- def handle_endtag(self, tag):
- if tag == self.tag:
- self.a_text = False
-
- def handle_data(self, data):
- if self.a_text:
- self.data = data
-
- def get_data(self):
- return self.data
-
-
-class DocParser(object):
- def __init__(self):
- self.notes = None
- self.summary = None
- self.responseClass = None
- self.responseMessages = []
- self.params = {}
- self.properties = {}
-
- def parse_docstring(self, text):
- if text is None:
- return
-
- errors = []
- doc = epydoc.markup.parse(text, markup='epytext', errors=errors)
- _, fields = doc.split_fields(errors)
-
- for field in fields:
- tag = field.tag()
- arg = field.arg()
- body = field.body()
- self._get_parser(tag)(arg=arg, body=body)
- return doc
-
- def _get_parser(self, tag):
- parser = {
- 'param': self._parse_param,
- 'type': self._parse_type,
- 'in': self._parse_in,
- 'required': self._parse_required,
- 'rtype': self._parse_rtype,
- 'property': self._parse_property,
- 'ptype': self._parse_ptype,
- 'return': self._parse_return,
- 'raise': self._parse_return,
- 'notes': self._parse_notes,
- 'description': self._parse_description,
- }
- return parser.get(tag, self._not_supported)
-
- def _parse_param(self, **kwargs):
- arg = kwargs.get('arg', None)
- body = self._get_body(**kwargs)
- self.params.setdefault(arg, {}).update({
- 'name': arg,
- 'description': body,
- })
-
- if 'paramType' not in self.params[arg]:
- self.params[arg]['paramType'] = 'query'
-
- def _parse_type(self, **kwargs):
- arg = kwargs.get('arg', None)
- body = self._get_body(**kwargs)
- self.params.setdefault(arg, {}).update({
- 'name': arg,
- 'dataType': body
- })
-
- def _parse_in(self, **kwargs):
- arg = kwargs.get('arg', None)
- body = self._get_body(**kwargs)
- self.params.setdefault(arg, {}).update({
- 'name': arg,
- 'paramType': body
- })
-
- def _parse_required(self, **kwargs):
- arg = kwargs.get('arg', None)
- body = self._get_body(**kwargs)
- self.params.setdefault(arg, {}).update({
- 'name': arg,
- 'required': False if body in ['False', 'false'] else True
- })
-
- def _parse_rtype(self, **kwargs):
- body = self._get_body(**kwargs)
- self.responseClass = body
-
- def _parse_property(self, **kwargs):
- arg = kwargs.get('arg', None)
- self.properties.setdefault(arg, {}).update({
- 'type': 'string'
- })
-
- def _parse_ptype(self, **kwargs):
- arg = kwargs.get('arg', None)
- code = self._parse_epytext_para('code', **kwargs)
- link = self._parse_epytext_para('link', **kwargs)
- if code is None:
- self.properties.setdefault(arg, {}).update({
- 'type': link
- })
- elif code == 'list':
- self.properties.setdefault(arg, {}).update({
- 'type': 'array',
- 'items': {'type': link}
- })
-
- def _parse_return(self, **kwargs):
- arg = kwargs.get('arg', None)
- body = self._get_body(**kwargs)
- self.responseMessages.append({
- 'code': arg,
- 'message': body
- })
-
- def _parse_notes(self, **kwargs):
- body = self._get_body(**kwargs)
- self.notes = self._sanitize_doc(body)
-
- def _parse_description(self, **kwargs):
- body = self._get_body(**kwargs)
- self.summary = self._sanitize_doc(body)
-
- def _not_supported(self, **kwargs):
- pass
-
- @staticmethod
- def _sanitize_doc(comment):
- return comment.replace('\n', '<br/>') if comment else comment
-
- @staticmethod
- def _get_body(**kwargs):
- body = kwargs.get('body', None)
- return body.to_plaintext(None).strip() if body else body
-
- @staticmethod
- def _parse_epytext_para(tag, **kwargs):
- def _parse_epytext(tag, body):
- epytextParser = EpytextParser(tag)
- epytextParser.feed(str(body))
- data = epytextParser.get_data()
- epytextParser.close()
- return data
-
- body = kwargs.get('body', None)
- return _parse_epytext(tag, body) if body else body
-
-
-class model(DocParser):
- def __init__(self, *args, **kwargs):
- super(model, self).__init__()
- self.args = args
- self.kwargs = kwargs
- self.required = []
- self.cls = None
-
- def __call__(self, *args):
- if self.cls:
- return self.cls
-
- cls = args[0]
- self._parse_model(cls)
-
- return cls
-
- def _parse_model(self, cls):
- self.id = cls.__name__
- self.cls = cls
- if '__init__' in dir(cls):
- self._parse_args(cls.__init__)
- self.parse_docstring(inspect.getdoc(cls))
- settings.models.append(self)
-
- def _parse_args(self, func):
- argspec = inspect.getargspec(func)
- argspec.args.remove("self")
- defaults = {}
- if argspec.defaults:
- defaults = list(zip(argspec.args[-len(argspec.defaults):],
- argspec.defaults))
- required_args_count = len(argspec.args) - len(defaults)
- for arg in argspec.args[:required_args_count]:
- self.required.append(arg)
- self.properties.setdefault(arg, {'type': 'string'})
- for arg, default in defaults:
- self.properties.setdefault(arg, {
- 'type': 'string',
- "default": default
- })
-
-
-class operation(DocParser):
- def __init__(self, nickname='apis', **kwds):
- super(operation, self).__init__()
- self.nickname = nickname
- self.func = None
- self.func_args = []
- self.kwds = kwds
-
- def __call__(self, *args, **kwds):
- if self.func:
- return self.func(*args, **kwds)
-
- func = args[0]
- self._parse_operation(func)
-
- @wraps(func)
- def __wrapper__(*in_args, **in_kwds):
- return self.func(*in_args, **in_kwds)
-
- __wrapper__.rest_api = self
- return __wrapper__
-
- def _parse_operation(self, func):
- self.func = func
-
- self.__name__ = func.__name__
- self._parse_args(func)
- self.parse_docstring(inspect.getdoc(self.func))
-
- def _parse_args(self, func):
- argspec = inspect.getargspec(func)
- argspec.args.remove("self")
-
- defaults = []
- if argspec.defaults:
- defaults = argspec.args[-len(argspec.defaults):]
-
- for arg in argspec.args:
- if arg in defaults:
- required = False
- else:
- required = True
- self.params.setdefault(arg, {
- 'name': arg,
- 'required': required,
- 'paramType': 'path',
- 'dataType': 'string'
- })
- self.func_args = argspec.args
-
-
-def docs(**opts):
- settings.docs_settings.update(opts)
-
-
-class Application(tornado.web.Application):
- def __init__(self, app_handlers=None,
- default_host="",
- transforms=None,
- **settings):
- super(Application, self).__init__(
- handlers.swagger_handlers() + app_handlers,
- default_host,
- transforms,
- **settings)
diff --git a/cvp/opnfv_testapi/tornado_swagger/views.py b/cvp/opnfv_testapi/tornado_swagger/views.py
deleted file mode 100644
index 79399970..00000000
--- a/cvp/opnfv_testapi/tornado_swagger/views.py
+++ /dev/null
@@ -1,134 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import inspect
-import json
-
-import tornado.template
-import tornado.web
-
-from opnfv_testapi.tornado_swagger import settings
-
-
-def json_dumps(obj, pretty=False):
- return json.dumps(obj,
- sort_keys=True,
- indent=4,
- separators=(',', ': ')) if pretty else json.dumps(obj)
-
-
-class SwaggerUIHandler(tornado.web.RequestHandler):
- def initialize(self, **kwargs):
- self.static_path = kwargs.get('static_path')
- self.base_url = kwargs.get('base_url')
-
- def get_template_path(self):
- return self.static_path
-
- def get(self):
- resource_url = self.reverse_url(settings.RESOURCE_LISTING_NAME)
- discovery_url = self.base_url + resource_url
- self.render('swagger/index.html', discovery_url=discovery_url)
-
-
-class SwaggerResourcesHandler(tornado.web.RequestHandler):
- def initialize(self, **kwargs):
- self.api_version = kwargs.get('api_version')
- self.swagger_version = kwargs.get('swagger_version')
- self.base_url = kwargs.get('base_url')
- self.exclude_namespaces = kwargs.get('exclude_namespaces')
-
- def get(self):
- self.set_header('content-type', 'application/json')
- resources = {
- 'apiVersion': self.api_version,
- 'swaggerVersion': self.swagger_version,
- 'basePath': self.base_url,
- 'apis': [{
- 'path': self.reverse_url(settings.API_DECLARATION_NAME),
- 'description': 'Restful APIs Specification'
- }]
- }
-
- self.finish(json_dumps(resources, self.get_arguments('pretty')))
-
-
-class SwaggerApiHandler(tornado.web.RequestHandler):
- def initialize(self, **kwargs):
- self.api_version = kwargs.get('api_version')
- self.swagger_version = kwargs.get('swagger_version')
- self.base_url = kwargs.get('base_url')
-
- def get(self):
- self.set_header('content-type', 'application/json')
- apis = self.find_api(self.application.handlers)
- if apis is None:
- raise tornado.web.HTTPError(404)
-
- specs = {
- 'apiVersion': self.api_version,
- 'swaggerVersion': self.swagger_version,
- 'basePath': self.base_url,
- 'resourcePath': '/',
- 'produces': ["application/json"],
- 'apis': [self.__get_api_spec__(path, spec, operations)
- for path, spec, operations in apis],
- 'models': self.__get_models_spec(settings.models)
- }
- self.finish(json_dumps(specs, self.get_arguments('pretty')))
-
- def __get_models_spec(self, models):
- models_spec = {}
- for model in models:
- models_spec.setdefault(model.id, self.__get_model_spec(model))
- return models_spec
-
- @staticmethod
- def __get_model_spec(model):
- return {
- 'description': model.summary,
- 'id': model.id,
- 'notes': model.notes,
- 'properties': model.properties,
- 'required': model.required
- }
-
- @staticmethod
- def __get_api_spec__(path, spec, operations):
- return {
- 'path': path,
- 'description': spec.handler_class.__doc__,
- 'operations': [{
- 'httpMethod': api.func.__name__.upper(),
- 'nickname': api.nickname,
- 'parameters': api.params.values(),
- 'summary': api.summary,
- 'notes': api.notes,
- 'responseClass': api.responseClass,
- 'responseMessages': api.responseMessages,
- } for api in operations]
- }
-
- @staticmethod
- def find_api(host_handlers):
- def get_path(url, args):
- return url % tuple(['{%s}' % arg for arg in args])
-
- def get_operations(cls):
- return [member.rest_api
- for (_, member) in inspect.getmembers(cls)
- if hasattr(member, 'rest_api')]
-
- for host, handlers in host_handlers:
- for spec in handlers:
- for (_, mbr) in inspect.getmembers(spec.handler_class):
- if inspect.ismethod(mbr) and hasattr(mbr, 'rest_api'):
- path = get_path(spec._path, mbr.rest_api.func_args)
- operations = get_operations(spec.handler_class)
- yield path, spec, operations
- break
diff --git a/cvp/opnfv_testapi/ui/__init__.py b/cvp/opnfv_testapi/ui/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cvp/opnfv_testapi/ui/__init__.py
+++ /dev/null
diff --git a/cvp/opnfv_testapi/ui/auth/__init__.py b/cvp/opnfv_testapi/ui/auth/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cvp/opnfv_testapi/ui/auth/__init__.py
+++ /dev/null
diff --git a/cvp/opnfv_testapi/ui/auth/base.py b/cvp/opnfv_testapi/ui/auth/base.py
deleted file mode 100644
index bea87c4d..00000000
--- a/cvp/opnfv_testapi/ui/auth/base.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import random
-import string
-
-from six.moves.urllib import parse
-
-from opnfv_testapi.resources import handlers
-
-
-class BaseHandler(handlers.GenericApiHandler):
- def __init__(self, application, request, **kwargs):
- super(BaseHandler, self).__init__(application, request, **kwargs)
- self.table = 'users'
-
- def set_cookies(self, cookies):
- for cookie_n, cookie_v in cookies:
- self.set_secure_cookie(cookie_n, cookie_v)
-
-
-def get_token(length=30):
- """Get random token."""
- return ''.join(random.choice(string.ascii_lowercase)
- for i in range(length))
-
-
-def set_query_params(url, params):
- """Set params in given query."""
- url_parts = parse.urlparse(url)
- url = parse.urlunparse((
- url_parts.scheme,
- url_parts.netloc,
- url_parts.path,
- url_parts.params,
- parse.urlencode(params),
- url_parts.fragment))
- return url
diff --git a/cvp/opnfv_testapi/ui/auth/constants.py b/cvp/opnfv_testapi/ui/auth/constants.py
deleted file mode 100644
index 44ccb46d..00000000
--- a/cvp/opnfv_testapi/ui/auth/constants.py
+++ /dev/null
@@ -1,18 +0,0 @@
-OPENID = 'openid'
-ROLE = 'role'
-DEFAULT_ROLE = 'user'
-
-# OpenID parameters
-OPENID_MODE = 'openid.mode'
-OPENID_NS = 'openid.ns'
-OPENID_RETURN_TO = 'openid.return_to'
-OPENID_CLAIMED_ID = 'openid.claimed_id'
-OPENID_IDENTITY = 'openid.identity'
-OPENID_REALM = 'openid.realm'
-OPENID_NS_SREG = 'openid.ns.sreg'
-OPENID_NS_SREG_REQUIRED = 'openid.sreg.required'
-OPENID_NS_SREG_EMAIL = 'openid.sreg.email'
-OPENID_NS_SREG_FULLNAME = 'openid.sreg.fullname'
-OPENID_ERROR = 'openid.error'
-
-CSRF_TOKEN = 'csrf_token'
diff --git a/cvp/opnfv_testapi/ui/auth/jira_util.py b/cvp/opnfv_testapi/ui/auth/jira_util.py
deleted file mode 100644
index 5ec91a71..00000000
--- a/cvp/opnfv_testapi/ui/auth/jira_util.py
+++ /dev/null
@@ -1,66 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 Max Breitenfeldt and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-import base64
-import os
-
-import oauth2 as oauth
-from jira import JIRA
-from tlslite.utils import keyfactory
-from opnfv_testapi.common.config import CONF
-
-
-class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):
- name = 'RSA-SHA1'
-
- def signing_base(self, request, consumer, token):
- if not hasattr(request, 'normalized_url') or \
- request.normalized_url is None:
- raise ValueError("Base URL for request is not set.")
-
- sig = (
- oauth.escape(request.method),
- oauth.escape(request.normalized_url),
- oauth.escape(request.get_normalized_parameters()),
- )
-
- key = '%s&' % oauth.escape(consumer.secret)
- if token:
- key += oauth.escape(token.secret)
- raw = '&'.join(sig)
- return key, raw
-
- def sign(self, request, consumer, token):
- """Builds the base signature string."""
- key, raw = self.signing_base(request, consumer, token)
-
- module_dir = os.path.dirname(__file__) # get current directory
- with open(module_dir + '/rsa.pem', 'r') as f:
- data = f.read()
- privateKeyString = data.strip()
- privatekey = keyfactory.parsePrivateKey(privateKeyString)
- raw = str.encode(raw)
- signature = privatekey.hashAndSign(raw)
- return base64.b64encode(signature)
-
-
-def get_jira(access_token):
- module_dir = os.path.dirname(__file__) # get current directory
- with open(module_dir + '/rsa.pem', 'r') as f:
- key_cert = f.read()
-
- oauth_dict = {
- 'access_token': access_token['oauth_token'],
- 'access_token_secret': access_token['oauth_token_secret'],
- 'consumer_key': CONF.jira_oauth_consumer_key,
- 'key_cert': key_cert
- }
-
- return JIRA(server=CONF.jira_jira_url, oauth=oauth_dict)
diff --git a/cvp/opnfv_testapi/ui/auth/rsa.pem b/cvp/opnfv_testapi/ui/auth/rsa.pem
deleted file mode 100644
index 5ec1bbf1..00000000
--- a/cvp/opnfv_testapi/ui/auth/rsa.pem
+++ /dev/null
@@ -1,27 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIEowIBAAKCAQEAyDe0zz8Gpr1dh31c7R8LGV4p+wT0s2sUTtha1ex3GXzhEewQ
-Cx3WUW/tttnBGd0oVVNMzoaIPbQJgPnuyVx2VugELLIdPHd9ngPDq09YWzK/J3VW
-3I0sQLA5xVmGv32z4Uz7vbc7/4UM+FXPqUmhWBysR0zsm/nlLqbLs08GEyt2ZT+v
-rNQnYtWA6YuL2OHSHvkQlwibpezHzs6LV7A8mQjWbptu0FL229h6pNSyV7YM459w
-Z5RzBsPybl07P5HOVtkSizIFJ+HLc6yp4cVxjCgk4rMKyPBSG9dBEaHMlBCis61R
-2lTbCJiy7SnWGiMd28WKuvu2k8T4A9k2FzvYwwIDAQABAoIBAGqhOFtTjqBIo8If
-4tiqOsgE3UjBp+zR71vaX+4kZH2fg2J/HUA+YMC4YpqKOAwlO3DNz08CWRa7hoA5
-G5ID+0ZnhKmlJmronG8GRDQ9KqpPSXyjQmJtkQ7Wi73t4xSixqUL0dqE9qAr5O9x
-DAp1m0cI5juG3VBoc0U4Ma5KPMsB3jceeV446ZsU07LSgTIOfLNzq6oEWLWhBzLj
-rDRcGyB6iNxCsNacruW3DKrDg1cMqWqjxt6Tf4LuTWYFmGedTIktmn7VZDgXcbkK
-a7sCRr7P0br6zuIFak1ugkUECDwNznLz3+QgW/iaay6NL6qEpnMLg3Z44kP+BLma
-h5g/SvECgYEA7ewD4lG8s/iz5OIinVHIW10Bc8pEMX3+8cVCo+rq7YbWG+HqXFrv
-DUcyRu/O3SHpc4ozkhRMTsVK5xGUuWGlLG9Hit5R4Ra8oHurJMsFUqjaptd9roHi
-CMmynCFupqBwDoxMig5KxvuDqbOmo2yQOelP/UEnC+qlrux5+lClx4sCgYEA125H
-KPAi30FkRJ/7pzlNtcqzNYQh6xdgcrDIsU1zHRa4AOPYSD+WSkb7wAbns5WLlOM8
-wScpUijyfu56YDizHuID4QW4ddKGVLEbx4tt8CiPLzweeFsP/FSfpd+OK0EDs8wP
-S0b81rCkJKvGljfdl/wY3mYXOu0RZzXB55N1GqkCgYAscy+2lLbAmPJjDKyS37ii
-+RlQXLWo2XVMDiKJJVaG0e4mf2qdno+S135ZKmxne/J1l5hS7l/jR5Da4rn6eHe3
-eYLQOwDpIKpVAUXUNenkq49OJGxisflc0vH/oW9eyhKlZSjXkhv+WPccOWgkmB/J
-8gDzu7xjyY7yw1N2pKKUSQKBgQCfhdB5twALk698xX6igGNT10pGuZYoMEJCCzhB
-WlmAU79jIVSZg0R1sgRfWH2gVH9se6wUVzxY02tlpI/HypSQrMo0iXji/kZsVk18
-wHljGZWVY44ojz3SGpOxT05GJzlnnRZCJsm47EpPwUcnGy0iixGbNbvD7aIya/Mu
-2NkhKQKBgBgLvhfU3sU6XYrF99L63W1vcDyoXcsmQQtz2EzPflFkdcLYoeHo13XW
-Apv7EeX+zqaeqx0v7xuVYWyde5ux9+vII4al0jToabLcd0y2k0Oxmjv40K1YVYsu
-ZqoLXriNHf4NkqgQAFu8FfV1S9RTl6+3X4z6yzf09ustxiw3KWCz
------END RSA PRIVATE KEY-----
diff --git a/cvp/opnfv_testapi/ui/auth/sign.py b/cvp/opnfv_testapi/ui/auth/sign.py
deleted file mode 100644
index dbb40ed0..00000000
--- a/cvp/opnfv_testapi/ui/auth/sign.py
+++ /dev/null
@@ -1,281 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-from six.moves.urllib import parse
-from tornado import gen
-from tornado import web
-
-from cas import CASClient
-from opnfv_testapi.ui.auth.jira_util import SignatureMethod_RSA_SHA1
-from opnfv_testapi.ui.auth.jira_util import get_jira
-
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.db import api as dbapi
-from opnfv_testapi.ui.auth import base
-from opnfv_testapi.ui.auth import constants as const
-
-import logging
-import oauth2 as oauth
-
-root = logging.getLogger()
-root.setLevel(logging.DEBUG)
-
-
-class SigninHandler(base.BaseHandler):
- def get(self):
- signin_type = self.get_query_argument("type")
- self.set_secure_cookie("signin_type", signin_type)
- if signin_type == "openstack":
- self.signin_with_openstack()
- if signin_type == "jira":
- self.signin_with_jira()
- if signin_type == "cas":
- self.signin_with_cas()
-
- def signin_with_cas(self):
- client = CASClient(
- version='2',
- renew=False,
- extra_login_params=False,
- server_url=CONF.lfid_url,
- service_url=CONF.lfid_return_url
- )
- redirect_url = client.get_login_url()
- self.redirect(url=redirect_url, permanent=False)
-
- def signin_with_openstack(self):
- csrf_token = base.get_token()
- return_endpoint = parse.urljoin(CONF.api_url,
- CONF.osid_openid_return_to)
- return_to = base.set_query_params(return_endpoint,
- {const.CSRF_TOKEN: csrf_token})
-
- params = {
- const.OPENID_MODE: CONF.osid_openid_mode,
- const.OPENID_NS: CONF.osid_openid_ns,
- const.OPENID_RETURN_TO: return_to,
- const.OPENID_CLAIMED_ID: CONF.osid_openid_claimed_id,
- const.OPENID_IDENTITY: CONF.osid_openid_identity,
- const.OPENID_REALM: CONF.api_url,
- const.OPENID_NS_SREG: CONF.osid_openid_ns_sreg,
- const.OPENID_NS_SREG_REQUIRED: CONF.osid_openid_sreg_required,
- }
- url = CONF.osid_openstack_openid_endpoint
- url = base.set_query_params(url, params)
- self.redirect(url=url, permanent=False)
-
- def signin_with_jira(self):
- consumer = oauth.Consumer(CONF.jira_oauth_consumer_key,
- CONF.jira_oauth_consumer_secret)
- client = oauth.Client(consumer)
- client.set_signature_method(SignatureMethod_RSA_SHA1())
-
- # Step 1. Get a request token from Jira.
- try:
- resp, content = client.request(CONF.jira_oauth_request_token_url,
- "POST")
- except Exception as e:
- logging.error('Connect jira exception: %s', e)
- self._auth_failure('Error: Connection to Jira failed. \
- Please contact an Administrator')
- return
-
- if resp['status'] != '200':
- logging.error('Connect jira error: %s', resp)
- self._auth_failure('Error: Connection to Jira failed. \
- Error code(%s). \
- Please contact an Administrator' % (resp['status']))
- return
-
- # Step 2. Store the request token in a session for later use.
- logging.warning('content is %s', content)
- request_token = dict(parse.parse_qsl(content.decode()))
- self.set_secure_cookie('oauth_token', request_token['oauth_token'])
- self.set_secure_cookie('oauth_token_secret',
- request_token['oauth_token_secret'])
-
- # Step 3. Redirect the user to the authentication URL.
- url = CONF.jira_oauth_authorize_url + '?oauth_token=' + \
- request_token['oauth_token'] + \
- '&oauth_callback=' + CONF.jira_oauth_callback_url
- self.redirect(url=url, permanent=False)
-
- def _auth_failure(self, message):
- params = {'message': message}
- url = parse.urljoin(CONF.ui_url,
- '/#/auth_failure?' + parse.urlencode(params))
- self.redirect(url)
-
-
-class SigninReturnHandler(base.BaseHandler):
- @web.asynchronous
- @gen.coroutine
- def get(self):
- if self.get_query_argument(const.OPENID_MODE) == 'cancel':
- self._auth_failure('Authentication canceled.')
-
- openid = self.get_query_argument(const.OPENID_CLAIMED_ID)
- role = const.DEFAULT_ROLE
- new_user_info = {
- 'openid': openid,
- 'email': self.get_query_argument(const.OPENID_NS_SREG_EMAIL),
- 'fullname': self.get_query_argument(const.OPENID_NS_SREG_FULLNAME),
- const.ROLE: role
- }
- user = yield dbapi.db_find_one(self.table, {'openid': openid})
- if not user:
- dbapi.db_save(self.table, new_user_info)
- else:
- role = user.get(const.ROLE)
-
- self.clear_cookie(const.OPENID)
- self.clear_cookie(const.ROLE)
- self.set_secure_cookie(const.OPENID, openid)
- self.set_secure_cookie(const.ROLE, role)
- self.redirect(url=CONF.ui_url)
-
-
-class SigninReturnCasHandler(base.BaseHandler):
- @web.asynchronous
- @gen.coroutine
- def get(self):
- logging.warning("cas return")
- ticket = self.get_query_argument('ticket')
- logging.warning("ticket:%s", ticket)
- client = CASClient(
- version='2',
- renew=False,
- extra_login_params=False,
- server_url=CONF.lfid_url,
- service_url=CONF.lfid_return_url
- )
- user, attrs, _ = client.verify_ticket(ticket)
- logging.debug("user:%s", user)
- logging.debug("attr:%s", attrs)
- openid = user
- role = const.DEFAULT_ROLE
- new_user_info = {
- 'openid': openid,
- 'email': attrs['mail'],
- 'fullname': attrs['profile_name_full'],
- const.ROLE: role
- }
- user = yield dbapi.db_find_one(self.table, {'openid': openid})
- if not user:
- dbapi.db_save(self.table, new_user_info)
- else:
- role = user.get(const.ROLE)
-
- self.clear_cookie(const.OPENID)
- self.clear_cookie(const.ROLE)
- self.clear_cookie('ticket')
- self.set_secure_cookie(const.OPENID, openid)
- self.set_secure_cookie(const.ROLE, role)
- self.set_secure_cookie('ticket', ticket)
-
- self.redirect("/")
-
-
-class SigninReturnJiraHandler(base.BaseHandler):
- @web.asynchronous
- @gen.coroutine
- def get(self):
- logging.warning("jira return")
- # Step 1. Use the request token in the session to build a new client.
- consumer = oauth.Consumer(CONF.jira_oauth_consumer_key,
- CONF.jira_oauth_consumer_secret)
- token = oauth.Token(self.get_secure_cookie('oauth_token'),
- self.get_secure_cookie('oauth_token_secret'))
- client = oauth.Client(consumer, token)
- client.set_signature_method(SignatureMethod_RSA_SHA1())
-
- # Step 2. Request the authorized access token from Jira.
- try:
- resp, content = client.request(CONF.jira_oauth_access_token_url,
- "POST")
- except Exception as e:
- logging.error("Connect jira exception:%s", e)
- self._auth_failure('Error: Connection to Jira failed. \
- Please contact an Administrator')
- if resp['status'] != '200':
- logging.error("Connect jira error:%s", resp)
- self._auth_failure('Error: Connection to Jira failed. \
- Please contact an Administrator')
- access_token = dict(parse.parse_qsl(content.decode()))
- logging.warning("access_token: %s", access_token)
-
- # jira = JIRA(server=CONF.jira_jira_url, oauth=oauth_dict)
- jira = get_jira(access_token)
- lf_id = jira.current_user()
- logging.warning("lf_id: %s", lf_id)
- user = jira.myself()
- logging.warning("user: %s", user)
- # Step 3. Lookup the user or create them if they don't exist.
- role = const.DEFAULT_ROLE
- new_user_info = {
- 'openid': lf_id,
- 'email': user['emailAddress'],
- 'fullname': user['displayName'],
- const.ROLE: role
- }
- user = yield dbapi.db_find_one(self.table, {'openid': lf_id})
- if not user:
- dbapi.db_save(self.table, new_user_info)
- else:
- role = user.get(const.ROLE)
-
- self.clear_cookie(const.OPENID)
- self.clear_cookie(const.ROLE)
- self.set_secure_cookie(const.OPENID, lf_id)
- self.set_secure_cookie(const.ROLE, role)
- self.redirect(url=CONF.ui_url)
-
- def _auth_failure(self, message):
- params = {'message': message}
- url = parse.urljoin(CONF.ui_url,
- '/#/auth_failure?' + parse.urlencode(params))
- self.redirect(url)
-
-
-class SignoutHandler(base.BaseHandler):
- def get(self):
- """Handle signout request."""
- self.clear_cookie(const.OPENID)
- self.clear_cookie(const.ROLE)
- signin_type = self.get_secure_cookie("signin_type")
- if signin_type == "openstack":
- self.signout_openstack()
- if signin_type == "jira":
- self.signout_jira()
- if signin_type == 'cas':
- self.signout_cas()
-
- def signout_openstack(self):
- params = {'openid_logout': CONF.osid_openid_logout_endpoint}
- url = parse.urljoin(CONF.ui_url,
- '/#/logout?' + parse.urlencode(params))
- self.redirect(url)
-
- def signout_jira(self):
- params = {'alt_token': ''}
- url = parse.urljoin(CONF.jira_jira_url,
- '/logout?' + parse.urlencode(params))
- self.redirect(url)
-
- def signout_cas(self):
- client = CASClient(
- version='2',
- renew=False,
- extra_login_params=False,
- server_url=CONF.lfid_url,
- service_url=CONF.lfid_return_url
- )
- url = client.get_logout_url(CONF.ui_url)
- self.redirect(url)
diff --git a/cvp/opnfv_testapi/ui/auth/user.py b/cvp/opnfv_testapi/ui/auth/user.py
deleted file mode 100644
index a695da45..00000000
--- a/cvp/opnfv_testapi/ui/auth/user.py
+++ /dev/null
@@ -1,35 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Orange
-# guyrodrigue.koffi@orange.com / koffirodrigue@gmail.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-from tornado import gen
-from tornado import web
-
-from opnfv_testapi.common import raises
-from opnfv_testapi.db import api as dbapi
-from opnfv_testapi.ui.auth import base
-
-
-class ProfileHandler(base.BaseHandler):
- @web.asynchronous
- @gen.coroutine
- def get(self):
- openid = self.get_secure_cookie('openid')
- if openid:
- try:
- user = yield dbapi.db_find_one(self.table, {'openid': openid})
- self.finish_request({
- "openid": user.get('openid'),
- "email": user.get('email'),
- "fullname": user.get('fullname'),
- "role": user.get('role', 'user'),
- "type": self.get_secure_cookie('signin_type')
- })
- except Exception:
- pass
- raises.Unauthorized('Unauthorized')
diff --git a/cvp/opnfv_testapi/ui/root.py b/cvp/opnfv_testapi/ui/root.py
deleted file mode 100644
index 5b2c922d..00000000
--- a/cvp/opnfv_testapi/ui/root.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from opnfv_testapi.resources.handlers import GenericApiHandler
-from opnfv_testapi.common.config import CONF
-
-
-class RootHandler(GenericApiHandler):
- def get_template_path(self):
- return CONF.static_path
-
- def get(self):
- self.render('testapi-ui/index.html')