summaryrefslogtreecommitdiffstats
path: root/utils/test
diff options
context:
space:
mode:
Diffstat (limited to 'utils/test')
-rwxr-xr-xutils/test/reporting/functest/reporting-status.py407
-rw-r--r--utils/test/reporting/functest/template/index-status-tmpl.html22
-rw-r--r--utils/test/reporting/reporting.yaml7
-rw-r--r--utils/test/reporting/utils/reporting_utils.py39
-rw-r--r--utils/test/testapi/opnfv_testapi/common/check.py111
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/handlers.py96
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/pod_handlers.py20
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/project_handlers.py21
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/result_handlers.py43
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py29
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/testcase_handlers.py42
-rw-r--r--utils/test/testapi/opnfv_testapi/resources/testcase_models.py15
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/executor.py83
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_base.py2
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py2
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py50
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_project.py118
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_result.py210
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py2
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py125
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_token.py78
-rw-r--r--utils/test/testapi/opnfv_testapi/tests/unit/test_version.py11
22 files changed, 846 insertions, 687 deletions
diff --git a/utils/test/reporting/functest/reporting-status.py b/utils/test/reporting/functest/reporting-status.py
index af1d1d8a5..94e7f2f3e 100755
--- a/utils/test/reporting/functest/reporting-status.py
+++ b/utils/test/reporting/functest/reporting-status.py
@@ -9,10 +9,8 @@
import datetime
import jinja2
import os
-import requests
import sys
import time
-import yaml
import testCase as tc
import scenarioResult as sr
@@ -43,9 +41,7 @@ log_level = rp_utils.get_config('general.log.log_level')
exclude_noha = rp_utils.get_config('functest.exclude_noha')
exclude_virtual = rp_utils.get_config('functest.exclude_virtual')
-response = requests.get(cf)
-
-functest_yaml_config = yaml.safe_load(response.text)
+functest_yaml_config = rp_utils.getFunctestConfig()
logger.info("*******************************************")
logger.info("* *")
@@ -69,128 +65,117 @@ config_tiers = functest_yaml_config.get("tiers")
for tier in config_tiers:
if tier['order'] >= 0 and tier['order'] < 2:
for case in tier['testcases']:
- if case['name'] not in blacklist:
- testValid.append(tc.TestCase(case['name'],
+ if case['case_name'] not in blacklist:
+ testValid.append(tc.TestCase(case['case_name'],
"functest",
case['dependencies']))
elif tier['order'] == 2:
for case in tier['testcases']:
- if case['name'] not in blacklist:
- testValid.append(tc.TestCase(case['name'],
- case['name'],
+ if case['case_name'] not in blacklist:
+ testValid.append(tc.TestCase(case['case_name'],
+ case['case_name'],
case['dependencies']))
elif tier['order'] > 2:
for case in tier['testcases']:
- if case['name'] not in blacklist:
- otherTestCases.append(tc.TestCase(case['name'],
+ if case['case_name'] not in blacklist:
+ otherTestCases.append(tc.TestCase(case['case_name'],
"functest",
case['dependencies']))
logger.debug("Functest reporting start")
+
# For all the versions
for version in versions:
# For all the installers
+ scenario_directory = "./display/" + version + "/functest/"
+ scenario_file_name = scenario_directory + "scenario_history.txt"
+
+ # check that the directory exists, if not create it
+ # (first run on new version)
+ if not os.path.exists(scenario_directory):
+ os.makedirs(scenario_directory)
+
+ # initiate scenario file if it does not exist
+ if not os.path.isfile(scenario_file_name):
+ with open(scenario_file_name, "a") as my_file:
+ logger.debug("Create scenario file: %s" % scenario_file_name)
+ my_file.write("date,scenario,installer,detail,score\n")
+
for installer in installers:
+
# get scenarios
scenario_results = rp_utils.getScenarios(healthcheck,
installer,
version)
- scenario_stats = rp_utils.getScenarioStats(scenario_results)
- items = {}
- scenario_result_criteria = {}
- scenario_directory = "./display/" + version + "/functest/"
- scenario_file_name = scenario_directory + "scenario_history.txt"
-
- # check that the directory exists, if not create it
- # (first run on new version)
- if not os.path.exists(scenario_directory):
- os.makedirs(scenario_directory)
-
- # initiate scenario file if it does not exist
- if not os.path.isfile(scenario_file_name):
- with open(scenario_file_name, "a") as my_file:
- logger.debug("Create scenario file: %s" % scenario_file_name)
- my_file.write("date,scenario,installer,detail,score\n")
-
- # For all the scenarios get results
- for s, s_result in scenario_results.items():
- logger.info("---------------------------------")
- logger.info("installer %s, version %s, scenario %s:" %
- (installer, version, s))
- logger.debug("Scenario results: %s" % s_result)
-
- # Green or Red light for a given scenario
- nb_test_runnable_for_this_scenario = 0
- scenario_score = 0
- # url of the last jenkins log corresponding to a given
- # scenario
- s_url = ""
- if len(s_result) > 0:
- build_tag = s_result[len(s_result)-1]['build_tag']
- logger.debug("Build tag: %s" % build_tag)
- s_url = rp_utils.getJenkinsUrl(build_tag)
- if s_url is None:
- s_url = "http://testresultS.opnfv.org/reporting"
- logger.info("last jenkins url: %s" % s_url)
- testCases2BeDisplayed = []
- # Check if test case is runnable / installer, scenario
- # for the test case used for Scenario validation
- try:
- # 1) Manage the test cases for the scenario validation
- # concretely Tiers 0-3
- for test_case in testValid:
- test_case.checkRunnable(installer, s,
- test_case.getConstraints())
- logger.debug("testcase %s (%s) is %s" %
- (test_case.getDisplayName(),
- test_case.getName(),
- test_case.isRunnable))
- time.sleep(1)
- if test_case.isRunnable:
- dbName = test_case.getDbName()
- name = test_case.getName()
- displayName = test_case.getDisplayName()
- project = test_case.getProject()
- nb_test_runnable_for_this_scenario += 1
- logger.info(" Searching results for case %s " %
- (displayName))
- result = rp_utils.getResult(dbName, installer,
- s, version)
- # if no result set the value to 0
- if result < 0:
- result = 0
- logger.info(" >>>> Test score = " + str(result))
- test_case.setCriteria(result)
- test_case.setIsRunnable(True)
- testCases2BeDisplayed.append(tc.TestCase(name,
- project,
- "",
- result,
- True,
- 1))
- scenario_score = scenario_score + result
-
- # 2) Manage the test cases for the scenario qualification
- # concretely Tiers > 3
- for test_case in otherTestCases:
- test_case.checkRunnable(installer, s,
- test_case.getConstraints())
- logger.debug("testcase %s (%s) is %s" %
- (test_case.getDisplayName(),
- test_case.getName(),
- test_case.isRunnable))
- time.sleep(1)
- if test_case.isRunnable:
- dbName = test_case.getDbName()
- name = test_case.getName()
- displayName = test_case.getDisplayName()
- project = test_case.getProject()
- logger.info(" Searching results for case %s " %
- (displayName))
- result = rp_utils.getResult(dbName, installer,
- s, version)
- # at least 1 result for the test
- if result > -1:
+
+ # get nb of supported architecture (x86, aarch64)
+ architectures = rp_utils.getArchitectures(scenario_results)
+ logger.info("Supported architectures: {}".format(architectures))
+
+ for architecture in architectures:
+ logger.info("architecture: {}".format(architecture))
+ # Consider only the results for the selected architecture
+ # i.e drop x86 for aarch64 and vice versa
+ filter_results = rp_utils.filterArchitecture(scenario_results,
+ architecture)
+ scenario_stats = rp_utils.getScenarioStats(filter_results)
+ items = {}
+ scenario_result_criteria = {}
+
+ # in case of more than 1 architecture supported
+ # precise the architecture
+ installer_display = installer
+ if (len(architectures) > 1):
+ installer_display = installer + "@" + architecture
+
+ # For all the scenarios get results
+ for s, s_result in filter_results.items():
+ logger.info("---------------------------------")
+ logger.info("installer %s, version %s, scenario %s:" %
+ (installer, version, s))
+ logger.debug("Scenario results: %s" % s_result)
+
+ # Green or Red light for a given scenario
+ nb_test_runnable_for_this_scenario = 0
+ scenario_score = 0
+ # url of the last jenkins log corresponding to a given
+ # scenario
+ s_url = ""
+ if len(s_result) > 0:
+ build_tag = s_result[len(s_result)-1]['build_tag']
+ logger.debug("Build tag: %s" % build_tag)
+ s_url = rp_utils.getJenkinsUrl(build_tag)
+ if s_url is None:
+ s_url = "http://testresultS.opnfv.org/reporting"
+ logger.info("last jenkins url: %s" % s_url)
+ testCases2BeDisplayed = []
+ # Check if test case is runnable / installer, scenario
+ # for the test case used for Scenario validation
+ try:
+ # 1) Manage the test cases for the scenario validation
+ # concretely Tiers 0-3
+ for test_case in testValid:
+ test_case.checkRunnable(installer, s,
+ test_case.getConstraints())
+ logger.debug("testcase %s (%s) is %s" %
+ (test_case.getDisplayName(),
+ test_case.getName(),
+ test_case.isRunnable))
+ time.sleep(1)
+ if test_case.isRunnable:
+ dbName = test_case.getDbName()
+ name = test_case.getName()
+ displayName = test_case.getDisplayName()
+ project = test_case.getProject()
+ nb_test_runnable_for_this_scenario += 1
+ logger.info(" Searching results for case %s " %
+ (displayName))
+ result = rp_utils.getResult(dbName, installer,
+ s, version)
+ # if no result set the value to 0
+ if result < 0:
+ result = 0
+ logger.info(" >>>> Test score = " + str(result))
test_case.setCriteria(result)
test_case.setIsRunnable(True)
testCases2BeDisplayed.append(tc.TestCase(name,
@@ -198,91 +183,127 @@ for version in versions:
"",
result,
True,
- 4))
- else:
- logger.debug("No results found")
-
- items[s] = testCases2BeDisplayed
- except:
- logger.error("Error: installer %s, version %s, scenario %s" %
- (installer, version, s))
- logger.error("No data available: %s " % (sys.exc_info()[0]))
-
- # **********************************************
- # Evaluate the results for scenario validation
- # **********************************************
- # the validation criteria = nb runnable tests x 3
- # because each test case = 0,1,2 or 3
- scenario_criteria = nb_test_runnable_for_this_scenario * 3
- # if 0 runnable tests set criteria at a high value
- if scenario_criteria < 1:
- scenario_criteria = 50 # conf.MAX_SCENARIO_CRITERIA
-
- s_score = str(scenario_score) + "/" + str(scenario_criteria)
- s_score_percent = rp_utils.getScenarioPercent(scenario_score,
- scenario_criteria)
-
- s_status = "KO"
- if scenario_score < scenario_criteria:
- logger.info(">>>> scenario not OK, score = %s/%s" %
- (scenario_score, scenario_criteria))
+ 1))
+ scenario_score = scenario_score + result
+
+ # 2) Manage the test cases for the scenario qualification
+ # concretely Tiers > 3
+ for test_case in otherTestCases:
+ test_case.checkRunnable(installer, s,
+ test_case.getConstraints())
+ logger.debug("testcase %s (%s) is %s" %
+ (test_case.getDisplayName(),
+ test_case.getName(),
+ test_case.isRunnable))
+ time.sleep(1)
+ if test_case.isRunnable:
+ dbName = test_case.getDbName()
+ name = test_case.getName()
+ displayName = test_case.getDisplayName()
+ project = test_case.getProject()
+ logger.info(" Searching results for case %s " %
+ (displayName))
+ result = rp_utils.getResult(dbName, installer,
+ s, version)
+ # at least 1 result for the test
+ if result > -1:
+ test_case.setCriteria(result)
+ test_case.setIsRunnable(True)
+ testCases2BeDisplayed.append(tc.TestCase(
+ name,
+ project,
+ "",
+ result,
+ True,
+ 4))
+ else:
+ logger.debug("No results found")
+
+ items[s] = testCases2BeDisplayed
+ except:
+ logger.error("Error: installer %s, version %s, scenario %s"
+ % (installer, version, s))
+ logger.error("No data available: %s" % (sys.exc_info()[0]))
+
+ # **********************************************
+ # Evaluate the results for scenario validation
+ # **********************************************
+ # the validation criteria = nb runnable tests x 3
+ # because each test case = 0,1,2 or 3
+ scenario_criteria = nb_test_runnable_for_this_scenario * 3
+ # if 0 runnable tests set criteria at a high value
+ if scenario_criteria < 1:
+ scenario_criteria = 50 # conf.MAX_SCENARIO_CRITERIA
+
+ s_score = str(scenario_score) + "/" + str(scenario_criteria)
+ s_score_percent = rp_utils.getScenarioPercent(
+ scenario_score,
+ scenario_criteria)
+
s_status = "KO"
- else:
- logger.info(">>>>> scenario OK, save the information")
- s_status = "OK"
- path_validation_file = ("./display/" + version +
- "/functest/" +
- "validated_scenario_history.txt")
- with open(path_validation_file, "a") as f:
- time_format = "%Y-%m-%d %H:%M"
- info = (datetime.datetime.now().strftime(time_format) +
- ";" + installer + ";" + s + "\n")
+ if scenario_score < scenario_criteria:
+ logger.info(">>>> scenario not OK, score = %s/%s" %
+ (scenario_score, scenario_criteria))
+ s_status = "KO"
+ else:
+ logger.info(">>>>> scenario OK, save the information")
+ s_status = "OK"
+ path_validation_file = ("./display/" + version +
+ "/functest/" +
+ "validated_scenario_history.txt")
+ with open(path_validation_file, "a") as f:
+ time_format = "%Y-%m-%d %H:%M"
+ info = (datetime.datetime.now().strftime(time_format) +
+ ";" + installer_display + ";" + s + "\n")
+ f.write(info)
+
+ # Save daily results in a file
+ with open(scenario_file_name, "a") as f:
+ info = (reportingDate + "," + s + "," + installer_display +
+ "," + s_score + "," +
+ str(round(s_score_percent)) + "\n")
f.write(info)
- # Save daily results in a file
- with open(scenario_file_name, "a") as f:
- info = (reportingDate + "," + s + "," + installer +
- "," + s_score + "," +
- str(round(s_score_percent)) + "\n")
- f.write(info)
-
- scenario_result_criteria[s] = sr.ScenarioResult(s_status,
- s_score,
- s_score_percent,
- s_url)
- logger.info("--------------------------")
-
- templateLoader = jinja2.FileSystemLoader(".")
- templateEnv = jinja2.Environment(
- loader=templateLoader, autoescape=True)
-
- TEMPLATE_FILE = "./functest/template/index-status-tmpl.html"
- template = templateEnv.get_template(TEMPLATE_FILE)
-
- outputText = template.render(scenario_stats=scenario_stats,
- scenario_results=scenario_result_criteria,
- items=items,
- installer=installer,
- period=period,
- version=version,
- date=reportingDate)
-
- with open("./display/" + version +
- "/functest/status-" + installer + ".html", "wb") as fh:
- fh.write(outputText)
-
- logger.info("Manage export CSV & PDF")
- rp_utils.export_csv(scenario_file_name, installer, version)
- logger.error("CSV generated...")
-
- # Generate outputs for export
- # pdf
- # TODO Change once web site updated...use the current one
- # to test pdf production
- url_pdf = rp_utils.get_config('general.url')
- pdf_path = ("./display/" + version +
- "/functest/status-" + installer + ".html")
- pdf_doc_name = ("./display/" + version +
- "/functest/status-" + installer + ".pdf")
- rp_utils.export_pdf(pdf_path, pdf_doc_name)
- logger.info("PDF generated...")
+ scenario_result_criteria[s] = sr.ScenarioResult(
+ s_status,
+ s_score,
+ s_score_percent,
+ s_url)
+ logger.info("--------------------------")
+
+ templateLoader = jinja2.FileSystemLoader(".")
+ templateEnv = jinja2.Environment(
+ loader=templateLoader, autoescape=True)
+
+ TEMPLATE_FILE = "./functest/template/index-status-tmpl.html"
+ template = templateEnv.get_template(TEMPLATE_FILE)
+
+ outputText = template.render(
+ scenario_stats=scenario_stats,
+ scenario_results=scenario_result_criteria,
+ items=items,
+ installer=installer_display,
+ period=period,
+ version=version,
+ date=reportingDate)
+
+ with open("./display/" + version +
+ "/functest/status-" +
+ installer_display + ".html", "wb") as fh:
+ fh.write(outputText)
+
+ logger.info("Manage export CSV & PDF")
+ rp_utils.export_csv(scenario_file_name, installer_display, version)
+ logger.error("CSV generated...")
+
+ # Generate outputs for export
+ # pdf
+ # TODO Change once web site updated...use the current one
+ # to test pdf production
+ url_pdf = rp_utils.get_config('general.url')
+ pdf_path = ("./display/" + version +
+ "/functest/status-" + installer_display + ".html")
+ pdf_doc_name = ("./display/" + version +
+ "/functest/status-" + installer_display + ".pdf")
+ rp_utils.export_pdf(pdf_path, pdf_doc_name)
+ logger.info("PDF generated...")
diff --git a/utils/test/reporting/functest/template/index-status-tmpl.html b/utils/test/reporting/functest/template/index-status-tmpl.html
index 52046c37f..ebacfd159 100644
--- a/utils/test/reporting/functest/template/index-status-tmpl.html
+++ b/utils/test/reporting/functest/template/index-status-tmpl.html
@@ -15,27 +15,27 @@
{% for scenario in scenario_stats.iteritems() -%}
var gaugeScenario{{loop.index}} = gauge('#gaugeScenario{{loop.index}}');
{%- endfor %}
-
+
// assign success rate to the gauge
function updateReadings() {
{% for scenario,iteration in scenario_stats.iteritems() -%}
gaugeScenario{{loop.index}}.update({{scenario_results[scenario].getScorePercent()}});
{%- endfor %}
}
- updateReadings();
+ updateReadings();
}
-
+
// trend line management
- d3.csv("./scenario_history.csv", function(data) {
+ d3.csv("./scenario_history.txt", function(data) {
// ***************************************
// Create the trend line
{% for scenario,iteration in scenario_stats.iteritems() -%}
- // for scenario {{scenario}}
+ // for scenario {{scenario}}
// Filter results
- var trend{{loop.index}} = data.filter(function(row) {
+ var trend{{loop.index}} = data.filter(function(row) {
return row["scenario"]=="{{scenario}}" && row["installer"]=="{{installer}}";
})
- // Parse the date
+ // Parse the date
trend{{loop.index}}.forEach(function(d) {
d.date = parseDate(d.date);
d.score = +d.score
@@ -44,7 +44,7 @@
var mytrend = trend("#trend_svg{{loop.index}}",trend{{loop.index}})
// ****************************************
{%- endfor %}
- });
+ });
if ( !window.isLoaded ) {
window.addEventListener("load", function() {
onDocumentReady();
@@ -61,7 +61,7 @@ $(document).ready(function (){
});
})
</script>
-
+
</head>
<body>
<div class="container">
@@ -72,8 +72,8 @@ $(document).ready(function (){
<li class="active"><a href="../../index.html">Home</a></li>
<li><a href="status-apex.html">Apex</a></li>
<li><a href="status-compass.html">Compass</a></li>
- <li><a href="status-daisy.html">Daisy</a></li>
- <li><a href="status-fuel.html">Fuel</a></li>
+ <li><a href="status-fuel@x86.html">fuel@x86</a></li>
+ <li><a href="status-fuel@aarch64.html">fuel@aarch64</a></li>
<li><a href="status-joid.html">Joid</a></li>
</ul>
</nav>
diff --git a/utils/test/reporting/reporting.yaml b/utils/test/reporting/reporting.yaml
index 6af87c121..1692f481d 100644
--- a/utils/test/reporting/reporting.yaml
+++ b/utils/test/reporting/reporting.yaml
@@ -3,7 +3,6 @@ general:
installers:
- apex
- compass
- - daisy
- fuel
- joid
@@ -37,7 +36,6 @@ functest:
blacklist:
- ovno
- security_scan
- - rally_sanity
- healthcheck
- odl_netvirt
- aaa
@@ -45,13 +43,12 @@ functest:
- orchestra_ims
- juju_epc
- orchestra
- - promise
max_scenario_criteria: 50
test_conf: https://git.opnfv.org/cgit/functest/plain/functest/ci/testcases.yaml
log_level: ERROR
jenkins_url: https://build.opnfv.org/ci/view/functest/job/
- exclude_noha: "False"
- exclude_virtual: "False"
+ exclude_noha: False
+ exclude_virtual: False
yardstick:
test_conf: https://git.opnfv.org/cgit/yardstick/plain/tests/ci/report_config.yaml
diff --git a/utils/test/reporting/utils/reporting_utils.py b/utils/test/reporting/utils/reporting_utils.py
index 0eae59163..599a93818 100644
--- a/utils/test/reporting/utils/reporting_utils.py
+++ b/utils/test/reporting/utils/reporting_utils.py
@@ -10,6 +10,7 @@ from urllib2 import Request, urlopen, URLError
import logging
import json
import os
+import requests
import pdfkit
import yaml
@@ -328,6 +329,44 @@ def getScenarioPercent(scenario_score, scenario_criteria):
# *********
+# Functest
+# *********
+def getFunctestConfig(version=""):
+ config_file = get_config('functest.test_conf') + version
+ response = requests.get(config_file)
+ return yaml.safe_load(response.text)
+
+
+def getArchitectures(scenario_results):
+ supported_arch = ['x86']
+ if (len(scenario_results) > 0):
+ for scenario_result in scenario_results.values():
+ for value in scenario_result:
+ if ("armband" in value['build_tag']):
+ supported_arch.append('aarch64')
+ return supported_arch
+ return supported_arch
+
+
+def filterArchitecture(results, architecture):
+ filtered_results = {}
+ for name, results in results.items():
+ filtered_values = []
+ for value in results:
+ if (architecture is "x86"):
+ # drop aarch64 results
+ if ("armband" not in value['build_tag']):
+ filtered_values.append(value)
+ elif(architecture is "aarch64"):
+ # drop x86 results
+ if ("armband" in value['build_tag']):
+ filtered_values.append(value)
+ if (len(filtered_values) > 0):
+ filtered_results[name] = filtered_values
+ return filtered_results
+
+
+# *********
# Yardstick
# *********
def subfind(given_list, pattern_list):
diff --git a/utils/test/testapi/opnfv_testapi/common/check.py b/utils/test/testapi/opnfv_testapi/common/check.py
new file mode 100644
index 000000000..be4b1df12
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/common/check.py
@@ -0,0 +1,111 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corp
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import functools
+
+from tornado import web, gen
+
+from opnfv_testapi.common import raises, message
+
+
+def authenticate(method):
+ @web.asynchronous
+ @gen.coroutine
+ @functools.wraps(method)
+ def wrapper(self, *args, **kwargs):
+ if self.auth:
+ try:
+ token = self.request.headers['X-Auth-Token']
+ except KeyError:
+ raises.Unauthorized(message.unauthorized())
+ query = {'access_token': token}
+ check = yield self._eval_db_find_one(query, 'tokens')
+ if not check:
+ raises.Forbidden(message.invalid_token())
+ ret = yield gen.coroutine(method)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrapper
+
+
+def not_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ query = kwargs.get('query')
+ data = yield self._eval_db_find_one(query)
+ if not data:
+ raises.NotFound(message.not_found(self.table, query))
+ ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
+ raise gen.Return(ret)
+
+ return wrap
+
+
+def no_body(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ if self.json_args is None:
+ raises.BadRequest(message.no_body())
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+
+ return wrap
+
+
+def miss_fields(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ fields = kwargs.get('miss_fields')
+ if fields:
+ for miss in fields:
+ miss_data = self.json_args.get(miss)
+ if miss_data is None or miss_data == '':
+ raises.BadRequest(message.missing(miss))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def carriers_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ carriers = kwargs.get('carriers')
+ if carriers:
+ for table, query in carriers:
+ exist = yield self._eval_db_find_one(query(), table)
+ if not exist:
+ raises.Forbidden(message.not_found(table, query()))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def new_not_exists(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ query = kwargs.get('query')
+ if query:
+ to_data = yield self._eval_db_find_one(query())
+ if to_data:
+ raises.Forbidden(message.exist(self.table, query()))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def updated_one_not_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, data, *args, **kwargs):
+ db_keys = kwargs.get('db_keys')
+ query = self._update_query(db_keys, data)
+ if query:
+ to_data = yield self._eval_db_find_one(query)
+ if to_data:
+ raises.Forbidden(message.exist(self.table, query))
+ ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
diff --git a/utils/test/testapi/opnfv_testapi/resources/handlers.py b/utils/test/testapi/opnfv_testapi/resources/handlers.py
index 522bbe7f5..955fbbef7 100644
--- a/utils/test/testapi/opnfv_testapi/resources/handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/handlers.py
@@ -21,13 +21,13 @@
##############################################################################
from datetime import datetime
-import functools
import json
from tornado import gen
from tornado import web
import models
+from opnfv_testapi.common import check
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
from opnfv_testapi.tornado_swagger import swagger
@@ -73,48 +73,20 @@ class GenericApiHandler(web.RequestHandler):
cls_data = self.table_cls.from_dict(data)
return cls_data.format_http()
- def authenticate(method):
- @web.asynchronous
- @gen.coroutine
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- if self.auth:
- try:
- token = self.request.headers['X-Auth-Token']
- except KeyError:
- raises.Unauthorized(message.unauthorized())
- query = {'access_token': token}
- check = yield self._eval_db_find_one(query, 'tokens')
- if not check:
- raises.Forbidden(message.invalid_token())
- ret = yield gen.coroutine(method)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrapper
-
- @authenticate
- def _create(self, miss_checks, db_checks, **kwargs):
+ @check.authenticate
+ @check.no_body
+ @check.miss_fields
+ @check.carriers_exist
+ @check.new_not_exists
+ def _create(self, **kwargs):
"""
:param miss_checks: [miss1, miss2]
:param db_checks: [(table, exist, query, error)]
"""
- if self.json_args is None:
- raises.BadRequest(message.no_body())
-
data = self.table_cls.from_dict(self.json_args)
- for miss in miss_checks:
- miss_data = data.__getattribute__(miss)
- if miss_data is None or miss_data == '':
- raises.BadRequest(message.missing(miss))
-
for k, v in kwargs.iteritems():
data.__setattr__(k, v)
- for table, exist, query, error in db_checks:
- check = yield self._eval_db_find_one(query(data), table)
- if (exist and not check) or (not exist and check):
- code, msg = error(data)
- raises.CodeTBD(code, msg)
-
if self.table != 'results':
data.creation_date = datetime.now()
_id = yield self._eval_db(self.table, 'insert', data.format(),
@@ -146,47 +118,27 @@ class GenericApiHandler(web.RequestHandler):
@web.asynchronous
@gen.coroutine
- def _get_one(self, query):
- data = yield self._eval_db_find_one(query)
- if data is None:
- raises.NotFound(message.not_found(self.table, query))
+ @check.not_exist
+ def _get_one(self, data, query=None):
self.finish_request(self.format_data(data))
- @authenticate
- def _delete(self, query):
- data = yield self._eval_db_find_one(query)
- if data is None:
- raises.NotFound(message.not_found(self.table, query))
-
+ @check.authenticate
+ @check.not_exist
+ def _delete(self, data, query=None):
yield self._eval_db(self.table, 'remove', query)
self.finish_request()
- @authenticate
- def _update(self, query, db_keys):
- if self.json_args is None:
- raises.BadRequest(message.no_body())
-
- # check old data exist
- from_data = yield self._eval_db_find_one(query)
- if from_data is None:
- raises.NotFound(message.not_found(self.table, query))
-
- data = self.table_cls.from_dict(from_data)
- # check new data exist
- equal, new_query = self._update_query(db_keys, data)
- if not equal:
- to_data = yield self._eval_db_find_one(new_query)
- if to_data is not None:
- raises.Forbidden(message.exist(self.table, new_query))
-
- # we merge the whole document """
- edit_request = self._update_requests(data)
-
- """ Updating the DB """
- yield self._eval_db(self.table, 'update', query, edit_request,
+ @check.authenticate
+ @check.no_body
+ @check.not_exist
+ @check.updated_one_not_exist
+ def _update(self, data, query=None, **kwargs):
+ data = self.table_cls.from_dict(data)
+ update_req = self._update_requests(data)
+ yield self._eval_db(self.table, 'update', query, update_req,
check_keys=False)
- edit_request['_id'] = str(data._id)
- self.finish_request(edit_request)
+ update_req['_id'] = str(data._id)
+ self.finish_request(update_req)
def _update_requests(self, data):
request = dict()
@@ -219,13 +171,13 @@ class GenericApiHandler(web.RequestHandler):
equal = True
for key in keys:
new = self.json_args.get(key)
- old = data.__getattribute__(key)
+ old = data.get(key)
if new is None:
new = old
elif new != old:
equal = False
query[key] = new
- return equal, query
+ return query if not equal else dict()
def _eval_db(self, table, method, *args, **kwargs):
exec_collection = self.db.__getattr__(table)
diff --git a/utils/test/testapi/opnfv_testapi/resources/pod_handlers.py b/utils/test/testapi/opnfv_testapi/resources/pod_handlers.py
index 2c303c934..e21841d33 100644
--- a/utils/test/testapi/opnfv_testapi/resources/pod_handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/pod_handlers.py
@@ -6,10 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import httplib
-
import handlers
-from opnfv_testapi.common import message
from opnfv_testapi.tornado_swagger import swagger
import pod_models
@@ -43,15 +40,10 @@ class PodCLHandler(GenericPodHandler):
@raise 403: pod already exists
@raise 400: body or name not provided
"""
- def query(data):
- return {'name': data.name}
-
- def error(data):
- return httplib.FORBIDDEN, message.exist('pod', data.name)
-
- miss_checks = ['name']
- db_checks = [(self.table, False, query, error)]
- self._create(miss_checks, db_checks)
+ def query():
+ return {'name': self.json_args.get('name')}
+ miss_fields = ['name']
+ self._create(miss_fields=miss_fields, query=query)
class PodGURHandler(GenericPodHandler):
@@ -63,9 +55,7 @@ class PodGURHandler(GenericPodHandler):
@return 200: pod exist
@raise 404: pod not exist
"""
- query = dict()
- query['name'] = pod_name
- self._get_one(query)
+ self._get_one(query={'name': pod_name})
def delete(self, pod_name):
""" Remove a POD
diff --git a/utils/test/testapi/opnfv_testapi/resources/project_handlers.py b/utils/test/testapi/opnfv_testapi/resources/project_handlers.py
index 59e0b88e5..d79cd3b61 100644
--- a/utils/test/testapi/opnfv_testapi/resources/project_handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/project_handlers.py
@@ -6,10 +6,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import httplib
import handlers
-from opnfv_testapi.common import message
from opnfv_testapi.tornado_swagger import swagger
import project_models
@@ -45,15 +43,10 @@ class ProjectCLHandler(GenericProjectHandler):
@raise 403: project already exists
@raise 400: body or name not provided
"""
- def query(data):
- return {'name': data.name}
-
- def error(data):
- return httplib.FORBIDDEN, message.exist('project', data.name)
-
- miss_checks = ['name']
- db_checks = [(self.table, False, query, error)]
- self._create(miss_checks, db_checks)
+ def query():
+ return {'name': self.json_args.get('name')}
+ miss_fields = ['name']
+ self._create(miss_fields=miss_fields, query=query)
class ProjectGURHandler(GenericProjectHandler):
@@ -65,7 +58,7 @@ class ProjectGURHandler(GenericProjectHandler):
@return 200: project exist
@raise 404: project not exist
"""
- self._get_one({'name': project_name})
+ self._get_one(query={'name': project_name})
@swagger.operation(nickname="updateProjectByName")
def put(self, project_name):
@@ -81,7 +74,7 @@ class ProjectGURHandler(GenericProjectHandler):
"""
query = {'name': project_name}
db_keys = ['name']
- self._update(query, db_keys)
+ self._update(query=query, db_keys=db_keys)
@swagger.operation(nickname='deleteProjectByName')
def delete(self, project_name):
@@ -90,4 +83,4 @@ class ProjectGURHandler(GenericProjectHandler):
@return 200: delete success
@raise 404: project not exist
"""
- self._delete({'name': project_name})
+ self._delete(query={'name': project_name})
diff --git a/utils/test/testapi/opnfv_testapi/resources/result_handlers.py b/utils/test/testapi/opnfv_testapi/resources/result_handlers.py
index fb5ed9ec7..214706f5f 100644
--- a/utils/test/testapi/opnfv_testapi/resources/result_handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/result_handlers.py
@@ -8,7 +8,6 @@
##############################################################################
from datetime import datetime
from datetime import timedelta
-import httplib
from bson import objectid
@@ -127,7 +126,9 @@ class ResultsCLHandler(GenericResultHandler):
if last is not None:
last = self.get_int('last', last)
- self._list(self.set_query(), sort=[('start_date', -1)], last=last)
+ self._list(query=self.set_query(),
+ sort=[('start_date', -1)],
+ last=last)
@swagger.operation(nickname="createTestResult")
def post(self):
@@ -141,31 +142,21 @@ class ResultsCLHandler(GenericResultHandler):
@raise 404: pod/project/testcase not exist
@raise 400: body/pod_name/project_name/case_name not provided
"""
- def pod_query(data):
- return {'name': data.pod_name}
+ def pod_query():
+ return {'name': self.json_args.get('pod_name')}
- def pod_error(data):
- return httplib.FORBIDDEN, message.not_found('pod', data.pod_name)
+ def project_query():
+ return {'name': self.json_args.get('project_name')}
- def project_query(data):
- return {'name': data.project_name}
+ def testcase_query():
+ return {'project_name': self.json_args.get('project_name'),
+ 'name': self.json_args.get('case_name')}
- def project_error(data):
- return httplib.FORBIDDEN, message.not_found('project',
- data.project_name)
-
- def testcase_query(data):
- return {'project_name': data.project_name, 'name': data.case_name}
-
- def testcase_error(data):
- return httplib.FORBIDDEN, message.not_found('testcase',
- data.case_name)
-
- miss_checks = ['pod_name', 'project_name', 'case_name']
- db_checks = [('pods', True, pod_query, pod_error),
- ('projects', True, project_query, project_error),
- ('testcases', True, testcase_query, testcase_error)]
- self._create(miss_checks, db_checks)
+ miss_fields = ['pod_name', 'project_name', 'case_name']
+ carriers = [('pods', pod_query),
+ ('projects', project_query),
+ ('testcases', testcase_query)]
+ self._create(miss_fields=miss_fields, carriers=carriers)
class ResultsGURHandler(GenericResultHandler):
@@ -179,7 +170,7 @@ class ResultsGURHandler(GenericResultHandler):
"""
query = dict()
query["_id"] = objectid.ObjectId(result_id)
- self._get_one(query)
+ self._get_one(query=query)
@swagger.operation(nickname="updateTestResultById")
def put(self, result_id):
@@ -195,4 +186,4 @@ class ResultsGURHandler(GenericResultHandler):
"""
query = {'_id': objectid.ObjectId(result_id)}
db_keys = []
- self._update(query, db_keys)
+ self._update(query=query, db_keys=db_keys)
diff --git a/utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py b/utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py
index bad79fdc6..5d420a56e 100644
--- a/utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/scenario_handlers.py
@@ -1,5 +1,4 @@
import functools
-import httplib
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
@@ -65,7 +64,7 @@ class ScenariosCLHandler(GenericScenarioHandler):
query['installers'] = {'$elemMatch': elem_query}
return query
- self._list(_set_query())
+ self._list(query=_set_query())
@swagger.operation(nickname="createScenario")
def post(self):
@@ -79,15 +78,10 @@ class ScenariosCLHandler(GenericScenarioHandler):
@raise 403: scenario already exists
@raise 400: body or name not provided
"""
- def query(data):
- return {'name': data.name}
-
- def error(data):
- return httplib.FORBIDDEN, message.exist('scenario', data.name)
-
- miss_checks = ['name']
- db_checks = [(self.table, False, query, error)]
- self._create(miss_checks=miss_checks, db_checks=db_checks)
+ def query():
+ return {'name': self.json_args.get('name')}
+ miss_fields = ['name']
+ self._create(miss_fields=miss_fields, query=query)
class ScenarioGURHandler(GenericScenarioHandler):
@@ -99,7 +93,7 @@ class ScenarioGURHandler(GenericScenarioHandler):
@return 200: scenario exist
@raise 404: scenario not exist
"""
- self._get_one({'name': name})
+ self._get_one(query={'name': name})
pass
@swagger.operation(nickname="updateScenarioByName")
@@ -116,7 +110,7 @@ class ScenarioGURHandler(GenericScenarioHandler):
"""
query = {'name': name}
db_keys = ['name']
- self._update(query, db_keys)
+ self._update(query=query, db_keys=db_keys)
@swagger.operation(nickname="deleteScenarioByName")
def delete(self, name):
@@ -126,19 +120,16 @@ class ScenarioGURHandler(GenericScenarioHandler):
@raise 404: scenario not exist:
"""
- query = {'name': name}
- self._delete(query)
+ self._delete(query={'name': name})
def _update_query(self, keys, data):
query = dict()
- equal = True
if self._is_rename():
new = self._term.get('name')
- if data.name != new:
- equal = False
+ if data.get('name') != new:
query['name'] = new
- return equal, query
+ return query
def _update_requests(self, data):
updates = {
diff --git a/utils/test/testapi/opnfv_testapi/resources/testcase_handlers.py b/utils/test/testapi/opnfv_testapi/resources/testcase_handlers.py
index bc22b74e2..9399326f0 100644
--- a/utils/test/testapi/opnfv_testapi/resources/testcase_handlers.py
+++ b/utils/test/testapi/opnfv_testapi/resources/testcase_handlers.py
@@ -6,9 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import httplib
-from opnfv_testapi.common import message
from opnfv_testapi.resources import handlers
from opnfv_testapi.resources import testcase_models
from opnfv_testapi.tornado_swagger import swagger
@@ -32,9 +30,7 @@ class TestcaseCLHandler(GenericTestcaseHandler):
empty list is no testcase exist in this project
@rtype: L{TestCases}
"""
- query = dict()
- query['project_name'] = project_name
- self._list(query)
+ self._list(query={'project_name': project_name})
@swagger.operation(nickname="createTestCase")
def post(self, project_name):
@@ -49,26 +45,18 @@ class TestcaseCLHandler(GenericTestcaseHandler):
or testcase already exists in this project
@raise 400: body or name not provided
"""
- def p_query(data):
- return {'name': data.project_name}
-
- def tc_query(data):
- return {
- 'project_name': data.project_name,
- 'name': data.name
- }
-
- def p_error(data):
- return httplib.FORBIDDEN, message.not_found('project',
- data.project_name)
-
- def tc_error(data):
- return httplib.FORBIDDEN, message.exist('testcase', data.name)
+ def project_query():
+ return {'name': project_name}
- miss_checks = ['name']
- db_checks = [(self.db_projects, True, p_query, p_error),
- (self.db_testcases, False, tc_query, tc_error)]
- self._create(miss_checks, db_checks, project_name=project_name)
+ def testcase_query():
+ return {'project_name': project_name,
+ 'name': self.json_args.get('name')}
+ miss_fields = ['name']
+ carriers = [(self.db_projects, project_query)]
+ self._create(miss_fields=miss_fields,
+ carriers=carriers,
+ query=testcase_query,
+ project_name=project_name)
class TestcaseGURHandler(GenericTestcaseHandler):
@@ -84,7 +72,7 @@ class TestcaseGURHandler(GenericTestcaseHandler):
query = dict()
query['project_name'] = project_name
query["name"] = case_name
- self._get_one(query)
+ self._get_one(query=query)
@swagger.operation(nickname="updateTestCaseByName")
def put(self, project_name, case_name):
@@ -102,7 +90,7 @@ class TestcaseGURHandler(GenericTestcaseHandler):
"""
query = {'project_name': project_name, 'name': case_name}
db_keys = ['name', 'project_name']
- self._update(query, db_keys)
+ self._update(query=query, db_keys=db_keys)
@swagger.operation(nickname='deleteTestCaseByName')
def delete(self, project_name, case_name):
@@ -112,4 +100,4 @@ class TestcaseGURHandler(GenericTestcaseHandler):
@raise 404: testcase not exist
"""
query = {'project_name': project_name, 'name': case_name}
- self._delete(query)
+ self._delete(query=query)
diff --git a/utils/test/testapi/opnfv_testapi/resources/testcase_models.py b/utils/test/testapi/opnfv_testapi/resources/testcase_models.py
index 8cc3c6c6a..9b8f4b5ff 100644
--- a/utils/test/testapi/opnfv_testapi/resources/testcase_models.py
+++ b/utils/test/testapi/opnfv_testapi/resources/testcase_models.py
@@ -13,12 +13,13 @@ from opnfv_testapi.tornado_swagger import swagger
@swagger.model()
class TestcaseCreateRequest(models.ModelBase):
def __init__(self, name, url=None, description=None,
- tier=None, ci_loop=None, criteria=None,
- blocking=None, dependencies=None, run=None,
+ catalog_description=None, tier=None, ci_loop=None,
+ criteria=None, blocking=None, dependencies=None, run=None,
domains=None, tags=None, version=None):
self.name = name
self.url = url
self.description = description
+ self.catalog_description = catalog_description
self.tier = tier
self.ci_loop = ci_loop
self.criteria = criteria
@@ -34,11 +35,12 @@ class TestcaseCreateRequest(models.ModelBase):
@swagger.model()
class TestcaseUpdateRequest(models.ModelBase):
def __init__(self, name=None, description=None, project_name=None,
- tier=None, ci_loop=None, criteria=None,
- blocking=None, dependencies=None, run=None,
+ catalog_description=None, tier=None, ci_loop=None,
+ criteria=None, blocking=None, dependencies=None, run=None,
domains=None, tags=None, version=None, trust=None):
self.name = name
self.description = description
+ self.catalog_description = catalog_description
self.project_name = project_name
self.tier = tier
self.ci_loop = ci_loop
@@ -56,14 +58,15 @@ class TestcaseUpdateRequest(models.ModelBase):
class Testcase(models.ModelBase):
def __init__(self, _id=None, name=None, project_name=None,
description=None, url=None, creation_date=None,
- tier=None, ci_loop=None, criteria=None,
- blocking=None, dependencies=None, run=None,
+ catalog_description=None, tier=None, ci_loop=None,
+ criteria=None, blocking=None, dependencies=None, run=None,
domains=None, tags=None, version=None,
trust=None):
self._id = None
self.name = None
self.project_name = None
self.description = None
+ self.catalog_description = None
self.url = None
self.creation_date = None
self.tier = None
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/executor.py b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py
new file mode 100644
index 000000000..b30c3258b
--- /dev/null
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/executor.py
@@ -0,0 +1,83 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corp
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import functools
+import httplib
+
+
+def create(excepted_status, excepted_response):
+ def _create(create_request):
+ @functools.wraps(create_request)
+ def wrap(self):
+ request = create_request(self)
+ status, body = self.create(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _create
+
+
+def get(excepted_status, excepted_response):
+ def _get(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.get(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _get
+
+
+def update(excepted_status, excepted_response):
+ def _update(update_request):
+ @functools.wraps(update_request)
+ def wrap(self):
+ request, resource = update_request(self)
+ status, body = self.update(request, resource)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(request, body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _update
+
+
+def delete(excepted_status, excepted_response):
+ def _delete(delete_request):
+ @functools.wraps(delete_request)
+ def wrap(self):
+ request = delete_request(self)
+ if isinstance(request, tuple):
+ status, body = self.delete(request[0], *(request[1]))
+ else:
+ status, body = self.delete(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _delete
+
+
+def query(excepted_status, excepted_response, number=0):
+ def _query(get_request):
+ @functools.wraps(get_request)
+ def wrap(self):
+ request = get_request(self)
+ status, body = self.query(request)
+ if excepted_status == httplib.OK:
+ getattr(self, excepted_response)(body, number)
+ else:
+ self.assertIn(excepted_response, body)
+ return wrap
+ return _query
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py
index b955f4a5a..a6e733914 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_base.py
@@ -12,9 +12,9 @@ from os import path
import mock
from tornado import testing
-import fake_pymongo
from opnfv_testapi.cmd import server
from opnfv_testapi.resources import models
+from opnfv_testapi.tests.unit import fake_pymongo
class TestBase(testing.AsyncHTTPTestCase):
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
index 7c43fca62..1ebc96f3b 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_fake_pymongo.py
@@ -12,7 +12,7 @@ from tornado import gen
from tornado import testing
from tornado import web
-import fake_pymongo
+from opnfv_testapi.tests.unit import fake_pymongo
class MyTest(testing.AsyncHTTPTestCase):
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py
index cae86e8bb..0ed348df9 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_pod.py
@@ -11,7 +11,8 @@ import unittest
from opnfv_testapi.common import message
from opnfv_testapi.resources import pod_models
-import test_base as base
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import test_base as base
class TestPodBase(base.TestBase):
@@ -36,48 +37,47 @@ class TestPodBase(base.TestBase):
class TestPodCreate(TestPodBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
def test_withoutBody(self):
- (code, body) = self.create()
- self.assertEqual(code, httplib.BAD_REQUEST)
+ return None
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_emptyName(self):
- req_empty = pod_models.PodCreateRequest('')
- (code, body) = self.create(req_empty)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return pod_models.PodCreateRequest('')
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_noneName(self):
- req_none = pod_models.PodCreateRequest(None)
- (code, body) = self.create(req_none)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return pod_models.PodCreateRequest(None)
+ @executor.create(httplib.OK, 'assert_create_body')
def test_success(self):
- code, body = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_create_body(body)
+ return self.req_d
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
def test_alreadyExist(self):
self.create_d()
- code, body = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
+ return self.req_d
class TestPodGet(TestPodBase):
+ def setUp(self):
+ super(TestPodGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
def test_notExist(self):
- code, body = self.get('notExist')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return 'notExist'
+ @executor.get(httplib.OK, 'assert_get_body')
def test_getOne(self):
- self.create_d()
- code, body = self.get(self.req_d.name)
- self.assert_get_body(body)
+ return self.req_d.name
+ @executor.get(httplib.OK, '_assert_list')
def test_list(self):
- self.create_d()
- self.create_e()
- code, body = self.get()
+ return None
+
+ def _assert_list(self, body):
self.assertEqual(len(body.pods), 2)
for pod in body.pods:
if self.req_d.name == pod.name:
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py
index 74cefd711..323a1168f 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_project.py
@@ -1,17 +1,10 @@
-##############################################################################
-# Copyright (c) 2016 ZTE Corporation
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
import httplib
import unittest
from opnfv_testapi.common import message
from opnfv_testapi.resources import project_models
-import test_base as base
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import test_base as base
class TestProjectBase(base.TestBase):
@@ -36,49 +29,47 @@ class TestProjectBase(base.TestBase):
class TestProjectCreate(TestProjectBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
def test_withoutBody(self):
- (code, body) = self.create()
- self.assertEqual(code, httplib.BAD_REQUEST)
+ return None
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_emptyName(self):
- req_empty = project_models.ProjectCreateRequest('')
- (code, body) = self.create(req_empty)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return project_models.ProjectCreateRequest('')
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_noneName(self):
- req_none = project_models.ProjectCreateRequest(None)
- (code, body) = self.create(req_none)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return project_models.ProjectCreateRequest(None)
+ @executor.create(httplib.OK, 'assert_create_body')
def test_success(self):
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_create_body(body)
+ return self.req_d
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
def test_alreadyExist(self):
self.create_d()
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
+ return self.req_d
class TestProjectGet(TestProjectBase):
+ def setUp(self):
+ super(TestProjectGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
def test_notExist(self):
- code, body = self.get('notExist')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return 'notExist'
+ @executor.get(httplib.OK, 'assert_body')
def test_getOne(self):
- self.create_d()
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assert_body(body)
+ return self.req_d.name
+ @executor.get(httplib.OK, '_assert_list')
def test_list(self):
- self.create_d()
- self.create_e()
- code, body = self.get()
+ return None
+
+ def _assert_list(self, body):
for project in body.projects:
if self.req_d.name == project.name:
self.assert_body(project)
@@ -87,54 +78,57 @@ class TestProjectGet(TestProjectBase):
class TestProjectUpdate(TestProjectBase):
+ def setUp(self):
+ super(TestProjectUpdate, self).setUp()
+ _, d_body = self.create_d()
+ _, get_res = self.get(self.req_d.name)
+ self.index_d = get_res._id
+ self.create_e()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
def test_withoutBody(self):
- code, _ = self.update(None, 'noBody')
- self.assertEqual(code, httplib.BAD_REQUEST)
+ return None, 'noBody'
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
def test_notFound(self):
- code, _ = self.update(self.req_e, 'notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return self.req_e, 'notFound'
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
def test_newNameExist(self):
- self.create_d()
- self.create_e()
- code, body = self.update(self.req_e, self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
+ return self.req_e, self.req_d.name
+ @executor.update(httplib.FORBIDDEN, message.no_update())
def test_noUpdate(self):
- self.create_d()
- code, body = self.update(self.req_d, self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.no_update(), body)
+ return self.req_d, self.req_d.name
+ @executor.update(httplib.OK, '_assert_update')
def test_success(self):
- self.create_d()
- code, body = self.get(self.req_d.name)
- _id = body._id
-
req = project_models.ProjectUpdateRequest('newName', 'new description')
- code, body = self.update(req, self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assertEqual(_id, body._id)
- self.assert_body(body, req)
+ return req, self.req_d.name
+ def _assert_update(self, req, body):
+ self.assertEqual(self.index_d, body._id)
+ self.assert_body(body, req)
_, new_body = self.get(req.name)
- self.assertEqual(_id, new_body._id)
+ self.assertEqual(self.index_d, new_body._id)
self.assert_body(new_body, req)
class TestProjectDelete(TestProjectBase):
+ def setUp(self):
+ super(TestProjectDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
def test_notFound(self):
- code, body = self.delete('notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return 'notFound'
+ @executor.delete(httplib.OK, '_assert_delete')
def test_success(self):
- self.create_d()
- code, body = self.delete(self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assertEqual(body, '')
+ return self.req_d.name
+ def _assert_delete(self, body):
+ self.assertEqual(body, '')
code, body = self.get(self.req_d.name)
self.assertEqual(code, httplib.NOT_FOUND)
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py
index 2e0aa3685..ef2ce307e 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_result.py
@@ -16,7 +16,8 @@ from opnfv_testapi.resources import pod_models
from opnfv_testapi.resources import project_models
from opnfv_testapi.resources import result_models
from opnfv_testapi.resources import testcase_models
-import test_base as base
+from opnfv_testapi.tests.unit import test_base as base
+from opnfv_testapi.tests.unit import executor
class Details(object):
@@ -99,8 +100,7 @@ class TestResultBase(base.TestBase):
self.req_testcase,
self.project)
- def assert_res(self, code, result, req=None):
- self.assertEqual(code, httplib.OK)
+ def assert_res(self, result, req=None):
if req is None:
req = self.req_d
self.assertEqual(result.pod_name, req.pod_name)
@@ -133,65 +133,57 @@ class TestResultBase(base.TestBase):
class TestResultCreate(TestResultBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
def test_nobody(self):
- (code, body) = self.create(None)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.no_body(), body)
+ return None
+ @executor.create(httplib.BAD_REQUEST, message.missing('pod_name'))
def test_podNotProvided(self):
req = self.req_d
req.pod_name = None
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('pod_name'), body)
+ return req
+ @executor.create(httplib.BAD_REQUEST, message.missing('project_name'))
def test_projectNotProvided(self):
req = self.req_d
req.project_name = None
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('project_name'), body)
+ return req
+ @executor.create(httplib.BAD_REQUEST, message.missing('case_name'))
def test_testcaseNotProvided(self):
req = self.req_d
req.case_name = None
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('case_name'), body)
+ return req
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
def test_noPod(self):
req = self.req_d
req.pod_name = 'notExistPod'
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.not_found_base, body)
+ return req
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
def test_noProject(self):
req = self.req_d
req.project_name = 'notExistProject'
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.not_found_base, body)
+ return req
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
def test_noTestcase(self):
req = self.req_d
req.case_name = 'notExistTestcase'
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.not_found_base, body)
+ return req
+ @executor.create(httplib.OK, 'assert_href')
def test_success(self):
- (code, body) = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_href(body)
+ return self.req_d
+ @executor.create(httplib.OK, 'assert_href')
def test_key_with_doc(self):
req = copy.deepcopy(self.req_d)
req.details = {'1.name': 'dot_name'}
- (code, body) = self.create(req)
- self.assertEqual(code, httplib.OK)
- self.assert_href(body)
+ return req
+ @executor.create(httplib.OK, '_assert_no_ti')
def test_no_ti(self):
req = result_models.ResultCreateRequest(pod_name=self.pod,
project_name=self.project,
@@ -204,106 +196,110 @@ class TestResultCreate(TestResultBase):
build_tag=self.build_tag,
scenario=self.scenario,
criteria=self.criteria)
- (code, res) = self.create(req)
- _id = res.href.split('/')[-1]
- self.assertEqual(code, httplib.OK)
+ self.actual_req = req
+ return req
+
+ def _assert_no_ti(self, body):
+ _id = body.href.split('/')[-1]
code, body = self.get(_id)
- self.assert_res(code, body, req)
+ self.assert_res(body, self.actual_req)
class TestResultGet(TestResultBase):
+ def setUp(self):
+ super(TestResultGet, self).setUp()
+ self.req_d_id = self._create_d()
+ self.req_10d_later = self._create_changed_date(days=10)
+ self.req_10d_before = self._create_changed_date(days=-10)
+
+ @executor.get(httplib.OK, 'assert_res')
def test_getOne(self):
- _id = self._create_d()
- code, body = self.get(_id)
- self.assert_res(code, body)
+ return self.req_d_id
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryPod(self):
- self._query_and_assert(self._set_query('pod'))
+ return self._set_query('pod')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryProject(self):
- self._query_and_assert(self._set_query('project'))
+ return self._set_query('project')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryTestcase(self):
- self._query_and_assert(self._set_query('case'))
+ return self._set_query('case')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryVersion(self):
- self._query_and_assert(self._set_query('version'))
+ return self._set_query('version')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryInstaller(self):
- self._query_and_assert(self._set_query('installer'))
+ return self._set_query('installer')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryBuildTag(self):
- self._query_and_assert(self._set_query('build_tag'))
+ return self._set_query('build_tag')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryScenario(self):
- self._query_and_assert(self._set_query('scenario'))
+ return self._set_query('scenario')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryTrustIndicator(self):
- self._query_and_assert(self._set_query('trust_indicator'))
+ return self._set_query('trust_indicator')
+ @executor.query(httplib.OK, '_query_success', 3)
def test_queryCriteria(self):
- self._query_and_assert(self._set_query('criteria'))
+ return self._set_query('criteria')
+ @executor.query(httplib.BAD_REQUEST, message.must_int('period'))
def test_queryPeriodNotInt(self):
- code, body = self.query(self._set_query('period=a'))
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn('period must be int', body)
-
- def test_queryPeriodFail(self):
- self._query_and_assert(self._set_query('period=1'),
- found=False, days=-10)
+ return self._set_query('period=a')
+ @executor.query(httplib.OK, '_query_last_one', 1)
def test_queryPeriodSuccess(self):
- self._query_and_assert(self._set_query('period=1'),
- found=True)
+ return self._set_query('period=1')
+ @executor.query(httplib.BAD_REQUEST, message.must_int('last'))
def test_queryLastNotInt(self):
- code, body = self.query(self._set_query('last=a'))
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn('last must be int', body)
+ return self._set_query('last=a')
+ @executor.query(httplib.OK, '_query_last_one', 1)
def test_queryLast(self):
- self._create_changed_date()
- req = self._create_changed_date(minutes=20)
- self._create_changed_date(minutes=-20)
- self._query_and_assert(self._set_query('last=1'), req=req)
+ return self._set_query('last=1')
+ @executor.query(httplib.OK, '_query_last_one', 1)
def test_combination(self):
- self._query_and_assert(self._set_query('pod',
- 'project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- 'period=1'))
-
+ return self._set_query('pod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ @executor.query(httplib.OK, '_query_success', 0)
def test_notFound(self):
- self._query_and_assert(self._set_query('pod=notExistPod',
- 'project',
- 'case',
- 'version',
- 'installer',
- 'build_tag',
- 'scenario',
- 'trust_indicator',
- 'criteria',
- 'period=1'),
- found=False)
-
- def _query_and_assert(self, query, found=True, req=None, **kwargs):
- if req is None:
- req = self._create_changed_date(**kwargs)
- code, body = self.query(query)
- if not found:
- self.assertEqual(code, httplib.OK)
- self.assertEqual(0, len(body.results))
- else:
- self.assertEqual(1, len(body.results))
- for result in body.results:
- self.assert_res(code, result, req)
+ return self._set_query('pod=notExistPod',
+ 'project',
+ 'case',
+ 'version',
+ 'installer',
+ 'build_tag',
+ 'scenario',
+ 'trust_indicator',
+ 'criteria',
+ 'period=1')
+
+ def _query_success(self, body, number):
+ self.assertEqual(number, len(body.results))
+
+ def _query_last_one(self, body, number):
+ self.assertEqual(number, len(body.results))
+ self.assert_res(body.results[0], self.req_10d_later)
def _create_changed_date(self, **kwargs):
req = copy.deepcopy(self.req_d)
@@ -327,9 +323,12 @@ class TestResultGet(TestResultBase):
class TestResultUpdate(TestResultBase):
- def test_success(self):
- _id = self._create_d()
+ def setUp(self):
+ super(TestResultUpdate, self).setUp()
+ self.req_d_id = self._create_d()
+ @executor.update(httplib.OK, '_assert_update_ti')
+ def test_success(self):
new_ti = copy.deepcopy(self.trust_indicator)
new_ti.current += self.update_step
new_ti.histories.append(
@@ -337,13 +336,16 @@ class TestResultUpdate(TestResultBase):
new_data = copy.deepcopy(self.req_d)
new_data.trust_indicator = new_ti
update = result_models.ResultUpdateRequest(trust_indicator=new_ti)
- code, body = self.update(update, _id)
- self.assertEqual(_id, body._id)
- self.assert_res(code, body, new_data)
+ self.update_req = new_data
+ return update, self.req_d_id
- code, new_body = self.get(_id)
- self.assertEqual(_id, new_body._id)
- self.assert_res(code, new_body, new_data)
+ def _assert_update_ti(self, request, body):
+ ti = body.trust_indicator
+ self.assertEqual(ti.current, request.trust_indicator.current)
+ if ti.histories:
+ history = ti.histories[0]
+ self.assertEqual(history.date, self.update_date)
+ self.assertEqual(history.step, self.update_step)
if __name__ == '__main__':
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py
index f2291a566..b232bc168 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_scenario.py
@@ -7,7 +7,7 @@ import os
from opnfv_testapi.common import message
import opnfv_testapi.resources.scenario_models as models
-import test_base as base
+from opnfv_testapi.tests.unit import test_base as base
class TestScenarioBase(base.TestBase):
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py
index 62d0fa043..e28eaf5b8 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_testcase.py
@@ -13,7 +13,8 @@ import unittest
from opnfv_testapi.common import message
from opnfv_testapi.resources import project_models
from opnfv_testapi.resources import testcase_models
-import test_base as base
+from opnfv_testapi.tests.unit import test_base as base
+from opnfv_testapi.tests.unit import executor
class TestCaseBase(base.TestBase):
@@ -70,6 +71,9 @@ class TestCaseBase(base.TestBase):
def get(self, case=None):
return super(TestCaseBase, self).get(self.project, case)
+ def create(self, req=None, *args):
+ return super(TestCaseBase, self).create(req, self.project)
+
def update(self, new=None, case=None):
return super(TestCaseBase, self).update(new, self.project, case)
@@ -78,54 +82,57 @@ class TestCaseBase(base.TestBase):
class TestCaseCreate(TestCaseBase):
+ @executor.create(httplib.BAD_REQUEST, message.no_body())
def test_noBody(self):
- (code, body) = self.create(None, 'vping')
- self.assertEqual(code, httplib.BAD_REQUEST)
+ return None
+ @executor.create(httplib.FORBIDDEN, message.not_found_base)
def test_noProject(self):
- code, body = self.create(self.req_d, 'noProject')
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.not_found_base, body)
+ self.project = 'noProject'
+ return self.req_d
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_emptyName(self):
req_empty = testcase_models.TestcaseCreateRequest('')
- (code, body) = self.create(req_empty, self.project)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return req_empty
+ @executor.create(httplib.BAD_REQUEST, message.missing('name'))
def test_noneName(self):
req_none = testcase_models.TestcaseCreateRequest(None)
- (code, body) = self.create(req_none, self.project)
- self.assertEqual(code, httplib.BAD_REQUEST)
- self.assertIn(message.missing('name'), body)
+ return req_none
+ @executor.create(httplib.OK, '_assert_success')
def test_success(self):
- code, body = self.create_d()
- self.assertEqual(code, httplib.OK)
- self.assert_create_body(body, None, self.project)
+ return self.req_d
+
+ def _assert_success(self, body):
+ self.assert_create_body(body, self.req_d, self.project)
+ @executor.create(httplib.FORBIDDEN, message.exist_base)
def test_alreadyExist(self):
self.create_d()
- code, body = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
+ return self.req_d
class TestCaseGet(TestCaseBase):
+ def setUp(self):
+ super(TestCaseGet, self).setUp()
+ self.create_d()
+ self.create_e()
+
+ @executor.get(httplib.NOT_FOUND, message.not_found_base)
def test_notExist(self):
- code, body = self.get('notExist')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return 'notExist'
+ @executor.get(httplib.OK, 'assert_body')
def test_getOne(self):
- self.create_d()
- code, body = self.get(self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assert_body(body)
+ return self.req_d.name
+ @executor.get(httplib.OK, '_list')
def test_list(self):
- self.create_d()
- self.create_e()
- code, body = self.get()
+ return None
+
+ def _list(self, body):
for case in body.testcases:
if self.req_d.name == case.name:
self.assert_body(case)
@@ -134,60 +141,58 @@ class TestCaseGet(TestCaseBase):
class TestCaseUpdate(TestCaseBase):
+ def setUp(self):
+ super(TestCaseUpdate, self).setUp()
+ self.create_d()
+
+ @executor.update(httplib.BAD_REQUEST, message.no_body())
def test_noBody(self):
- code, _ = self.update(case='noBody')
- self.assertEqual(code, httplib.BAD_REQUEST)
+ return None, 'noBody'
+ @executor.update(httplib.NOT_FOUND, message.not_found_base)
def test_notFound(self):
- code, _ = self.update(self.update_e, 'notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return self.update_e, 'notFound'
+ @executor.update(httplib.FORBIDDEN, message.exist_base)
def test_newNameExist(self):
- self.create_d()
self.create_e()
- code, body = self.update(self.update_e, self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.exist_base, body)
+ return self.update_e, self.req_d.name
+ @executor.update(httplib.FORBIDDEN, message.no_update())
def test_noUpdate(self):
- self.create_d()
- code, body = self.update(self.update_d, self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.no_update(), body)
+ return self.update_d, self.req_d.name
+ @executor.update(httplib.OK, '_update_success')
def test_success(self):
- self.create_d()
- code, body = self.get(self.req_d.name)
- _id = body._id
-
- code, body = self.update(self.update_e, self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assertEqual(_id, body._id)
- self.assert_update_body(self.req_d, body, self.update_e)
-
- _, new_body = self.get(self.req_e.name)
- self.assertEqual(_id, new_body._id)
- self.assert_update_body(self.req_d, new_body, self.update_e)
+ return self.update_e, self.req_d.name
+ @executor.update(httplib.OK, '_update_success')
def test_with_dollar(self):
- self.create_d()
update = copy.deepcopy(self.update_d)
update.description = {'2. change': 'dollar change'}
- code, body = self.update(update, self.req_d.name)
- self.assertEqual(code, httplib.OK)
+ return update, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assert_update_body(self.req_d, body, request)
+ _, new_body = self.get(request.name)
+ self.assert_update_body(self.req_d, new_body, request)
class TestCaseDelete(TestCaseBase):
+ def setUp(self):
+ super(TestCaseDelete, self).setUp()
+ self.create_d()
+
+ @executor.delete(httplib.NOT_FOUND, message.not_found_base)
def test_notFound(self):
- code, body = self.delete('notFound')
- self.assertEqual(code, httplib.NOT_FOUND)
+ return 'notFound'
+ @executor.delete(httplib.OK, '_delete_success')
def test_success(self):
- self.create_d()
- code, body = self.delete(self.req_d.name)
- self.assertEqual(code, httplib.OK)
- self.assertEqual(body, '')
+ return self.req_d.name
+ def _delete_success(self, body):
+ self.assertEqual(body, '')
code, body = self.get(self.req_d.name)
self.assertEqual(code, httplib.NOT_FOUND)
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py
index ed3eda0f7..ca247a3b7 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_token.py
@@ -8,11 +8,12 @@ import unittest
from tornado import web
-import fake_pymongo
from opnfv_testapi.common import message
from opnfv_testapi.resources import project_models
from opnfv_testapi.router import url_mappings
-import test_base as base
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import fake_pymongo
+from opnfv_testapi.tests.unit import test_base as base
class TestToken(base.TestBase):
@@ -32,22 +33,24 @@ class TestTokenCreateProject(TestToken):
fake_pymongo.tokens.insert({"access_token": "12345"})
self.basePath = '/api/v1/projects'
+ @executor.create(httplib.FORBIDDEN, message.invalid_token())
def test_projectCreateTokenInvalid(self):
self.headers['X-Auth-Token'] = '1234'
- code, body = self.create_d()
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.invalid_token(), body)
+ return self.req_d
+ @executor.create(httplib.UNAUTHORIZED, message.unauthorized())
def test_projectCreateTokenUnauthorized(self):
- self.headers.pop('X-Auth-Token')
- code, body = self.create_d()
- self.assertEqual(code, httplib.UNAUTHORIZED)
- self.assertIn(message.unauthorized(), body)
+ if 'X-Auth-Token' in self.headers:
+ self.headers.pop('X-Auth-Token')
+ return self.req_d
+ @executor.create(httplib.OK, '_create_success')
def test_projectCreateTokenSuccess(self):
self.headers['X-Auth-Token'] = '12345'
- code, body = self.create_d()
- self.assertEqual(code, httplib.OK)
+ return self.req_d
+
+ def _create_success(self, body):
+ self.assertIn('CreateResponse', str(type(body)))
class TestTokenDeleteProject(TestToken):
@@ -56,28 +59,25 @@ class TestTokenDeleteProject(TestToken):
self.req_d = project_models.ProjectCreateRequest('vping')
fake_pymongo.tokens.insert({"access_token": "12345"})
self.basePath = '/api/v1/projects'
-
- def test_projectDeleteTokenIvalid(self):
self.headers['X-Auth-Token'] = '12345'
self.create_d()
+
+ @executor.delete(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectDeleteTokenIvalid(self):
self.headers['X-Auth-Token'] = '1234'
- code, body = self.delete(self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.invalid_token(), body)
+ return self.req_d.name
+ @executor.delete(httplib.UNAUTHORIZED, message.unauthorized())
def test_projectDeleteTokenUnauthorized(self):
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
self.headers.pop('X-Auth-Token')
- code, body = self.delete(self.req_d.name)
- self.assertEqual(code, httplib.UNAUTHORIZED)
- self.assertIn(message.unauthorized(), body)
+ return self.req_d.name
+ @executor.delete(httplib.OK, '_delete_success')
def test_projectDeleteTokenSuccess(self):
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
- code, body = self.delete(self.req_d.name)
- self.assertEqual(code, httplib.OK)
+ return self.req_d.name
+
+ def _delete_success(self, body):
+ self.assertEqual('', body)
class TestTokenUpdateProject(TestToken):
@@ -86,34 +86,28 @@ class TestTokenUpdateProject(TestToken):
self.req_d = project_models.ProjectCreateRequest('vping')
fake_pymongo.tokens.insert({"access_token": "12345"})
self.basePath = '/api/v1/projects'
-
- def test_projectUpdateTokenIvalid(self):
self.headers['X-Auth-Token'] = '12345'
self.create_d()
- code, body = self.get(self.req_d.name)
+
+ @executor.update(httplib.FORBIDDEN, message.invalid_token())
+ def test_projectUpdateTokenIvalid(self):
self.headers['X-Auth-Token'] = '1234'
req = project_models.ProjectUpdateRequest('newName', 'new description')
- code, body = self.update(req, self.req_d.name)
- self.assertEqual(code, httplib.FORBIDDEN)
- self.assertIn(message.invalid_token(), body)
+ return req, self.req_d.name
+ @executor.update(httplib.UNAUTHORIZED, message.unauthorized())
def test_projectUpdateTokenUnauthorized(self):
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
- code, body = self.get(self.req_d.name)
self.headers.pop('X-Auth-Token')
req = project_models.ProjectUpdateRequest('newName', 'new description')
- code, body = self.update(req, self.req_d.name)
- self.assertEqual(code, httplib.UNAUTHORIZED)
- self.assertIn(message.unauthorized(), body)
+ return req, self.req_d.name
+ @executor.update(httplib.OK, '_update_success')
def test_projectUpdateTokenSuccess(self):
- self.headers['X-Auth-Token'] = '12345'
- self.create_d()
- code, body = self.get(self.req_d.name)
req = project_models.ProjectUpdateRequest('newName', 'new description')
- code, body = self.update(req, self.req_d.name)
- self.assertEqual(code, httplib.OK)
+ return req, self.req_d.name
+
+ def _update_success(self, request, body):
+ self.assertIn(request.name, body)
if __name__ == '__main__':
unittest.main()
diff --git a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py
index c8f3f5062..fff802ac8 100644
--- a/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py
+++ b/utils/test/testapi/opnfv_testapi/tests/unit/test_version.py
@@ -6,10 +6,12 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import httplib
import unittest
from opnfv_testapi.resources import models
-import test_base as base
+from opnfv_testapi.tests.unit import executor
+from opnfv_testapi.tests.unit import test_base as base
class TestVersionBase(base.TestBase):
@@ -20,12 +22,15 @@ class TestVersionBase(base.TestBase):
class TestVersion(TestVersionBase):
+ @executor.get(httplib.OK, '_get_success')
def test_success(self):
- code, body = self.get()
- self.assertEqual(200, code)
+ return None
+
+ def _get_success(self, body):
self.assertEqual(len(body.versions), 1)
self.assertEqual(body.versions[0].version, 'v1.0')
self.assertEqual(body.versions[0].description, 'basics')
+
if __name__ == '__main__':
unittest.main()