aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/func_tests/features/steps/pdp.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/pdp.py')
-rw-r--r--moon_manager/tests/func_tests/features/steps/pdp.py248
1 files changed, 248 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/pdp.py b/moon_manager/tests/func_tests/features/steps/pdp.py
new file mode 100644
index 00000000..bf839658
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/pdp.py
@@ -0,0 +1,248 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing pdps in the system
+# 2) Loop by id and delete them
+@Given('the system has no pdps')
+def step_impl(context):
+ logger.info("Given the system has no pdps")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI, headers=apis_urls.auth_headers)
+ pdpjason=apis_urls.pdpAPI+"s"
+ if len(response.json()[pdpjason]) != 0:
+ for ids in dict(response.json()[pdpjason]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_policyid
+# 2) create the pdp data jason then post it
+@Given('the following pdp exists')
+def step_impl(context):
+ logger.info("Given the following pdp exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[
+ "pdpdescription"] + "' and keystone project:'" + row[
+ "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'")
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['security_pipeline']) > 25):
+ policies_list = row['security_pipeline']
+ else:
+ for policy in row["security_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get policy id by calling the common funtion: get_policyid
+# 2) create the pdp jason then patch the policy after searching for it's id.
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following pdp')
+def step_impl(context):
+ logger.info("When the user sets to add the following pdp")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[
+ "pdpdescription"] + "' and keystone project:'" + row[
+ "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ if (row["security_pipeline"] != ""):
+ if (len(row['security_pipeline']) > 25):
+ policies_list = row['security_pipeline']
+ else:
+ for policy in row["security_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+ else:
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': ""
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_policyid
+# 2) create the pdp data jason then patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following pdp')
+def step_impl(context):
+ logger.info("When the user sets to update the following pdp")
+
+ model = getattr(context, "model", None)
+ policies_list=[]
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' which will be updated to pdp name:" + row[
+ "updatedpdpname"] + "' and pdp description: '" + row[
+ "updatedpdpdescription"] + "' keystone_project: '" + row["updatedkeystone_project_id"] + "' security pipeline: '"+row["updatedsecurity_pipeline"]+"'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['updatedsecurity_pipeline']) > 25):
+ policies_list = row['updatedsecurity_pipeline']
+ else:
+ for policy in row["updatedsecurity_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+
+ data = {
+ 'name': row["updatedpdpname"],
+ 'description': row["updatedpdpdescription"],
+ 'vim_project_id': row['updatedkeystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ logger.info(response.json())
+ pdpjason = apis_urls.pdpAPI + "s"
+ for ids in dict(response.json()[pdpjason]).keys():
+ logger.info(str(response.json()[pdpjason][ids]['name']))
+ if (response.json()[pdpjason][ids]['name'] == row["pdpname"]):
+ logger.info(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids)
+ response = requests.patch(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids, headers=headers,
+ data=json.dumps(data))
+ logger.info(response.json())
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+ break
+
+# Step Definition Implementation:
+# 1) Get all the pdps by get request
+# 2) Loop by ids and search for the matching pdp by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following pdp')
+def step_impl(context):
+ logging.info("When the user sets to delete the following pdp")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("pdp name:'" + row["pdpname"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ pdpjason=apis_urls.pdpAPI+"s"
+ for ids in dict(response.json()[pdpjason]).keys():
+ if (response.json()[pdpjason][ids]['name'] == row["pdpname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids,
+ headers=headers)
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing pdps by get request and put them into a table
+# 2) Sort the table by pdp name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following pdp should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following pdp should be existed in the system")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('pdpname', 'pdpdescription', 'keystone_project_id','security_pipeline'),
+ dtype=('S10', 'S100', 'S100','S100'))
+ pdp_jason=apis_urls.pdpAPI+"s"
+ if len(response.json()[pdp_jason]) != 0:
+ for ids in dict(response.json()[pdp_jason]).keys():
+ apipdppolicies = ""
+ apipdpname = response.json()[pdp_jason][ids]['name']
+ apipdpdescription = response.json()[pdp_jason][ids]['description']
+ apipdpprojectid = response.json()[pdp_jason][ids]['vim_project_id']
+ for policies in response.json()[pdp_jason][ids]['security_pipeline']:
+ if(len(apipdppolicies)>2):
+ apipdppolicies = apipdppolicies +','+ commonfunctions.get_policyname(policies)
+ else:
+ apipdppolicies=commonfunctions.get_policyname(policies)
+
+ apiresult.add_row(vals=(
+ apipdpname, apipdpdescription, apipdpprojectid,apipdppolicies))
+
+ else:
+ apiresult.add_row(vals=("", "", "",""))
+
+ apiresult.sort('pdpname')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected pdp name: '" + str(
+ row1["pdpname"]) + "' is the same as the actual existing '" + str(
+ row2["pdpname"]) + "'")
+ assert str(row1["pdpname"]) == str(row2["pdpname"]), "pdp name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected pdp description: '" + str(
+ row1["pdpdescription"]) + "' is the same as the actual existing '" + str(
+ row2["pdpdescription"]) + "'")
+
+ assert str(row1["pdpdescription"]) == str(row2["pdpdescription"]), "pdp description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected keystone project id description: '" + str(
+ row1["keystone_project_id"]) + "' is the same as the actual existing '" + str(
+ row2["keystone_project_id"]) + "'")
+ assert str(row1["keystone_project_id"]) == str(row2["keystone_project_id"]), "project id is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected security pipeline description: '" + str(
+ row1["security_pipeline"]) + "' is the same as the actual existing '" + str(
+ row2["security_pipeline"]) + "'")
+ assert str(row1["security_pipeline"]) == str(row2["security_pipeline"]), "security_pipeline policies is not correct!"
+ logger.info("assertion passed!")
+