aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/func_tests/features/steps/perimeter.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/perimeter.py')
-rw-r--r--moon_manager/tests/func_tests/features/steps/perimeter.py727
1 files changed, 727 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/perimeter.py b/moon_manager/tests/func_tests/features/steps/perimeter.py
new file mode 100644
index 00000000..a4a53120
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/perimeter.py
@@ -0,0 +1,727 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing subject preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no subject perimeter')
+def step_impl(context):
+ logger.info("Given the system has no subject perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ policies_list = response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ # exit(0)
+
+# Step Definition Implementation:
+# 1) Post subject perimeter using the policy id
+@Given('the following subject perimeter exists')
+def step_impl(context):
+ logger.info("Given the following subject perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[
+ "subjectperimeterdescription"] # "' and subject perimeter email:'" + row[
+ # "subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword']
+ + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policyid=""
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["subjectperimetername"],
+ 'description': row["subjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing object preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no object perimeter')
+def step_impl(context):
+ logger.info("Given the system has no object perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ policies_list = response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+# Step Definition Implementation:
+# 1) Post object perimeter using the policy id
+@Given('the following object perimeter exists')
+def step_impl(context):
+ logger.info("Given the following object perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[
+ "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+
+ data = {
+ 'name': row["objectperimetername"],
+ 'description': row["objectperimeterdescription"],
+
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI,
+ headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing action preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no action perimeter')
+def step_impl(context):
+ logger.info("Given the system has no action perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ policies_list = response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+
+# Step Definition Implementation:
+# 1) Post action perimeter using the policy id
+@Given('the following action perimeter exists')
+def step_impl(context):
+ logger.info("Given the following action perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[
+ "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policyid=""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["actionperimetername"],
+ 'description': row["actionperimeterdescription"],
+
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI,
+ headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Insert subject perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following subject perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following subject perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[
+ "subjectperimeterdescription"] +
+ # "' and subject perimeter email:'" + row["subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword'] +
+ "' and policies '" + row['policies'] + "'")
+
+ policyid = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["subjectperimetername"],
+ 'description': row["subjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following subject perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following subject perimeter")
+ model = getattr(context, "model", None)
+ policies_list = []
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row[
+ 'subjectperimetername'] + "' which will be updated to subject perimeter name:'" + row[
+ "updatedsubjectperimetername"] + "' subject perimeter description: '" + row[
+ "updatedsubjectperimeterdescription"] +
+ # "' and subject perimeter email:'" + row["updatedsubjectperimeteremail"] + "' and subject perimeter password '" + row['updatedsubjectperimeterpassword']
+ "' and policies '" + row['policies'] + "'")
+
+ policyid = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid=commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedsubjectperimetername"],
+ 'description': row["updatedsubjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ #print(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + '/' + ids)
+ response = requests.patch(apis_urls.serverURL + apis_urls.perimetersubjectAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+ print(response)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following subject perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following subject perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+ logger.info("subject perimeter name:'" + row["subjectperimetername"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following subject perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following subject perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+ logger.info("subject perimeter name:'" + row["subjectperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ logger.info(response.json())
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Insert object perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following object perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following object perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[
+ "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["objectperimetername"],
+ 'description': row["objectperimeterdescription"],
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following object perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following object perimeter")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row[
+ 'objectperimetername'] + "' which will be updated to object perimeter name:" + row[
+ "updatedobjectperimetername"] + "' object perimeter description: '" + row[
+ "updatedobjectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedobjectperimetername"],
+ 'description': row["updatedobjectperimeterdescription"],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.patch(apis_urls.serverURL + apis_urls.perimeterobjectAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following object perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following object perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("object perimeter name:'" + row["objectperimetername"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following object perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following object perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("object perimeter name:'" + row["objectperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Insert action perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following action perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following action perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[
+ "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policyid=""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["actionperimetername"],
+ 'description': row["actionperimeterdescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following action perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following action perimeter")
+
+ model = getattr(context, "model", None)
+
+ for row in context.table:
+
+ logger.info(
+ "action perimeter name: '" + row[
+ 'actionperimetername'] + "' which will be updated to action perimeter name:" + row[
+ "updatedactionperimetername"] + "' action perimeter description: '" + row[
+ "updatedactionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedactionperimetername"],
+ 'description': row["updatedactionperimeterdescription"],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.patch(
+ apis_urls.serverURL + apis_urls.perimeteractionAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following action perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following action perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("action perimeter name:'" + row["actionperimetername"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following action perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following action perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("action perimeter name:'" + row["actionperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing subject perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following subject perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following subject perimeter should be existed in the system")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('subjectperimetername', 'subjectperimeterdescription',
+ # 'subjectperimeteremail',
+ # 'subjectperimeterpassword',
+ 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+
+ if len(response.json()[apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ apipoliciesid = []
+ apipolicies = ""
+ GeneralVariables.assignsubjectperimeterid['value']=ids
+ apisubjectperimetername = response.json()[apis_urls.perimetersubjectAPI][ids]['name']
+ apisubjectperimeterdescription = response.json()[apis_urls.perimetersubjectAPI][ids]['description']
+ # apisubjectperimeteremail = response.json()[apis_urls.perimetersubjectAPI][ids]['email']
+ # apisubjectperimeterpassword = response.json()[apis_urls.perimetersubjectAPI][ids]['password']
+ if (len(response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apisubjectperimetername, apisubjectperimeterdescription,
+ # apisubjectperimeteremail,# apisubjectperimeterpassword,
+ apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('subjectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected subject perimeter name: '" + str(
+ row1["subjectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["subjectperimetername"]) + "'")
+ assert str(row1["subjectperimetername"]) == str(
+ row2["subjectperimetername"]), "subject perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject perimeter description: '" + str(
+ row1["subjectperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["subjectperimeterdescription"]) + "'")
+ assert str(row1["subjectperimeterdescription"]) == str(
+ row2["subjectperimeterdescription"]), "subject perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ # logger.info("asserting the expected subject perimeter email: '" + str(
+ # row1["subjectperimeteremail"]) + "' is the same as the actual existing '" + str(
+ # row2["subjectperimeteremail"]) + "'")
+ # assert str(row1["subjectperimeteremail"]) == str(
+ # row2["subjectperimeteremail"]), "subject perimeter email is not correct!"
+ # logger.info("assertion passed!")
+ #
+ # logger.info("asserting the expected subject perimeter password: '" + str(
+ # row1["subjectperimeterpassword"]) + "' is the same as the actual existing '" + str(
+ # row2["subjectperimeterpassword"]) + "'")
+ # assert str(row1["subjectperimeterpassword"]) == str(
+ # row2["subjectperimeterpassword"]), "subject perimeter password is not correct!"
+ # logger.info("assertion passed!")
+
+ if (str(row1["policies"]).find(',') == -1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+
+ logger.info("asserting the expected policies: '" + ','.join(
+ sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join(
+ sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing object perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following object perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following object perimeter should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('objectperimetername', 'objectperimeterdescription', 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ apipolicies = ""
+ apipoliciesid = []
+ apiobjectperimetername = response.json()[apis_urls.perimeterobjectAPI][ids]['name']
+ apiobjectperimeterdescription = response.json()[apis_urls.perimeterobjectAPI][ids]['description']
+ if (len(response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apiobjectperimetername, apiobjectperimeterdescription, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('objectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected object perimeter name: '" + str(
+ row1["objectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["objectperimetername"]) + "'")
+ assert str(row1["objectperimetername"]) == str(
+ row2["objectperimetername"]), "object perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object perimeter description: '" + str(
+ row1["objectperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["objectperimeterdescription"]) + "'")
+ assert str(row1["objectperimeterdescription"]) == str(
+ row2["objectperimeterdescription"]), "object perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ if (str(row1["policies"]).find(',') == -1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+ logger.info("asserting the expected policies: '" + ','.join(
+ sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join(
+ sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing subject perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following action perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following action perimeter should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('actionperimetername', 'actionperimeterdescription', 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ apipolicies = ""
+ apipoliciesid = []
+ apiactionperimetername = response.json()[apis_urls.perimeteractionAPI][ids]['name']
+ apiactionperimeterdescription = response.json()[apis_urls.perimeteractionAPI][ids]['description']
+ if (len(response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apiactionperimetername, apiactionperimeterdescription, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('actionperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected action perimeter name: '" + str(
+ row1["actionperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["actionperimetername"]) + "'")
+ assert str(row1["actionperimetername"]) == str(
+ row2["actionperimetername"]), "action perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action perimeter description: '" + str(
+ row1["actionperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["actionperimeterdescription"]) + "'")
+ assert str(row1["actionperimeterdescription"]) == str(
+ row2["actionperimeterdescription"]), "action perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ if(str(row1["policies"]).find(',')==-1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+
+ logger.info("asserting the expected policies: '" + ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) == ','.join(sorted(str(row2["policies"]).split(','),key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")