aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/func_tests/features/steps
diff options
context:
space:
mode:
authorThomas Duval <thomas.duval@orange.com>2020-06-03 10:06:52 +0200
committerThomas Duval <thomas.duval@orange.com>2020-06-03 10:06:52 +0200
commit7bb53c64da2dcf88894bfd31503accdd81498f3d (patch)
tree4310e12366818af27947b5e2c80cb162da93a4b5 /moon_manager/tests/func_tests/features/steps
parentcbea4e360e9bfaa9698cf7c61c83c96a1ba89b8c (diff)
Update to new version 5.4HEADstable/jermamaster
Signed-off-by: Thomas Duval <thomas.duval@orange.com> Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps')
-rw-r--r--moon_manager/tests/func_tests/features/steps/Static_Variables.py89
-rw-r--r--moon_manager/tests/func_tests/features/steps/__init__.py11
-rw-r--r--moon_manager/tests/func_tests/features/steps/assignments.py858
-rw-r--r--moon_manager/tests/func_tests/features/steps/authorization.py217
-rw-r--r--moon_manager/tests/func_tests/features/steps/common_functions.py279
-rw-r--r--moon_manager/tests/func_tests/features/steps/data.py629
-rw-r--r--moon_manager/tests/func_tests/features/steps/meta_data.py394
-rw-r--r--moon_manager/tests/func_tests/features/steps/meta_rules.py335
-rw-r--r--moon_manager/tests/func_tests/features/steps/model.py230
-rw-r--r--moon_manager/tests/func_tests/features/steps/pdp.py248
-rw-r--r--moon_manager/tests/func_tests/features/steps/perimeter.py727
-rw-r--r--moon_manager/tests/func_tests/features/steps/policy.py219
-rw-r--r--moon_manager/tests/func_tests/features/steps/rules.py495
13 files changed, 4731 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/Static_Variables.py b/moon_manager/tests/func_tests/features/steps/Static_Variables.py
new file mode 100644
index 00000000..471f92fa
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/Static_Variables.py
@@ -0,0 +1,89 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+class GeneralVariables:
+ serverURL="http://127.0.0.1:8000/"
+
+ serverIP="10.237.71.141"
+
+ serverport = "22"
+
+ serverusername="ubuntu"
+
+ serverpassword="ubuntu-007"
+
+ token = "{{TOKEN}}"
+
+ auth_headers = {"X-Api-Key": token}
+
+ actual_authresponse = {'value': False}
+
+ api_responseflag = {'value': False}
+
+ pipelinePort = {'value': ""}
+
+ wrapperPort = {'value': ""}
+
+ projectAPI = ""
+
+ slaveAPI="slave"
+
+ getslavesAPI = "slaves"
+
+ pdpAPI = "pdp"
+
+ modelAPI = "models"
+
+ policyAPI = "policies"
+
+ assignpolicyid={'value': ""}
+
+ assignsubjectperimeterid = {'value': ""}
+
+ assignsubjectcategoryid = {'value': ""}
+
+ assignobjectperimeterid = {'value': ""}
+
+ assignobjectcategoryid = {'value': ""}
+
+ assignactionperimeterid = {'value': ""}
+
+ assignactioncategoryid = {'value': ""}
+
+ metarulesAPI = "meta_rules"
+
+ metadatasubjectcategoryAPI = "subject_categories"
+
+ metadataobjectcategoryAPI = "object_categories"
+
+ metadataactioncategoryAPI = "action_categories"
+
+ perimetersubjectAPI = "subjects"
+
+ perimeterobjectAPI = "objects"
+
+ perimeteractionAPI = "actions"
+
+ datasubjectAPI = "subject_data"
+
+ dataobjectAPI = "object_data"
+
+ dataactionAPI = "action_data"
+
+ assignementssubjectAPI = "subject_assignments"
+
+ assignementsobjectAPI = "object_assignments"
+
+ assignementsactionAPI = "action_assignments"
+
+ rulesAPI = "rules"
+
diff --git a/moon_manager/tests/func_tests/features/steps/__init__.py b/moon_manager/tests/func_tests/features/steps/__init__.py
new file mode 100644
index 00000000..582be686
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/__init__.py
@@ -0,0 +1,11 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
diff --git a/moon_manager/tests/func_tests/features/steps/assignments.py b/moon_manager/tests/func_tests/features/steps/assignments.py
new file mode 100644
index 00000000..e3f7b5a7
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/assignments.py
@@ -0,0 +1,858 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing subject meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get subject assignment id using both the policy id, data id & the category id
+# 3) Loop by assignment id and delete it
+@Given('the system has no subject assignments')
+def step_impl(context):
+ logger.info("Given the system has no subject assignments")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ # subjectcategoryidslist = []
+ # subjectdataidslist = []
+ # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ # if (modelid != None and modelid != ""):
+ # metaruleslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid]['meta_rules']
+ # for metarule_ids in metaruleslist:
+ # categorieslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,
+ # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ # metarule_ids]['subject_categories']
+ # for categoryid in categorieslist:
+ # if (categoryid not in subjectcategoryidslist):
+ # subjectcategoryidslist.append(categoryid)
+ #
+ # response_perimeters = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,
+ # headers=apis_urls.auth_headers).json()[
+ # apis_urls.perimetersubjectAPI]
+ # for perimeterid in dict(response_perimeters).keys():
+ # for categoryid in subjectcategoryidslist:
+ # response_assignment = requests.get(
+ # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementssubjectAPI + "/" +
+ # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers)
+ # if len(response_assignment.json()[apis_urls.assignementssubjectAPI]) != 0:
+ # for ids in dict(response_assignment.json()[apis_urls.assignementssubjectAPI]).keys():
+ # assignmentsid = response_assignment.json()[apis_urls.assignementssubjectAPI][str(ids)][
+ # 'assignments']
+ # for dataid in assignmentsid:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementssubjectAPI , headers=headers)
+
+# Step Definition Implementation:
+# 1) Get all the existing object meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get object assignment id using both the policy id, data id & the category id
+# 3) Loop by assignment id and delete it
+@Given('the system has no object assignments')
+def step_impl(context):
+ logger.info("Given the system has no object assignments")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ # objectcategoryidslist = []
+ # objectdataidslist = []
+ # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ # if (modelid != None and modelid != ""):
+ # metaruleslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[
+ # apis_urls.modelAPI][modelid][
+ # 'meta_rules']
+ # for metarule_ids in metaruleslist:
+ # categorieslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,
+ # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ # metarule_ids]['object_categories']
+ # for categoryid in categorieslist:
+ # if (categoryid not in objectcategoryidslist):
+ # objectcategoryidslist.append(categoryid)
+ #
+ # response_perimeters = \
+ # requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,
+ # headers=apis_urls.auth_headers).json()[
+ # apis_urls.perimeterobjectAPI]
+ # for perimeterid in dict(response_perimeters).keys():
+ # for categoryid in objectcategoryidslist:
+ # response_assignment = requests.get(
+ # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsobjectAPI + "/" +
+ # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers)
+ # if len(response_assignment.json()[apis_urls.assignementsobjectAPI]) != 0:
+ # for ids in dict(response_assignment.json()[apis_urls.assignementsobjectAPI]).keys():
+ # assignmentsid = response_assignment.json()[apis_urls.assignementsobjectAPI][str(ids)][
+ # 'assignments']
+ # for dataid in assignmentsid:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsobjectAPI , headers=headers)
+
+# Step Definition Implementation:
+# 1) Get all the existing action meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get action assignment id using both the policy id, data id & the category id
+# 3) Loop by assignment id and delete it
+@Given('the system has no action assignments')
+def step_impl(context):
+ logger.info("Given the system has no action assignments")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ # actioncategoryidslist = []
+ # actiondataidslist = []
+ # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ # if (modelid != None and modelid != ""):
+ # metaruleslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[
+ # apis_urls.modelAPI][modelid][
+ # 'meta_rules']
+ # for metarule_ids in metaruleslist:
+ # categorieslist = \
+ # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,
+ # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ # metarule_ids]['action_categories']
+ # for categoryid in categorieslist:
+ # if (categoryid not in actioncategoryidslist):
+ # actioncategoryidslist.append(categoryid)
+ #
+ # response_perimeters = \
+ # requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,
+ # headers=apis_urls.auth_headers).json()[
+ # apis_urls.perimeteractionAPI]
+ # for perimeterid in dict(response_perimeters).keys():
+ # for categoryid in actioncategoryidslist:
+ # response_assignment = requests.get(
+ # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsactionAPI + "/" +
+ # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers)
+ # if len(response_assignment.json()[apis_urls.assignementsactionAPI]) != 0:
+ # for ids in dict(response_assignment.json()[apis_urls.assignementsactionAPI]).keys():
+ # assignmentsid = response_assignment.json()[apis_urls.assignementsactionAPI][str(ids)][
+ # 'assignments']
+ # for dataid in assignmentsid:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsactionAPI , headers=headers)
+
+# Step Definition Implementation:
+# 1) Post subject assignment using the policy id, subject perimeter id, subject category, list of subject data ids
+@Given('the following subject assignment exists')
+def step_impl(context):
+ logger.info("Given the following subject assignment exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject data: '" + row[
+ "subjectdata"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataname = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_subjectperimeterid(row['subjectperimetername'])
+ categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory'])
+ dataids = commonfunctions.get_subjectdataid(row['subjectdata'], categories_id, policies_id)
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementssubjectAPI,
+ headers=headers, data=json.dumps(data))
+
+ GeneralVariables.assignpolicyid['value'] = policies_id
+ GeneralVariables.assignsubjectperimeterid['value'] = perimeter_id
+ GeneralVariables.assignsubjectcategoryid['value'] = categories_id
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Post object assignment using the policy id, object perimeter id, object category, list of object data ids
+@Given('the following object assignment exists')
+def step_impl(context):
+ logger.info("Given the following object assignment exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object data: '" + row[
+ "objectdata"] + "' and object category: '" + row[
+ "objectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataname = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_objectperimeterid(row['objectperimetername'])
+ categories_id = commonfunctions.get_objectcategoryid(row['objectcategory'])
+ dataids = commonfunctions.get_objectdataid(row['objectdata'], categories_id, policies_id)
+
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'policy_id': policies_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsobjectAPI,
+ headers=headers, data=json.dumps(data))
+
+ GeneralVariables.assignpolicyid['value'] = policies_id
+ GeneralVariables.assignobjectperimeterid['value'] = perimeter_id
+ GeneralVariables.assignobjectcategoryid['value'] = categories_id
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids
+@Given('the following action assignment exists')
+def step_impl(context):
+ logger.info("Given the following action assignment exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action data: '" + row[
+ "actiondata"] + "' and action category: '" + row[
+ "actioncategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataname = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_actionperimeterid(row['actionperimetername'])
+ categories_id = commonfunctions.get_actioncategoryid(row['actioncategory'])
+ dataids = commonfunctions.get_actiondataid(row['actiondata'], categories_id, policies_id)
+
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'policy_id': policies_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsactionAPI,
+ headers=headers, data=json.dumps(data))
+
+ GeneralVariables.assignpolicyid['value'] = policies_id
+ GeneralVariables.assignactionperimeterid['value'] = perimeter_id
+ GeneralVariables.assignactioncategoryid['value'] = categories_id
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following subject assignment')
+def step_impl(context):
+ logger.info("When the user sets to add the following subject assignment")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject data: '" + row[
+ "subjectdata"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataids = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"):
+ policyname = "Stanford Policy"
+ else:
+ policyname = row['policyname']
+ policies_id = commonfunctions.get_policyid(policyname)
+
+ if (row["subjectperimetername"] == "" or row[
+ "subjectperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimetername = "WilliamsJoeseph"
+ else:
+ perimetername = row["subjectperimetername"]
+ perimeter_id = commonfunctions.get_subjectperimeterid(perimetername)
+
+ if (row["subjectcategory"] == "" or row[
+ "subjectcategory"] == "000000000000000000000000000000000000000000000000000"):
+ categoriesname = "Affiliation:"
+ else:
+ categoriesname = row['subjectcategory']
+ categories_id = commonfunctions.get_subjectcategoryid(categoriesname)
+
+ if (row["subjectdata"] == "" or row["subjectdata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = "Professor"
+ else:
+ dataids = row['subjectdata']
+ dataids = commonfunctions.get_subjectdataid(dataids, categories_id, policies_id)
+
+ if (dataids == None):
+ dataids = ""
+
+ if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"):
+ policies_id = row["policyname"]
+ if (row["subjectperimetername"] == "" or row[
+ "subjectperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimeter_id = row["subjectperimetername"]
+ if (row["subjectcategory"] == "" or row[
+ "subjectcategory"] == "000000000000000000000000000000000000000000000000000"):
+ categories_id = row["subjectcategory"]
+ if (row["subjectdata"] == "" or row["subjectdata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = row['subjectdata']
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementssubjectAPI,
+ headers=headers, data=json.dumps(data))
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following object assignment')
+def step_impl(context):
+ logger.info("When the user sets to add the following object assignment")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object data: '" + row[
+ "objectdata"] + "' and object category: '" + row[
+ "objectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataids = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"):
+ policyname = "Stanford Policy"
+ else:
+ policyname = row['policyname']
+ policies_id = commonfunctions.get_policyid(policyname)
+
+ if (row["objectperimetername"] == "" or row[
+ "objectperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimetername = "StudentsGradesSheet"
+ else:
+ perimetername = row["objectperimetername"]
+ perimeter_id = commonfunctions.get_objectperimeterid(perimetername)
+
+ if (row["objectcategory"] == "" or row[
+ "objectcategory"] == "000000000000000000000000000000000000000000000000000"):
+ categoriesname = "Clearance:"
+ else:
+ categoriesname = row['objectcategory']
+ categories_id = commonfunctions.get_objectcategoryid(categoriesname)
+
+ if (row["objectdata"] == "" or row["objectdata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = "Confidential"
+ else:
+ dataids = row['objectdata']
+ dataids = commonfunctions.get_objectdataid(dataids, categories_id, policies_id)
+
+ if (dataids == None):
+ dataids = ""
+
+ if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"):
+ policies_id = row["policyname"]
+ if (row["objectperimetername"] == "" or row[
+ "objectperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimeter_id = row["objectperimetername"]
+ if (row["objectcategory"] == "" or row[
+ "objectcategory"] == "000000000000000000000000000000000000000000000000000"):
+ categories_id = row["objectcategory"]
+ if (row["objectdata"] == "" or row["objectdata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = row['objectdata']
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsobjectAPI,
+ headers=headers, data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following action assignment')
+def step_impl(context):
+ logger.info("When the user sets to add the following action assignment")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action data: '" + row[
+ "actiondata"] + "' and action category: '" + row[
+ "actioncategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ categoriesname = ""
+ dataids = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"):
+ policyname = "Stanford Policy"
+ else:
+ policyname = row['policyname']
+ policies_id = commonfunctions.get_policyid(policyname)
+
+ if (row["actionperimetername"] == "" or row[
+ "actionperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimetername = "Read"
+ else:
+ perimetername = row["actionperimetername"]
+ perimeter_id = commonfunctions.get_actionperimeterid(perimetername)
+
+ if (row["actioncategory"] == "" or row[
+ "actioncategory"] == "000000000000000000000000000000000000000000000000000"):
+ categoriesname = "Action-Class:"
+ else:
+ categoriesname = row['actioncategory']
+ categories_id = commonfunctions.get_actioncategoryid(categoriesname)
+
+ if (row["actiondata"] == "" or row["actiondata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = "Severe"
+ else:
+ dataids = row['actiondata']
+ dataids = commonfunctions.get_actiondataid(dataids, categories_id, policies_id)
+
+ if (dataids == None):
+ dataids = ""
+
+ if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"):
+ policies_id = row["policyname"]
+ if (row["actionperimetername"] == "" or row[
+ "actionperimetername"] == "000000000000000000000000000000000000000000000000000"):
+ perimeter_id = row["actionperimetername"]
+ if (row["actioncategory"] == "" or row[
+ "actioncategory"] == "000000000000000000000000000000000000000000000000000"):
+ categories_id = row["actioncategory"]
+ if (row["actiondata"] == "" or row["actiondata"] == "000000000000000000000000000000000000000000000000000"):
+ dataids = row['actiondata']
+ data = {
+ 'id': perimeter_id,
+ 'category_id': categories_id,
+ 'policy_id': policies_id,
+ 'data_id': dataids,
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsactionAPI,
+ headers=headers, data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Delete subject assignment by policy id,subject perimeter id, subject data id, subject category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following subject assignment')
+def step_impl(context):
+ logging.info("When the user sets to delete the following subject assignment")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject data list: '" + row[
+ "subjectdata"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ dataid = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_subjectperimeterid(row['subjectperimetername'])
+ categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory'])
+ dataid = commonfunctions.get_subjectdataid(row["subjectdata"], categories_id, policies_id)
+
+ response_assignment = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementssubjectAPI + "/" +
+ perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers)
+ logging.info(response_assignment.json()[apis_urls.assignementssubjectAPI])
+ if len(response_assignment.json()[apis_urls.assignementssubjectAPI]) != 0:
+ for ids in dict(response_assignment.json()[apis_urls.assignementssubjectAPI]).keys():
+ assignmentsidlist = response_assignment.json()[apis_urls.assignementssubjectAPI][str(ids)][
+ 'assignments']
+ if dataid in assignmentsidlist:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementssubjectAPI + "/" +
+ perimeter_id + "/" + categories_id + "/" + dataid, headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+
+# Step Definition Implementation:
+# 1) Delete object assignment by policy id, object perimeter id, object data id, object category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following object assignment')
+def step_impl(context):
+ logging.info("When the user sets to delete the following object assignment")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object data list: '" + row[
+ "objectdata"] + "' and object category: '" + row[
+ "objectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ datalistids = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_objectperimeterid(row['objectperimetername'])
+ categories_id = commonfunctions.get_objectcategoryid(row['objectcategory'])
+ dataid = commonfunctions.get_objectdataid(row["objectdata"], categories_id, policies_id)
+
+ response_assignment = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsobjectAPI + "/" +
+ perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers)
+ if len(response_assignment.json()[apis_urls.assignementsobjectAPI]) != 0:
+ for ids in dict(response_assignment.json()[apis_urls.assignementsobjectAPI]).keys():
+ assignmentsidlist = response_assignment.json()[apis_urls.assignementsobjectAPI][str(ids)][
+ 'assignments']
+ if dataid in assignmentsidlist:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsobjectAPI + "/" +
+ perimeter_id + "/" + categories_id + "/" + dataid, headers=headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+
+# Step Definition Implementation:
+# 1) Delete action assignment by policy id, action perimeter id, action data id, action category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following action assignment')
+def step_impl(context):
+ logging.info("When the user sets to delete the following action assignment")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action data list: '" + row[
+ "actiondata"] + "' and action category: '" + row[
+ "actioncategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ perimeter_id = ""
+ datalistids = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+ perimeter_id = commonfunctions.get_actionperimeterid(row['actionperimetername'])
+ categories_id = commonfunctions.get_actioncategoryid(row['actioncategory'])
+ dataid = commonfunctions.get_actiondataid(row["actiondata"], categories_id, policies_id)
+
+ response_assignment = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsactionAPI + "/" +
+ perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers)
+ if len(response_assignment.json()[apis_urls.assignementsactionAPI]) != 0:
+ for ids in dict(response_assignment.json()[apis_urls.assignementsactionAPI]).keys():
+ assignmentsidlist = response_assignment.json()[apis_urls.assignementsactionAPI][str(ids)][
+ 'assignments']
+ if dataid in assignmentsidlist:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsactionAPI + "/" +
+ perimeter_id + "/" + categories_id + "/" + dataid, headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing subject assignment per a given policy, subject perimeter and subject category by get request and put them into a table
+# 2) Sort the table by subject perimeter name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following subject assignment should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following subject assignment should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('subjectperimetername', 'subjectcategory', 'subjectdata', 'policyname'),
+ dtype=('S100', 'S100', 'S100', 'S100'))
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject data list: '" + row[
+ "subjectdata"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+ if (row['policyname'] == "" or row['subjectperimetername'] == ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[
+ 'value'] + "/" + apis_urls.assignementssubjectAPI + "/" +
+ GeneralVariables.assignsubjectperimeterid['value'] + "/" +
+ GeneralVariables.assignsubjectcategoryid['value'], headers=apis_urls.auth_headers)
+ else:
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.assignementssubjectAPI + "/" +
+ commonfunctions.get_subjectperimeterid(row['subjectperimetername']) + "/" +
+ commonfunctions.get_subjectcategoryid(row['subjectcategory']), headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.assignementssubjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.assignementssubjectAPI]).keys():
+ apipolicies = ""
+ apisubjectname = commonfunctions.get_subjectperimetername(
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['subject_id'])
+ apisubjectcategory = commonfunctions.get_subjectcategoryname(
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['category_id'])
+ apiassignments = commonfunctions.get_subjectdataname(
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['assignments'],
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['category_id'],
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['policy_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.assignementssubjectAPI][str(ids)]['policy_id'])
+ if ((row['policyname'] == "" or row['subjectperimetername'] == "") and "".join(apiassignments)==""):
+ apiresult.add_row(vals=("", "", "", ""))
+ else:
+ apiresult.add_row(vals=(
+ apisubjectname, apisubjectcategory, apiassignments, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('subjectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected subject perimeter name: '" + str(
+ row1["subjectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["subjectperimetername"]) + "'")
+ assert str(row1["subjectperimetername"]) == str(
+ row2["subjectperimetername"]), "subject perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject data description: '" + str(
+ row1["subjectcategory"]) + "' is the same as the actual existing '" + str(
+ row2["subjectcategory"]) + "'")
+ assert str(row1["subjectcategory"]) == str(
+ row2["subjectcategory"]), "subject category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject data password: '" + str(
+ row1["subjectdata"]) + "' is the same as the actual existing '" + str(
+ row2["subjectdata"]) + "'")
+ assert str(row1["subjectdata"]) == str(
+ row2["subjectdata"]), "subject data list is not correct!"
+ logger.info("assertion passed!")
+
+ #logger.info("asserting the expected policies: '" + str(
+ # row1["policyname"]) + "' is the same as the actual existing '" + str(
+ # row2["policyname"]) + "'")
+ #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ #logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing object assignment per a given policy, object perimeter and object category by get request and put them into a table
+# 2) Sort the table by object perimeter name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following object assignment should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following object assignment should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('objectperimetername', 'objectcategory', 'objectdata', 'policyname'),
+ dtype=('S100', 'S100', 'S400', 'S100'))
+ for row in context.table:
+ if (row['policyname'] == "" or row['objectperimetername'] == ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[
+ 'value'] + "/" + apis_urls.assignementsobjectAPI + "/" +
+ GeneralVariables.assignobjectperimeterid['value'] + "/" +
+ GeneralVariables.assignobjectcategoryid['value'], headers=apis_urls.auth_headers)
+ else:
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.assignementsobjectAPI + "/" +
+ commonfunctions.get_objectperimeterid(row['objectperimetername']) + "/" +
+ commonfunctions.get_objectcategoryid(row['objectcategory']), headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.assignementsobjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.assignementsobjectAPI]).keys():
+ apipolicies = ""
+ apiobjectname = commonfunctions.get_objectperimetername(
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['object_id'])
+ apiobjectcategory = commonfunctions.get_objectcategoryname(
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['category_id'])
+ apiassignments = commonfunctions.get_objectdataname(
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['assignments'],
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['category_id'],
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['policy_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.assignementsobjectAPI][str(ids)]['policy_id'])
+ if ((row['policyname'] == "" or row['objectperimetername'] == "") and "".join(apiassignments) == ""):
+ apiresult.add_row(vals=("", "", "", ""))
+ else:
+ apiresult.add_row(vals=(
+ apiobjectname, apiobjectcategory, ",".join(apiassignments), apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('objectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected object perimeter name: '" + str(
+ row1["objectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["objectperimetername"]) + "'")
+ assert str(row1["objectperimetername"]) == str(
+ row2["objectperimetername"]), "object perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object data description: '" + str(
+ row1["objectcategory"]) + "' is the same as the actual existing '" + str(
+ row2["objectcategory"]) + "'")
+ assert str(row1["objectcategory"]) == str(
+ row2["objectcategory"]), "object category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object data password: '" + str(
+ row1["objectdata"]) + "' is the same as the actual existing '" + str(
+ row2["objectdata"]) + "'")
+ assert str(row1["objectdata"]) == str(
+ row2["objectdata"]), "object data list is not correct!"
+ logger.info("assertion passed!")
+
+ #logger.info("asserting the expected policies: '" + str(
+ # row1["policyname"]) + "' is the same as the actual existing '" + str(
+ # row2["policyname"]) + "'")
+ #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ #logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing action assignment per a given policy, action perimeter and action category by get request and put them into a table
+# 2) Sort the table by action perimeter name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following action assignment should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following action assignment should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('actionperimetername', 'actioncategory', 'actiondata', 'policyname'),
+ dtype=('S100', 'S100', 'S100', 'S100'))
+ for row in context.table:
+ if (row['policyname'] == "" or row['actionperimetername'] == ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[
+ 'value'] + "/" + apis_urls.assignementsactionAPI + "/" +
+ GeneralVariables.assignactionperimeterid['value'] + "/" +
+ GeneralVariables.assignactioncategoryid['value'], headers=apis_urls.auth_headers)
+ else:
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.assignementsactionAPI + "/" +
+ commonfunctions.get_actionperimeterid(row['actionperimetername']) + "/" +
+ commonfunctions.get_actioncategoryid(row['actioncategory']), headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.assignementsactionAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.assignementsactionAPI]).keys():
+ apipolicies = ""
+ apiactionname = commonfunctions.get_actionperimetername(
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['action_id'])
+ apiactioncategory = commonfunctions.get_actioncategoryname(
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['category_id'])
+ apiassignments = commonfunctions.get_actiondataname(
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['assignments'],
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['category_id'],
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['policy_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.assignementsactionAPI][str(ids)]['policy_id'])
+ logger.info(apiassignments)
+ if ((row['policyname'] == "" or row['actionperimetername'] == "") and "".join(apiassignments) == ""):
+ apiresult.add_row(vals=("", "", "", ""))
+ else:
+ apiresult.add_row(vals=(
+ apiactionname, apiactioncategory, apiassignments, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('actionperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected action perimeter name: '" + str(
+ row1["actionperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["actionperimetername"]) + "'")
+ assert str(row1["actionperimetername"]) == str(
+ row2["actionperimetername"]), "action perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action data description: '" + str(
+ row1["actioncategory"]) + "' is the same as the actual existing '" + str(
+ row2["actioncategory"]) + "'")
+ assert str(row1["actioncategory"]) == str(
+ row2["actioncategory"]), "action category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action data password: '" + str(
+ row1["actiondata"]) + "' is the same as the actual existing '" + str(
+ row2["actiondata"]) + "'")
+ assert str(row1["actiondata"]) == str(
+ row2["actiondata"]), "action data list is not correct!"
+ logger.info("assertion passed!")
+
+ #logger.info("asserting the expected policies: '" + str(
+ # row1["policyname"]) + "' is the same as the actual existing '" + str(
+ # row2["policyname"]) + "'")
+ #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ #logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/authorization.py b/moon_manager/tests/func_tests/features/steps/authorization.py
new file mode 100644
index 00000000..5fa0ebe7
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/authorization.py
@@ -0,0 +1,217 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from common_functions import *
+import requests
+import json
+import logging
+import paramiko
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) Launch Moon Manager
+# 3) Set the token in the global variables
+@Given('the manager is configured')
+def step_impl(context):
+ logger.info("\n")
+ logger.info("******************** Scenario: " + context.scenario.name + " ********************")
+ logger.info("Given the manager is configured")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ # client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ logger.info("before ")
+ stdin, stdout, stderr = client.exec_command(
+
+ "sudo nohup hug -m moon_manager.server &"
+ " /usr/bin/python3 "
+ )
+ #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager add_user alaa00 admin")
+ #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager get_key alaa00 admin ")
+ #logger.info(stdout.readlines())
+ #GeneralVariables.auth_headers['X-Api-Key'] = str(stdout.readlines())
+ #logger.info("token: " + str(GeneralVariables.auth_headers['X-Api-Key']))
+ #logger.info("after ")
+ # client.close()
+
+# Step Definition Implementation: Incomplete Step
+# 1) Get all the moon slaves
+# 2) Loop on the slave by id and delete them
+@Given('no slave is created')
+def step_impl(context):
+ logger.info("\n")
+ logger.info("******************** Scenario: " + context.scenario.name + " ********************")
+ logger.info("Given no slave is created")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.getslavesAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.getslavesAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.getslavesAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.slaveAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Create a slave by post request
+# 2) Get the wrapper port id from the slave posting request & set it to the wrapperPort global variable
+@Given('the slave is created')
+def step_impl(context):
+ logger.info("Given the slave is created")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ data = {
+ 'name': "default",
+ 'description': "description",
+ 'address': "111",
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.slaveAPI, headers=headers,
+ data=json.dumps(data))
+ slaveid = list(response.json()[apis_urls.getslavesAPI])[0]
+ GeneralVariables.wrapperPort['value'] = str(response.json()[apis_urls.getslavesAPI][slaveid]['extra']['port'])
+
+# Step Definition Implementation: Incomplete Step
+# 1) Check the Pipeline is up and running
+@Given('the pipeline is running')
+def step_impl(context):
+ logger.info("Given the pipeline is running")
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the wrapperPort
+@Given('the following authorization request is granted through pipeline')
+def step_impl(context):
+ logger.info("Given the following authorization request is granted through pipeline")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the wrapperPort
+@Given('the following authorization request is granted through wrapper')
+def step_impl(context):
+ logger.info("Given the following authorization request is granted through wrapper")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the pipelinePort
+# 3) set the actual_authresponse global variable with the curl response
+@When('the following authorization request is sent through pipeline')
+def step_impl(context):
+ logger.info("Given the following authorization request is sent through pipeline")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the pipelinePort
+# 3) set the actual_authresponse global variable with the curl response
+@When('the following authorization request is sent through wrapper')
+def step_impl(context):
+ logger.info("Given the following authorization request is sent through wrapper")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Untested Step
+# 1) Assert that the actual authresponse is the same as the expected.
+@Then('the authorization response should be the following')
+def step_impl(context):
+ logger.info("Then the authorization response should be the following")
+ for row in context.table:
+ logger.info("asserting the expected api response: '" + row["auth_response"] + "' and the actual response: '" +
+ GeneralVariables.actual_authresponse['value'] + "'")
+ assert row["auth_response"] == GeneralVariables.actual_authresponse[
+ 'value'], "Validation is not correct, Expected: " + \
+ row[
+ "auth_response"] + " but the API response was: " + \
+ GeneralVariables.actual_authresponse['value']
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/common_functions.py b/moon_manager/tests/func_tests/features/steps/common_functions.py
new file mode 100644
index 00000000..b9b9f0bc
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/common_functions.py
@@ -0,0 +1,279 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from steps.Static_Variables import GeneralVariables
+import requests
+import json
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+class commonfunctions:
+ apis_urls = GeneralVariables()
+
+ def get_subjectcategoryid(self, subjectcategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] == subjectcategoryname):
+ return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id']
+
+ def get_objectcategoryid(self, objectcategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] == objectcategoryname):
+ return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id']
+
+ def get_actioncategoryid(self, actioncategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] == actioncategoryname):
+ return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id']
+
+ def get_metaruleid(self, metarulename):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metarulesAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metarulesAPI]).keys():
+ if (response.json()[self.apis_urls.metarulesAPI][ids]['name'] == metarulename):
+ return ids
+
+ def get_modelid(self, modelname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.modelAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.modelAPI]).keys():
+ if (response.json()[self.apis_urls.modelAPI][ids]['name'] == modelname):
+ return ids
+
+ def get_policyid(self, policyname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.policyAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.policyAPI]).keys():
+ if (response.json()[self.apis_urls.policyAPI][ids]['name'] == policyname):
+ return ids
+
+ def get_subjectperimeterid(self,subjectperimeter ):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[self.apis_urls.perimetersubjectAPI][ids]['name'] == subjectperimeter):
+ return ids
+
+ def get_objectperimeterid(self,objectperimeter ):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[self.apis_urls.perimeterobjectAPI][ids]['name'] == objectperimeter):
+ return ids
+
+ def get_actionperimeterid(self, actionperimeter):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[self.apis_urls.perimeteractionAPI][ids]['name'] == actionperimeter):
+ return ids
+
+ def get_subjectdataid(self,subjectdataname,subjectcategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid,headers=self.apis_urls.auth_headers)
+ if(len(response_data.json()[self.apis_urls.datasubjectAPI]))!=0:
+ subjectdataidslist = []
+ matcheddataidslist = []
+ dataids=response_data.json()[self.apis_urls.datasubjectAPI][0]['data']
+ for ids in dataids:
+ apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id']
+ subjectdataidslist.append(apisubjectdataid)
+
+ if ((str(subjectdataname)).find(",") != -1):
+ datanameslist = subjectdataname.split(",")
+ for dataname in datanameslist:
+ for data_id in subjectdataidslist:
+ if ((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][
+ 'name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+ else:
+ for data_id in subjectdataidslist:
+ if ((
+ response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']) == subjectdataname):
+ return data_id
+
+ def get_objectdataid(self,objectdataname,objectcategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid,headers=self.apis_urls.auth_headers)
+ if (len(response_data.json()[self.apis_urls.dataobjectAPI])) != 0:
+ objectdataidslist = []
+ matcheddataidslist=[]
+ for ids in response_data.json()[ self.apis_urls.dataobjectAPI][0]['data']:
+ apiobjectdataid = response_data.json()[ self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id']
+ objectdataidslist.append(apiobjectdataid)
+ if ((str(objectdataname)).find(",") != -1):
+ datanameslist = objectdataname.split(",")
+ for dataname in datanameslist:
+ for data_id in objectdataidslist:
+ if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+
+ else:
+ for data_id in objectdataidslist:
+ if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == objectdataname):
+ return data_id
+
+ def get_actiondataid(self,actiondataname,actioncategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid,headers=self.apis_urls.auth_headers)
+ if (len(response_data.json()[self.apis_urls.dataactionAPI])) != 0:
+ actiondataidslist = []
+ matcheddataidslist = []
+ for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']:
+ apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id']
+ actiondataidslist.append(apiactiondataid)
+ if ((str(actiondataname)).find(",") != -1):
+ datanameslist = actiondataname.split(",")
+ for dataname in datanameslist:
+ for data_id in actiondataidslist:
+ if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][
+ 'name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+ else:
+ for data_id in actiondataidslist:
+ if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']) == actiondataname):
+ return data_id
+
+ def get_subjectcategoryname(self, subjectcategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] == subjectcategoryid):
+ return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name']
+
+ def get_objectcategoryname(self, objectcategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] == objectcategoryid):
+ return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name']
+
+ def get_actioncategoryname(self, actioncategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] == actioncategoryid):
+ return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name']
+
+ def get_metarulename(self, metaruleid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metarulesAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.metarulesAPI]).keys():
+ if (id == metaruleid):
+ return response.json()[self.apis_urls.metarulesAPI][id]['name']
+
+ def get_modelname(self, modelid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.modelAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.modelAPI]).keys():
+ if (id == modelid):
+ return response.json()[self.apis_urls.modelAPI][id]['name']
+
+ def get_policyname(self, policyid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.policyAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.policyAPI]).keys():
+ if (id == policyid):
+ return response.json()[self.apis_urls.policyAPI][id]['name']
+
+ def get_subjectperimetername(self, subjectperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys():
+ if (id == subjectperimeterid):
+ return response.json()[self.apis_urls.perimetersubjectAPI][id]['name']
+
+ def get_objectperimetername(self, objectperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys():
+ if (id == objectperimeterid):
+ return response.json()[self.apis_urls.perimeterobjectAPI][id]['name']
+
+ def get_actionperimetername(self, actionperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys():
+ if (id == actionperimeterid):
+ return response.json()[self.apis_urls.perimeteractionAPI][id]['name']
+
+ def get_subjectdataname(self, subjectdataids, subjectcategoryid, policyid):
+ subjectdatanames=[]
+ for subjectdataid in subjectdataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid+"/"+subjectdataid,headers=self.apis_urls.auth_headers)
+
+ subjectdataidslist = []
+ if(response_data.status_code==200):
+ for ids in response_data.json()[self.apis_urls.datasubjectAPI][0]['data']:
+ apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id']
+ subjectdataidslist.append(apisubjectdataid)
+
+ for data_id in subjectdataidslist:
+ if (str((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][
+ 'id'])) == subjectdataid):
+ subjectdatanames.append(str(response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']))
+ else:
+ subjectdataidslist = ""
+ return subjectdatanames
+
+ def get_objectdataname(self, objectdataids, objectcategoryid, policyid):
+ objectdatanames = []
+ for objectdataid in objectdataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid + "/" + objectdataid,headers=self.apis_urls.auth_headers)
+ objectdataidslist = []
+ if (response_data.status_code == 200):
+ for ids in response_data.json()[self.apis_urls.dataobjectAPI][0]['data']:
+ apiobjectdataid = response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id']
+ objectdataidslist.append(apiobjectdataid)
+ for data_id in objectdataidslist:
+ if (str((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)][
+ 'id'])) == objectdataid):
+ objectdatanames.append(
+ str(response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']))
+ else:
+ objectdataidslist = ""
+ return objectdatanames
+
+ def get_actiondataname(self, actiondataids, actioncategoryid, policyid):
+ actiondatanames = []
+ for actiondataid in actiondataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid + "/" + actiondataid,headers=self.apis_urls.auth_headers)
+ #logger.info(response_data.json()[self.apis_urls.dataactionAPI][0])
+
+ actiondataidslist = []
+ if (response_data.status_code == 200):
+ for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']:
+ apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id']
+ actiondataidslist.append(apiactiondataid)
+ logging.info(actiondataidslist)
+ for data_id in actiondataidslist:
+ if (str((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][
+ 'id'])) == actiondataid):
+ actiondatanames.append(
+ str(response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']))
+ else:
+ actiondataidslist = ""
+ return actiondatanames \ No newline at end of file
diff --git a/moon_manager/tests/func_tests/features/steps/data.py b/moon_manager/tests/func_tests/features/steps/data.py
new file mode 100644
index 00000000..67d743c2
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/data.py
@@ -0,0 +1,629 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing subject meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get subject data using both the policy id & the category id
+# 3) Loop by data id and delete it
+@Given('the system has no subject data')
+def step_impl(context):
+ logger.info("Given the system has no subject data")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ subjectcategoryidslist = []
+ modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ if (modelid != None and modelid != ""):
+ metaruleslist = \
+ requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][
+ 'meta_rules']
+ for metarule_ids in metaruleslist:
+ categorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metarule_ids]['subject_categories']
+ for categoryid in categorieslist:
+ if (categoryid not in subjectcategoryidslist):
+ subjectcategoryidslist.append(categoryid)
+
+ for categoryid in subjectcategoryidslist:
+ response_data = requests.get(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.datasubjectAPI + "/" + categoryid,headers=apis_urls.auth_headers)
+ for ids in response_data.json()[apis_urls.datasubjectAPI][0]['data']:
+ data_id = response_data.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['id']
+ requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.datasubjectAPI + "/" + categoryid + "/" + data_id,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Post subject data using the policy id & the category id
+@Given('the following subject data exists')
+def step_impl(context):
+ logger.info("Given the following subject data exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject data name: '" + row["subjectdataname"] + "' subject data description: '" + row[
+ "subjectdatadescription"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['subjectcategory']) > 25):
+ categories_id = row['subjectcategory']
+ else:
+ categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory'])
+
+ data = {
+ 'name': row["subjectdataname"],
+ 'description': row["subjectdatadescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.datasubjectAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing object meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get object data using both the policy id & the category id
+# 3) Loop by data id and delete it
+@Given('the system has no object data')
+def step_impl(context):
+ logger.info("Given the system has no object data")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ objectcategoryidslist = []
+ modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ if (modelid != None and modelid != ""):
+ metaruleslist = \
+ requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][
+ 'meta_rules']
+ for metarule_ids in metaruleslist:
+ for categoryid in \
+ (requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)).json()[apis_urls.metarulesAPI][
+ metarule_ids][
+ 'object_categories']:
+ if (categoryid not in objectcategoryidslist):
+ objectcategoryidslist.append(categoryid)
+
+ for categoryid in objectcategoryidslist:
+ response_data = requests.get(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataobjectAPI + "/" + categoryid,headers=apis_urls.auth_headers)
+ for ids in response_data.json()[apis_urls.dataobjectAPI][0]['data']:
+ data_id = response_data.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['id']
+ requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataobjectAPI + "/" + categoryid + "/" + data_id,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Post object data using the policy id & the category id
+@Given('the following object data exists')
+def step_impl(context):
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject data name: '" + row["objectdataname"] + "' object data description: '" + row[
+ "objectdatadescription"] + "' and object category: '" + row[
+ "objectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['objectcategory']) > 25):
+ categories_id = row['objectcategory']
+ else:
+ categories_id = commonfunctions.get_objectcategoryid(row['objectcategory'])
+
+ data = {
+ 'name': row["objectdataname"],
+ 'description': row["objectdatadescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataobjectAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing action meta data in the system by getting the policies then their models then the model attached meta rules and then the categories
+# 2) Get action data using both the policy id & the category id
+# 3) Loop by data id and delete it
+@Given('the system has no action data')
+def step_impl(context):
+ logger.info("Given the system has no action data")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ actioncategoryidslist = []
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers)
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ actioncategoryidslist = []
+ modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id']
+ if (modelid != None and modelid != ""):
+ metaruleslist = \
+ requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][
+ 'meta_rules']
+ for metarule_ids in metaruleslist:
+ for categoryid in \
+ (requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)).json()[apis_urls.metarulesAPI][
+ metarule_ids][
+ 'action_categories']:
+ if (categoryid not in actioncategoryidslist):
+ actioncategoryidslist.append(categoryid)
+
+ for categoryid in actioncategoryidslist:
+ response_data = requests.get(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataactionAPI + "/" + categoryid,headers=apis_urls.auth_headers)
+ for ids in response_data.json()[apis_urls.dataactionAPI][0]['data']:
+ data_id = response_data.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['id']
+ requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataactionAPI + "/" + categoryid + "/" + data_id,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Post action data using the policy id & the category id
+@Given('the following action data exists')
+def step_impl(context):
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject data name: '" + row["actiondataname"] + "' action data description: '" + row[
+ "actiondatadescription"] + "' and action category: '" + row[
+ "actioncategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['actioncategory']) > 25):
+ categories_id = row['actioncategory']
+ else:
+ categories_id = commonfunctions.get_actioncategoryid(row['actioncategory'])
+
+ data = {
+ 'name': row["actiondataname"],
+ 'description': row["actiondatadescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataactionAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Add subject data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following subject data')
+def step_impl(context):
+ logger.info("When the user sets to add the following subject data")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject data name: '" + row["subjectdataname"] + "' subject data description: '" + row[
+ "subjectdatadescription"] + "' and subject category: '" + row[
+ "subjectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['subjectcategory']) > 25):
+ categories_id = row['subjectcategory']
+ else:
+ categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory'])
+
+ data = {
+ 'name': row["subjectdataname"],
+ 'description': row["subjectdatadescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.datasubjectAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Delete subject data by policy id, subject data id, subject category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following subject data')
+def step_impl(context):
+ logging.info("When the user sets to delete the following subject data")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+
+ logger.info("subject data name:'" + row["subjectdataname"] + "' and subject category name:'" + row[
+ "subjectcategory"] + "' and policy name:'" + row["policyname"] + "'")
+
+ policies_id = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_data = requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[
+ "policyname"]) + "/" + apis_urls.datasubjectAPI + "/" + commonfunctions.get_subjectcategoryid(
+ row["subjectcategory"]) + "/" + commonfunctions.get_subjectdataid(row["subjectdataname"],
+ commonfunctions.get_subjectcategoryid(
+ row["subjectcategory"]),
+ commonfunctions.get_policyid(
+ row["policyname"])),
+ headers=headers)
+
+ if response_data.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Add object data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following object data')
+def step_impl(context):
+ logger.info("When the user sets to add the following object data")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object data name: '" + row["objectdataname"] + "' object data description: '" + row[
+ "objectdatadescription"] + "' and object category: '" + row[
+ "objectcategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['objectcategory']) > 25):
+ categories_id = row['objectcategory']
+ else:
+ categories_id = commonfunctions.get_objectcategoryid(row['objectcategory'])
+
+ data = {
+ 'name': row["objectdataname"],
+ 'description': row["objectdatadescription"],
+ }
+
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataobjectAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Delete object data by policy id, object data id, object category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following object data')
+def step_impl(context):
+ logging.info("When the user sets to delete the following object data")
+ model = getattr(context, "model", None)
+ for row in context.table:
+
+ logger.info("object data name:'" + row["objectdataname"] + "' and object category name:'" + row[
+ "objectcategory"] + "' and policy name:'" + row["policyname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_data = requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[
+ "policyname"]) + "/" + apis_urls.dataobjectAPI + "/" + commonfunctions.get_objectcategoryid(
+ row["objectcategory"]) + "/" + commonfunctions.get_objectdataid(row["objectdataname"],
+ commonfunctions.get_objectcategoryid(
+ row["objectcategory"]),
+ commonfunctions.get_policyid(
+ row["policyname"])),
+ headers=headers)
+
+ if response_data.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Add action data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following action data')
+def step_impl(context):
+ logger.info("When the user sets to add the following action data")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action data name: '" + row["actiondataname"] + "' action data description: '" + row[
+ "actiondatadescription"] + "' and action category: '" + row[
+ "actioncategory"] + "' and policies: '" + row['policyname'] + "'")
+
+ policies_id = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ if (len(row['actioncategory']) > 25):
+ categories_id = row['actioncategory']
+ else:
+ categories_id = commonfunctions.get_actioncategoryid(row['actioncategory'])
+
+ data = {
+ 'name': row["actiondataname"],
+ 'description': row["actiondatadescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataactionAPI + "/" + str(
+ categories_id), headers=headers, data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Delete action data by policy id, action data id, action category id
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following action data')
+def step_impl(context):
+ logging.info("When the user sets to delete the following action data")
+ model = getattr(context, "model", None)
+ for row in context.table:
+
+ logger.info("action data name:'" + row["actiondataname"] + "' and action category name:'" + row[
+ "actioncategory"] + "' and policy name:'" + row["policyname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response_data = requests.delete(
+ apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[
+ "policyname"]) + "/" + apis_urls.dataactionAPI + "/" + commonfunctions.get_actioncategoryid(
+ row["actioncategory"]) + "/" + commonfunctions.get_actiondataid(row["actiondataname"],
+ commonfunctions.get_actioncategoryid(
+ row["actioncategory"]),
+ commonfunctions.get_policyid(
+ row["policyname"])),
+ headers=headers)
+
+ if response_data.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing subject data by get request and put them into a table
+# 2) Sort the table by policy name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following subject data should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following subject data should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('subjectdataname', 'subjectdatadescription', 'subjectcategory', 'policyname'),
+ dtype=('S100', 'S100', 'S100', 'S100'))
+ for row in context.table:
+ if (row['policyname'] != ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.datasubjectAPI + "/" +
+ commonfunctions.get_subjectcategoryid(row['subjectcategory']),headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.datasubjectAPI]) != 0:
+ for ids in response.json()[apis_urls.datasubjectAPI][0]['data']:
+ apipolicies = ""
+ apisubjectdataname = response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name']
+ apisubjectdatadescription = response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)][
+ 'description']
+ apisubjectcategory = commonfunctions.get_subjectcategoryname(
+ response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['category_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['policy_id'])
+ apiresult.add_row(vals=(
+ apisubjectdataname, apisubjectdatadescription, apisubjectcategory, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('policyname')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected subject data name: '" + str(
+ row1["subjectdataname"]) + "' is the same as the actual existing '" + str(
+ row2["subjectdataname"]) + "'")
+ assert str(row1["subjectdataname"]) == str(row2["subjectdataname"]), "subject data name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject data description: '" + str(
+ row1["subjectdatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["subjectdatadescription"]) + "'")
+ assert str(row1["subjectdatadescription"]) == str(
+ row2["subjectdatadescription"]), "subject data description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject data password: '" + str(
+ row1["subjectcategory"]) + "' is the same as the actual existing '" + str(
+ row2["subjectcategory"]) + "'")
+ assert str(row1["subjectcategory"]) == str(
+ row2["subjectcategory"]), "subject category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected policies: '" + str(
+ row1["policyname"]) + "' is the same as the actual existing '" + str(
+ row2["policyname"]) + "'")
+ assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing object data by get request and put them into a table
+# 2) Sort the table by policy name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following object data should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following object data should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('objectdataname', 'objectdatadescription', 'objectcategory', 'policyname'),
+ dtype=('S100', 'S100', 'S100', 'S100'))
+
+ for row in context.table:
+ if (row['policyname'] != ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.dataobjectAPI + "/" +
+ commonfunctions.get_objectcategoryid(row['objectcategory']),headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.dataobjectAPI]) != 0:
+ for ids in response.json()[apis_urls.dataobjectAPI][0]['data']:
+ apipolicies = ""
+ apiobjectdataname = response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name']
+ apiobjectdatadescription = response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)][
+ 'description']
+ apiobjectcategory = commonfunctions.get_objectcategoryname(
+ response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['category_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['policy_id'])
+
+ apiresult.add_row(vals=(
+ apiobjectdataname, apiobjectdatadescription, apiobjectcategory, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('policyname')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected object data name: '" + str(
+ row1["objectdataname"]) + "' is the same as the actual existing '" + str(
+ row2["objectdataname"]) + "'")
+ assert str(row1["objectdataname"]) == str(row2["objectdataname"]), "subject data name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object data description: '" + str(
+ row1["objectdatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["objectdatadescription"]) + "'")
+ assert str(row1["objectdatadescription"]) == str(
+ row2["objectdatadescription"]), "object data description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object data category: '" + str(
+ row1["objectcategory"]) + "' is the same as the actual existing '" + str(
+ row2["objectcategory"]) + "'")
+ assert str(row1["objectcategory"]) == str(
+ row2["objectcategory"]), "object category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected policies: '" + str(
+ row1["policyname"]) + "' is the same as the actual existing '" + str(
+ row2["policyname"]) + "'")
+ assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing action data by get request and put them into a table
+# 2) Sort the table by policy name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following action data should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following action data should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('actiondataname', 'actiondatadescription', 'actioncategory', 'policyname'),
+ dtype=('S100', 'S100', 'S100', 'S100'))
+ for row in context.table:
+ if (row['policyname'] != ""):
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.dataactionAPI + "/" +
+ commonfunctions.get_actioncategoryid(row['actioncategory']),headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.dataactionAPI]) != 0:
+ for ids in response.json()[apis_urls.dataactionAPI][0]['data']:
+ apipolicies = ""
+ apiactiondataname = response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name']
+ apiactiondatadescription = response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)][
+ 'description']
+ apiactioncategory = commonfunctions.get_actioncategoryname(
+ response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['category_id'])
+ apipolicies = commonfunctions.get_policyname(
+ response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['policy_id'])
+
+ apiresult.add_row(vals=(
+ apiactiondataname, apiactiondatadescription, apiactioncategory, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+ apiresult.sort('policyname')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected action data name: '" + str(
+ row1["actiondataname"]) + "' is the same as the actual existing '" + str(
+ row2["actiondataname"]) + "'")
+ assert str(row1["actiondataname"]) == str(row2["actiondataname"]), "action data name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action data description: '" + str(
+ row1["actiondatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["actiondatadescription"]) + "'")
+ assert str(row1["actiondatadescription"]) == str(
+ row2["actiondatadescription"]), "action data description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action data category: '" + str(
+ row1["actioncategory"]) + "' is the same as the actual existing '" + str(
+ row2["actioncategory"]) + "'")
+ assert str(row1["actioncategory"]) == str(
+ row2["actioncategory"]), "action category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected policies: '" + str(
+ row1["policyname"]) + "' is the same as the actual existing '" + str(
+ row2["policyname"]) + "'")
+ assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!"
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/meta_data.py b/moon_manager/tests/func_tests/features/steps/meta_data.py
new file mode 100644
index 00000000..b2a6d02c
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/meta_data.py
@@ -0,0 +1,394 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+api_subjectcategory = {'name': "", 'description': ""}
+api_objectcategory = {'name': "", 'description': ""}
+api_actioncategory = {'name': "", 'description': ""}
+
+logger = logging.getLogger(__name__)
+
+
+# Step Definition Implementation:
+# 1) Get all the existing subject meta data in the system
+# 2) Loop by id and delete them
+@Given('the system has no subject categories')
+def step_impl(context):
+ logger.info("Given the system has no subject categories")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Get all the existing action meta data in the system
+# 2) Loop by id and delete them
+@Given('the system has no action categories')
+def step_impl(context):
+ logger.info("Given the system has no action categories")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI + "/" + ids,
+ headers=headers)
+
+
+# Step Definition Implementation:
+# 1) Get all the existing object meta data in the system
+# 2) Loop by id and delete them
+@Given('the system has no object categories')
+def step_impl(context):
+ logger.info("Given the system has no object categories")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI + "/" + ids,
+ headers=headers)
+
+
+
+# Step Definition Implementation:
+# 1) Insert subject meta data using the post request
+@Given('the following meta data subject category exists')
+def step_impl(context):
+ logger.info("Given the following meta data subject category exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ data = {
+ 'name': row["subjectmetadataname"],
+ 'description': row["subjectmetadatadescription"],
+ }
+ logger.info(
+ "subject category name: '" + row["subjectmetadataname"] + "' and subject category description: '" + row[
+ "subjectmetadatadescription"] + "'")
+ response = requests.post(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Insert object meta data using the post request
+@Given('the following meta data object category exists')
+def step_impl(context):
+ logger.info("Given the following meta data object category exists")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ data = {
+ 'name': row["objectmetadataname"],
+ 'description': row["objectmetadatadescription"],
+ }
+ logger.info(
+ "object category name: '" + row["objectmetadataname"] + "' and object category description: '" + row[
+ "objectmetadatadescription"] + "'")
+ response = requests.post(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Insert action meta data using the post request
+@Given('the following meta data action category exists')
+def step_impl(context):
+ logger.info("Given the following meta data action category exists")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ data = {
+ 'name': row["actionmetadataname"],
+ 'description': row["actionmetadatadescription"],
+ }
+ logger.info(
+ "action category name: '" + row["actionmetadataname"] + "' and action category description: '" + row[
+ "actionmetadatadescription"] + "'")
+ response = requests.post(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Add subject meta data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following meta data subject category')
+def step_impl(context):
+ logger.info("When the user sets to add the following meta data subject category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ data = {
+ 'name': row["subjectmetadataname"],
+ 'description': row["subjectmetadatadescription"],
+ }
+ logger.info(
+ "subject category name: '" + row["subjectmetadataname"] + "' and subject category description: '" + row[
+ "subjectmetadatadescription"] + "'")
+
+ response = requests.post(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+
+# Step Definition Implementation:
+# 1) Get all the subject meta data by get request
+# 2) Loop by ids and search for the matching subject meta data by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following meta data subject category')
+def step_impl(context):
+ logger.info("When the user sets to delete the following meta data subject category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ logger.info("subject category name: '" + row["subjectmetadataname"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI,
+ headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys():
+ if (response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['name'] == row["subjectmetadataname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI + "/" + ids,
+ headers=headers)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Add object meta data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following meta data object category')
+def step_impl(context):
+ logger.info("When the user sets to add the following meta data object category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ data = {
+ 'name': row["objectmetadataname"],
+ 'description': row["objectmetadatadescription"],
+ }
+ logger.info(
+ "object category Name: '" + row["objectmetadataname"] + "' and object category description: '" + row[
+ "objectmetadatadescription"] + "''")
+ response = requests.post(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=headers,
+ data=json.dumps(data))
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the object meta data by get request
+# 2) Loop by ids and search for the matching object meta data by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following meta data object category')
+def step_impl(context):
+ logger.info("When the user sets to delete the following meta data object category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ logger.info("object category name: '" + row["objectmetadataname"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI,
+ headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys():
+ if (response.json()[apis_urls.metadataobjectcategoryAPI][ids]['name'] == row["objectmetadataname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI + "/" + ids,
+ headers=headers)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Add subject meta data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following meta data action category')
+def step_impl(context):
+ logger.info("When the user sets to add the following meta data action category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ data = {
+ 'name': row["actionmetadataname"],
+ 'description': row["actionmetadatadescription"],
+ }
+ logger.info(
+ "action category name: '" + row["actionmetadataname"] + "' and action category description: '" + row[
+ "actionmetadatadescription"] + "'")
+
+ response = requests.post(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=headers,
+ data=json.dumps(data))
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the action meta data by get request
+# 2) Loop by ids and search for the matching action meta data by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following meta data action category')
+def step_impl(context):
+ logger.info("When the user sets to delete the following meta data action category")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("action category name: '" + row["actionmetadataname"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI,
+ headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys():
+ # logger.info(ids)
+ if (response.json()[apis_urls.metadataactioncategoryAPI][ids]['name'] == row["actionmetadataname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI + "/" + ids,
+ headers=headers)
+ # logger.info(response.status_code)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing subject meta data by get request and put them into a table
+# 2) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following meta data subject category should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following meta data subject category should be existed in the system")
+
+ model = getattr(context, "model", None)
+ response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=apis_urls.auth_headers)
+ apiresult = Table(names=('subjectcategoryname', 'subjectcategorydescription'), dtype=('S100', 'S100'))
+ if len(response.json()[apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys():
+ apisubjectcategoryname = response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['name']
+ apisubjectcategorydescription = response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['description']
+ apiresult.add_row(vals=(apisubjectcategoryname, apisubjectcategorydescription))
+ else:
+ apiresult.add_row(vals=("", ""))
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected subject category name: '" + str(
+ row1["subjectmetadataname"]) + "' is the same as the actual existing '" + str(
+ row2["subjectcategoryname"]) + "'")
+ assert str(row1["subjectmetadataname"]) == str(
+ row2["subjectcategoryname"]), "subject category name is not correct!"
+ logger.info("assertion passed!")
+ logger.info("asserting the expected subject category description: '" + str(
+ row1["subjectmetadatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["subjectcategorydescription"]) + "'")
+ assert str(row1["subjectmetadatadescription"]) == str(
+ row2["subjectcategorydescription"]), "Subject meta-data category description is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing object meta data by get request and put them into a table
+# 2) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following meta data object category should be existed in the system')
+def step_impl(context):
+ model = getattr(context, "model", None)
+ logger.info("Then the following meta data object category should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=apis_urls.auth_headers)
+ apiresult = Table(names=('objectcategoryname', 'objectcategorydescription'), dtype=('S100', 'S100'))
+
+ if len(response.json()[apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys():
+ apiobjectcategoryname = response.json()[apis_urls.metadataobjectcategoryAPI][ids]['name']
+ apiobjectcategorydescription = response.json()[apis_urls.metadataobjectcategoryAPI][ids]['description']
+ apiresult.add_row(vals=(apiobjectcategoryname, apiobjectcategorydescription))
+ else:
+ apiresult.add_row(vals=("", ""))
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected object category description: '" + str(
+ row1["objectmetadataname"]) + "' is the same as the actual existing '" + str(
+ row2["objectcategoryname"]) + "'")
+ assert str(row1["objectmetadataname"]) == str(
+ row2["objectcategoryname"]), "object category name is not correct!"
+ logger.info("assertion passed!")
+ logger.info("asserting the expected object category description: '" + str(
+ row1["objectmetadatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["objectcategorydescription"]) + "'")
+ assert str(row1["objectmetadatadescription"]) == str(
+ row2["objectcategorydescription"]), "object meta-data category description is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing action meta data by get request and put them into a table
+# 2) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following meta data action category should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following meta data action category should be existed in the system")
+
+ model = getattr(context, "model", None)
+ response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=apis_urls.auth_headers)
+ apiresult = Table(names=('actioncategoryname', 'actioncategorydescription'), dtype=('S100', 'S100'))
+ if len(response.json()[apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys():
+ apiactioncategoryname = response.json()[apis_urls.metadataactioncategoryAPI][ids]['name']
+ apiactioncategorydescription = response.json()[apis_urls.metadataactioncategoryAPI][ids]['description']
+ apiresult.add_row(vals=(apiactioncategoryname, apiactioncategorydescription))
+ else:
+ apiresult.add_row(vals=("", ""))
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected action category description: '" + str(
+ row1["actionmetadataname"]) + "' is the same as the actual existing '" + str(
+ row2["actioncategoryname"]) + "'")
+
+ assert str(row1["actionmetadataname"]) == str(
+ row2["actioncategoryname"]), "action category name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action category description: '" + str(
+ row1["actionmetadatadescription"]) + "' is the same as the actual existing '" + str(
+ row2["actioncategorydescription"]) + "'")
+
+ assert str(row1["actionmetadatadescription"]) == str(
+ row2["actioncategorydescription"]), "action meta-data category description is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# Assert the saved api response flag with the expected flag
+@Then('the system should reply the following')
+def step_impl(context):
+ logger.info("Then the system should reply the following:")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info("asserting the expected api response: '" + row["flag"] + "' and the actual response: '" +
+ GeneralVariables.api_responseflag['value'] + "'")
+ assert row["flag"] == GeneralVariables.api_responseflag['value'], "Validation is not correct, Expected: " + row[
+ "flag"] + " but the API response was: " + GeneralVariables.api_responseflag['value']
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/meta_rules.py b/moon_manager/tests/func_tests/features/steps/meta_rules.py
new file mode 100644
index 00000000..f56d4d4c
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/meta_rules.py
@@ -0,0 +1,335 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import numpy as np
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing meta rule in the system
+# 2) Loop by id and delete them
+@Given('the system has no meta-rules')
+def step_impl(context):
+ logger.info("Given the system has no meta-rules")
+
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.metarulesAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metarulesAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.metarulesAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid
+# 2) create the meta rule data jason then post it
+@Given('the following meta rule exists')
+def step_impl(context):
+ logger.info("Given the following meta rule exists")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "meta-rule name: '" + row["metarulename"] + "' and meta-rule description: '" + row[
+ "metaruledescription"] + "' and subject categories:'" + row[
+ "subjectmetadata"] + "' and object categories:'" + row["objectmetadata"] + "' and action categories:'" +
+ row["actionmetadata"] + "'")
+ subjectcategoryids = []
+ objectcategoryids = []
+ actioncategoryids = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row["subjectmetadata"]) < 40 and str(row["subjectmetadata"])!=""):
+ if(str(row["subjectmetadata"]).find(",")!=-1):
+ for category in row["subjectmetadata"].split(","):
+ subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category))
+ else:
+ subjectcategoryids.append(commonfunctions.get_subjectcategoryid(row["subjectmetadata"]))
+ else:
+ if(str(row["subjectmetadata"])==""):
+ subjectcategoryids=[]
+ else:
+ subjectcategoryids.append(row["subjectmetadata"])
+
+ if (len(row["objectmetadata"]) < 40 and str(row["objectmetadata"])!=""):
+ if(str(row["objectmetadata"]).find(",")!=-1):
+ for category in row["objectmetadata"].split(","):
+ objectcategoryids.append(commonfunctions.get_objectcategoryid(category))
+ else:
+ objectcategoryids.append(commonfunctions.get_objectcategoryid(row["objectmetadata"]))
+ else:
+ if (str(row["objectmetadata"]) == ""):
+ objectcategoryids = []
+ else:
+ objectcategoryids.append(row["objectmetadata"])
+
+ if (len(row["actionmetadata"]) < 40 and str(row["actionmetadata"])!=""):
+ if(str(row["actionmetadata"]).find(",")!=-1):
+ for category in row["actionmetadata"].split(","):
+ actioncategoryids.append(commonfunctions.get_actioncategoryid(category))
+ else:
+ actioncategoryids.append(commonfunctions.get_actioncategoryid(row["actionmetadata"]))
+ else:
+ if(str(row["actionmetadata"]) == ""):
+ actioncategoryids = []
+ else:
+ actioncategoryids.append(row["actionmetadata"])
+
+ data = {
+ 'name': row["metarulename"],
+ 'description': row["metaruledescription"],
+ 'subject_categories': subjectcategoryids,
+ 'object_categories': objectcategoryids,
+ 'action_categories': actioncategoryids
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.metarulesAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid
+# 2) create the meta rule data jason then post it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following meta-rule')
+def step_impl(context):
+ logger.info("When the user sets to add the following meta-rule")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "meta-rule name: '" + row["metarulename"] + "' and meta-rule description: '" + row[
+ "metaruledescription"] + "' and subject categories:'" + row[
+ "subjectmetadata"] + "' and object categories:'" + row["objectmetadata"] + "' and action categories:'" +
+ row["actionmetadata"] + "'")
+
+ subjectcategoryids = []
+ objectcategoryids = []
+ actioncategoryids = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row["subjectmetadata"]) < 40 and str(row["subjectmetadata"])!=""):
+ if (str(row["subjectmetadata"]).find(",") != -1):
+ for category in row["subjectmetadata"].split(","):
+ subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category))
+ else:
+ subjectcategoryids.append(commonfunctions.get_subjectcategoryid(row["subjectmetadata"]))
+ else:
+ subjectcategoryids.append(row["subjectmetadata"])
+
+ if (len(row["objectmetadata"]) < 40 and str(row["objectmetadata"])!=""):
+ if (str(row["objectmetadata"]).find(",") != -1):
+ for category in row["objectmetadata"].split(","):
+ objectcategoryids.append(commonfunctions.get_objectcategoryid(category))
+ else:
+ objectcategoryids.append(commonfunctions.get_objectcategoryid(row["objectmetadata"]))
+ else:
+ objectcategoryids.append(row["objectmetadata"])
+
+ if (len(row["actionmetadata"]) < 40 and str(row["actionmetadata"])!=""):
+ if (str(row["actionmetadata"]).find(",") != -1):
+ for category in row["actionmetadata"].split(","):
+ actioncategoryids.append(commonfunctions.get_actioncategoryid(category))
+ else:
+ actioncategoryids.append(commonfunctions.get_actioncategoryid(row["actionmetadata"]))
+ else:
+ actioncategoryids.append(row["actionmetadata"])
+
+
+ data = {
+ 'name': row["metarulename"],
+ 'description': row["metaruledescription"],
+ 'subject_categories': subjectcategoryids,
+ 'object_categories': objectcategoryids,
+ 'action_categories': actioncategoryids
+ }
+
+ response = requests.post(apis_urls.serverURL + apis_urls.metarulesAPI, headers=headers,
+ data=json.dumps(data))
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid
+# 2) create the meta rule data jason then patch the meta rule after searching for it's id.
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following meta-rule')
+def step_impl(context):
+ logger.info("When the user sets to update the following meta-rule")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "meta-rule name: '" + row["metarulename"] + "' which will be updated to metarule name:" + row[
+ "updatedmetarulename"] + "' and meta-rule description: '" + row[
+ "updatedmetaruledescription"] + "' and subject categories:'" + row[
+ "updatedsubjectmetadata"] + "' and object categories:'" + row[
+ "updatedobjectmetadata"] + "' and action categories:'" +
+ row["updatedactionmetadata"] + "'")
+
+ subjectcategoryids = []
+ objectcategoryids = []
+ actioncategoryids = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row["updatedsubjectmetadata"]) > 40):
+ subjectcategoryids.append(row["updatedsubjectmetadata"])
+ else:
+ for category in row["updatedsubjectmetadata"].split(","):
+ subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category))
+
+ if (len(row["updatedobjectmetadata"]) > 40):
+ objectcategoryids.append(row["updatedobjectmetadata"])
+ else:
+ for category in row["updatedobjectmetadata"].split(","):
+ objectcategoryids.append(commonfunctions.get_objectcategoryid(category))
+
+ if (len(row["updatedactionmetadata"]) > 40):
+ actioncategoryids.append(row["updatedactionmetadata"])
+ else:
+ for category in row["updatedactionmetadata"].split(","):
+ actioncategoryids.append(commonfunctions.get_actioncategoryid(category))
+
+ data = {
+ 'name': row["updatedmetarulename"],
+ 'description': row["updatedmetaruledescription"],
+ 'subject_categories': subjectcategoryids,
+ 'object_categories': objectcategoryids,
+ 'action_categories': actioncategoryids
+ }
+
+ response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.metarulesAPI]).keys():
+ if (response.json()[apis_urls.metarulesAPI][ids]['name'] == row["metarulename"]):
+ response = requests.patch(apis_urls.serverURL + apis_urls.metarulesAPI + '/' + ids, headers=headers,
+ data=json.dumps(data))
+ break
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the meta rule by get request
+# 2) Loop by ids and search for the matching meta rule by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following meta-rule')
+def step_impl(context):
+ logger.info("When the user sets to delete the following meta-rule")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info(
+ "meta-rule name: '" + row["metarulename"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.metarulesAPI]).keys():
+ if (response.json()[apis_urls.metarulesAPI][ids]['name'] == row["metarulename"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.metarulesAPI + "/" + ids,
+ headers=headers)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing action meta data by get request and put them into a table
+# 2) Sort the table by meta rule name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following meta-rules should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following meta-rules should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)
+ apimetarulesubjectcategoryname = ""
+ apimetaruleobjectcategoryname = ""
+ apimetaruleactioncategoryname = ""
+ apiresult = Table(
+ names=('metarulename', 'metaruledescription', 'subjectmetadata', 'actionmetadata', 'objectmetadata'),
+ dtype=('S10', 'S100', 'S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.metarulesAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.metarulesAPI]).keys():
+ apimetarulesubjectcategoryname = ""
+ apimetaruleobjectcategoryname = ""
+ apimetaruleactioncategoryname = ""
+ apimetarulename = response.json()[apis_urls.metarulesAPI][ids]['name']
+ apimetaruledescription = response.json()[apis_urls.metarulesAPI][ids]['description']
+ for categoryid in response.json()[apis_urls.metarulesAPI][ids]['subject_categories']:
+ if (len(apimetarulesubjectcategoryname) > 2):
+ apimetarulesubjectcategoryname = apimetarulesubjectcategoryname + ',' + commonfunctions.get_subjectcategoryname(
+ categoryid)
+ else:
+ apimetarulesubjectcategoryname = commonfunctions.get_subjectcategoryname(categoryid)
+ for categoryid in response.json()[apis_urls.metarulesAPI][ids]['object_categories']:
+ if (len(apimetaruleobjectcategoryname) > 2):
+ apimetaruleobjectcategoryname = apimetaruleobjectcategoryname + ',' + commonfunctions.get_objectcategoryname(
+ categoryid)
+ else:
+ apimetaruleobjectcategoryname = commonfunctions.get_objectcategoryname(categoryid)
+ for categoryid in response.json()[apis_urls.metarulesAPI][ids]['action_categories']:
+ if (len(apimetaruleactioncategoryname) > 2):
+ apimetaruleactioncategoryname = apimetaruleactioncategoryname + ',' + commonfunctions.get_actioncategoryname(
+ categoryid)
+ else:
+ apimetaruleactioncategoryname = commonfunctions.get_actioncategoryname(categoryid)
+
+ apiresult.add_row(vals=(
+ apimetarulename, apimetaruledescription, apimetarulesubjectcategoryname, apimetaruleactioncategoryname,
+ apimetaruleobjectcategoryname))
+
+ else:
+ apiresult.add_row(vals=("", "", "", "", ""))
+
+ apiresult.sort('metarulename')
+
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected meta rule name: '" + str(
+ row1["metarulename"]) + "' is the same as the actual existing '" + str(
+ row2["metarulename"]) + "'")
+ assert str(row1["metarulename"]) == str(row2["metarulename"]), "meta-rule name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected meta rule description: '" + str(
+ row1["metaruledescription"]) + "' is the same as the actual existing '" + str(
+ row2["metaruledescription"]) + "'")
+ assert str(row1["metaruledescription"]) == str(
+ row2["metaruledescription"]), "meta-rule description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject categories: '" + str(
+ row1["subjectmetadata"]) + "' is the same as the actual existing '" + str(
+ row2["subjectmetadata"]) + "'")
+ assert str(row1["subjectmetadata"]) == str(row2["subjectmetadata"]), "subject category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object categories: '" + str(
+ row1["objectmetadata"]) + "' is the same as the actual existing '" + str(
+ row2["objectmetadata"]) + "'")
+ assert str(row1["objectmetadata"]) == str(row2["objectmetadata"]), "object category is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action categories: '" + str(
+ row1["actionmetadata"]) + "' is the same as the actual existing '" + str(
+ row2["actionmetadata"]) + "'")
+ assert str(row1["actionmetadata"]) == str(row2["actionmetadata"]), "action category is not correct!"
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/model.py b/moon_manager/tests/func_tests/features/steps/model.py
new file mode 100644
index 00000000..36b16746
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/model.py
@@ -0,0 +1,230 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing models in the system
+# 2) Loop by id and delete them
+@Given('the system has no models')
+def step_impl(context):
+ logger.info("Given the system has no models")
+
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.modelAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.modelAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.modelAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Get meta rule ids list by calling the common funtion: get_metaruleid
+# 2) create the model data jason then post it
+@Given('the following model exists')
+def step_impl(context):
+ logger.info("Given the following model exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "model name: '" + row["modelname"] + "' model description: '" + row[
+ "modeldescription"] + "' and meta-rules:'" + row[
+ "metarule"]+"'")
+
+ metarulesids = []
+
+ if (len(row["metarule"]) > 35):
+ metarulesids.append(row["metarule"])
+ else:
+ for metarule in row["metarule"].split(","):
+ metarulesids.append(commonfunctions.get_metaruleid(metarule))
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ data = {
+ 'name': row["modelname"],
+ 'description': row["modeldescription"],
+ 'meta_rules': metarulesids
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.modelAPI, headers=headers,
+ data=json.dumps(data))
+
+
+# Step Definition Implementation:
+# 1) Get meta rule ids list by calling the common funtion: get_metaruleid
+# 2) create the model data jason then post it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following model')
+def step_impl(context):
+ logger.info("When the user sets to add the following model")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "model name: '" + row["modelname"] + "' model description: '" + row[
+ "modeldescription"] + "' and meta-rules:'" + row[
+ "metarule"] + "'")
+
+ metarules = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if(row["metarule"]!=""):
+ if (len(row["metarule"]) > 35):
+ metarules.append(row["metarule"])
+ else:
+ for metarule in row["metarule"].split(","):
+ metarules.append(commonfunctions.get_metaruleid(metarule))
+
+ data = {
+ 'name': row["modelname"],
+ 'description': row["modeldescription"],
+ 'meta_rules': metarules,
+ }
+ else:
+ data = {
+ 'name': row["modelname"],
+ 'description': row["modeldescription"],
+ 'meta_rules': "",
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.modelAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+
+# Step Definition Implementation:
+# 1) Get meta rule ids list by calling the common funtion: get_modelid
+# 2) create the model jason then patch the model after searching for it's id.
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following model')
+def step_impl(context):
+ logging.info("When the user sets to update the following model")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "model name: '" + row["modelname"] + "' which will be updated to model name:" + row[
+ "updatedmodelname"] + "' and model description: '" + row[
+ "updatedmodeldescription"] + "' meta-rules: '"+row["updatedmetarule"] + "'")
+
+ metarules = []
+ data={}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ if(row["updatedmetarule"]!=""):
+ if (len(row["updatedmetarule"]) > 35):
+ metarules.append(row["updatedmetarule"])
+ else:
+ for metarule in row["updatedmetarule"].split(","):
+ metarules.append(commonfunctions.get_metaruleid(metarule))
+ data = {
+ 'name': row["updatedmodelname"],
+ 'description': row["updatedmodeldescription"],
+ 'meta_rules': metarules,
+ }
+ else:
+ data = {
+ 'name': row["updatedmodelname"],
+ 'description': row["updatedmodeldescription"],
+ 'meta_rules': "",
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.modelAPI]).keys():
+ if (response.json()[apis_urls.modelAPI][ids]['name'] == row["modelname"]):
+ response = requests.patch(apis_urls.serverURL + apis_urls.modelAPI+'/'+ids, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the model by get request
+# 2) Loop by ids and search for the matching model by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following model')
+def step_impl(context):
+ logging.info("When the user sets to delete the following model")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info("model name: '" + row["modelname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("policy name:'" + row["modelname"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.modelAPI]).keys():
+ if (response.json()[apis_urls.modelAPI][ids]['name'] == row["modelname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.modelAPI + "/" + ids,
+ headers=headers)
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing models by get request and put them into a table
+# 2) Sort the table by model name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following model should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following model should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers)
+ apimetarulesname=""
+ apiresult = Table(
+ names=('modelname', 'modeldescription', 'metarule'),
+ dtype=('S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.modelAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.modelAPI]).keys():
+ apimetarulesname = []
+ apimodelname = response.json()[apis_urls.modelAPI][ids]['name']
+ apimodeldescription = response.json()[apis_urls.modelAPI][ids]['description']
+ for metaruleid in response.json()[apis_urls.modelAPI][ids]['meta_rules']:
+ apimetarulesname.append(commonfunctions.get_metarulename(metaruleid))
+ apiresult.add_row(vals=(
+ apimodelname, apimodeldescription, ",".join(apimetarulesname)))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('modelname')
+
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected model name: '" + str(
+ row1["modelname"]) + "' is the same as the actual existing '" + str(
+ row2["modelname"]) + "'")
+ assert str(row1["modelname"]) == str(row2["modelname"]), "model name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected model description: '" + str(
+ row1["modeldescription"]) + "' is the same as the actual existing '" + str(
+ row2["modeldescription"]) + "'")
+ assert str(row1["modeldescription"]) == str(row2["modeldescription"]), "model description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected meta rules: '" + str(
+ row1["metarule"]) + "' is the same as the actual existing '" + str(
+ row2["metarule"]) + "'")
+ assert str(row1["metarule"]) == str(row2["metarule"]), "metarule is not correct!"
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/pdp.py b/moon_manager/tests/func_tests/features/steps/pdp.py
new file mode 100644
index 00000000..bf839658
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/pdp.py
@@ -0,0 +1,248 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing pdps in the system
+# 2) Loop by id and delete them
+@Given('the system has no pdps')
+def step_impl(context):
+ logger.info("Given the system has no pdps")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI, headers=apis_urls.auth_headers)
+ pdpjason=apis_urls.pdpAPI+"s"
+ if len(response.json()[pdpjason]) != 0:
+ for ids in dict(response.json()[pdpjason]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_policyid
+# 2) create the pdp data jason then post it
+@Given('the following pdp exists')
+def step_impl(context):
+ logger.info("Given the following pdp exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[
+ "pdpdescription"] + "' and keystone project:'" + row[
+ "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'")
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['security_pipeline']) > 25):
+ policies_list = row['security_pipeline']
+ else:
+ for policy in row["security_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get policy id by calling the common funtion: get_policyid
+# 2) create the pdp jason then patch the policy after searching for it's id.
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following pdp')
+def step_impl(context):
+ logger.info("When the user sets to add the following pdp")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[
+ "pdpdescription"] + "' and keystone project:'" + row[
+ "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ if (row["security_pipeline"] != ""):
+ if (len(row['security_pipeline']) > 25):
+ policies_list = row['security_pipeline']
+ else:
+ for policy in row["security_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+ else:
+ data = {
+ 'name': row["pdpname"],
+ 'description': row["pdpdescription"],
+ 'vim_project_id': row['keystone_project_id'],
+ 'security_pipeline': ""
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_policyid
+# 2) create the pdp data jason then patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following pdp')
+def step_impl(context):
+ logger.info("When the user sets to update the following pdp")
+
+ model = getattr(context, "model", None)
+ policies_list=[]
+ for row in context.table:
+ logger.info(
+ "pdp name: '" + row["pdpname"] + "' which will be updated to pdp name:" + row[
+ "updatedpdpname"] + "' and pdp description: '" + row[
+ "updatedpdpdescription"] + "' keystone_project: '" + row["updatedkeystone_project_id"] + "' security pipeline: '"+row["updatedsecurity_pipeline"]+"'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['updatedsecurity_pipeline']) > 25):
+ policies_list = row['updatedsecurity_pipeline']
+ else:
+ for policy in row["updatedsecurity_pipeline"].split(","):
+ policies_list.append(commonfunctions.get_policyid(policy))
+
+ data = {
+ 'name': row["updatedpdpname"],
+ 'description': row["updatedpdpdescription"],
+ 'vim_project_id': row['updatedkeystone_project_id'],
+ 'security_pipeline': policies_list
+ }
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ logger.info(response.json())
+ pdpjason = apis_urls.pdpAPI + "s"
+ for ids in dict(response.json()[pdpjason]).keys():
+ logger.info(str(response.json()[pdpjason][ids]['name']))
+ if (response.json()[pdpjason][ids]['name'] == row["pdpname"]):
+ logger.info(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids)
+ response = requests.patch(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids, headers=headers,
+ data=json.dumps(data))
+ logger.info(response.json())
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+ break
+
+# Step Definition Implementation:
+# 1) Get all the pdps by get request
+# 2) Loop by ids and search for the matching pdp by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following pdp')
+def step_impl(context):
+ logging.info("When the user sets to delete the following pdp")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("pdp name:'" + row["pdpname"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ pdpjason=apis_urls.pdpAPI+"s"
+ for ids in dict(response.json()[pdpjason]).keys():
+ if (response.json()[pdpjason][ids]['name'] == row["pdpname"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids,
+ headers=headers)
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing pdps by get request and put them into a table
+# 2) Sort the table by pdp name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following pdp should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following pdp should be existed in the system")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('pdpname', 'pdpdescription', 'keystone_project_id','security_pipeline'),
+ dtype=('S10', 'S100', 'S100','S100'))
+ pdp_jason=apis_urls.pdpAPI+"s"
+ if len(response.json()[pdp_jason]) != 0:
+ for ids in dict(response.json()[pdp_jason]).keys():
+ apipdppolicies = ""
+ apipdpname = response.json()[pdp_jason][ids]['name']
+ apipdpdescription = response.json()[pdp_jason][ids]['description']
+ apipdpprojectid = response.json()[pdp_jason][ids]['vim_project_id']
+ for policies in response.json()[pdp_jason][ids]['security_pipeline']:
+ if(len(apipdppolicies)>2):
+ apipdppolicies = apipdppolicies +','+ commonfunctions.get_policyname(policies)
+ else:
+ apipdppolicies=commonfunctions.get_policyname(policies)
+
+ apiresult.add_row(vals=(
+ apipdpname, apipdpdescription, apipdpprojectid,apipdppolicies))
+
+ else:
+ apiresult.add_row(vals=("", "", "",""))
+
+ apiresult.sort('pdpname')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected pdp name: '" + str(
+ row1["pdpname"]) + "' is the same as the actual existing '" + str(
+ row2["pdpname"]) + "'")
+ assert str(row1["pdpname"]) == str(row2["pdpname"]), "pdp name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected pdp description: '" + str(
+ row1["pdpdescription"]) + "' is the same as the actual existing '" + str(
+ row2["pdpdescription"]) + "'")
+
+ assert str(row1["pdpdescription"]) == str(row2["pdpdescription"]), "pdp description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected keystone project id description: '" + str(
+ row1["keystone_project_id"]) + "' is the same as the actual existing '" + str(
+ row2["keystone_project_id"]) + "'")
+ assert str(row1["keystone_project_id"]) == str(row2["keystone_project_id"]), "project id is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected security pipeline description: '" + str(
+ row1["security_pipeline"]) + "' is the same as the actual existing '" + str(
+ row2["security_pipeline"]) + "'")
+ assert str(row1["security_pipeline"]) == str(row2["security_pipeline"]), "security_pipeline policies is not correct!"
+ logger.info("assertion passed!")
+
diff --git a/moon_manager/tests/func_tests/features/steps/perimeter.py b/moon_manager/tests/func_tests/features/steps/perimeter.py
new file mode 100644
index 00000000..a4a53120
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/perimeter.py
@@ -0,0 +1,727 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing subject preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no subject perimeter')
+def step_impl(context):
+ logger.info("Given the system has no subject perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ policies_list = response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ # exit(0)
+
+# Step Definition Implementation:
+# 1) Post subject perimeter using the policy id
+@Given('the following subject perimeter exists')
+def step_impl(context):
+ logger.info("Given the following subject perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[
+ "subjectperimeterdescription"] # "' and subject perimeter email:'" + row[
+ # "subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword']
+ + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ policyid=""
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["subjectperimetername"],
+ 'description': row["subjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing object preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no object perimeter')
+def step_impl(context):
+ logger.info("Given the system has no object perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ policies_list = response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+# Step Definition Implementation:
+# 1) Post object perimeter using the policy id
+@Given('the following object perimeter exists')
+def step_impl(context):
+ logger.info("Given the following object perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[
+ "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+
+ data = {
+ 'name': row["objectperimetername"],
+ 'description': row["objectperimeterdescription"],
+
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI,
+ headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Get all the existing action preimeters in the system
+# 2) Loop by id to unlink the policies attached
+# 3) Then delete the perimeter itself
+@Given('the system has no action perimeter')
+def step_impl(context):
+ logger.info("Given the system has no action perimeter")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ policies_list = response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']
+ for policy in policies_list:
+ response_delete_policies = requests.delete(
+ apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=headers)
+ response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+
+# Step Definition Implementation:
+# 1) Post action perimeter using the policy id
+@Given('the following action perimeter exists')
+def step_impl(context):
+ logger.info("Given the following action perimeter exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[
+ "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policyid=""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["actionperimetername"],
+ 'description': row["actionperimeterdescription"],
+
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI,
+ headers=headers,
+ data=json.dumps(data))
+
+# Step Definition Implementation:
+# 1) Insert subject perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following subject perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following subject perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[
+ "subjectperimeterdescription"] +
+ # "' and subject perimeter email:'" + row["subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword'] +
+ "' and policies '" + row['policies'] + "'")
+
+ policyid = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ data = {
+ 'name': row["subjectperimetername"],
+ 'description': row["subjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following subject perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following subject perimeter")
+ model = getattr(context, "model", None)
+ policies_list = []
+ for row in context.table:
+ logger.info(
+ "subject perimeter name: '" + row[
+ 'subjectperimetername'] + "' which will be updated to subject perimeter name:'" + row[
+ "updatedsubjectperimetername"] + "' subject perimeter description: '" + row[
+ "updatedsubjectperimeterdescription"] +
+ # "' and subject perimeter email:'" + row["updatedsubjectperimeteremail"] + "' and subject perimeter password '" + row['updatedsubjectperimeterpassword']
+ "' and policies '" + row['policies'] + "'")
+
+ policyid = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid=commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedsubjectperimetername"],
+ 'description': row["updatedsubjectperimeterdescription"],
+ # 'email': row['subjectperimeteremail'],
+ # 'password': row['subjectperimeterpassword'],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ #print(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + '/' + ids)
+ response = requests.patch(apis_urls.serverURL + apis_urls.perimetersubjectAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+ print(response)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following subject perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following subject perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+ logger.info("subject perimeter name:'" + row["subjectperimetername"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing subject perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following subject perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following subject perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+ logger.info("subject perimeter name:'" + row["subjectperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ logger.info(response.json())
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Insert object perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following object perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following object perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[
+ "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policies_list = []
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["objectperimetername"],
+ 'description': row["objectperimeterdescription"],
+ }
+ response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following object perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following object perimeter")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "object perimeter name: '" + row[
+ 'objectperimetername'] + "' which will be updated to object perimeter name:" + row[
+ "updatedobjectperimetername"] + "' object perimeter description: '" + row[
+ "updatedobjectperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedobjectperimetername"],
+ 'description': row["updatedobjectperimeterdescription"],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.patch(apis_urls.serverURL + apis_urls.perimeterobjectAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following object perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following object perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("object perimeter name:'" + row["objectperimetername"] + "'")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing object perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following object perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following object perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("object perimeter name:'" + row["objectperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Insert action perimeter using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following action perimeter')
+def step_impl(context):
+ logger.info("When the user sets to add the following action perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[
+ "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ policyid=""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["actionperimetername"],
+ 'description': row["actionperimeterdescription"],
+
+ }
+ response = requests.post(
+ apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) create the new perimeter jason and patch it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following action perimeter')
+def step_impl(context):
+ logger.info("When the user sets to update the following action perimeter")
+
+ model = getattr(context, "model", None)
+
+ for row in context.table:
+
+ logger.info(
+ "action perimeter name: '" + row[
+ 'actionperimetername'] + "' which will be updated to action perimeter name:" + row[
+ "updatedactionperimetername"] + "' action perimeter description: '" + row[
+ "updatedactionperimeterdescription"] + "' and policies '" + row['policies'] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (row['policies'] != ""):
+ policyid = commonfunctions.get_policyid(row['policies'])
+ else:
+ policyid=""
+ data = {
+ 'name': row["updatedactionperimetername"],
+ 'description': row["updatedactionperimeterdescription"],
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.patch(
+ apis_urls.serverURL + apis_urls.perimeteractionAPI + '/' + ids,
+ headers=headers,data=json.dumps(data))
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) Delete it without having the policy id in the request
+@When('the user sets to delete the following action perimeter')
+def step_impl(context):
+ logging.info("When the user sets to delete the following action perimeter")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("action perimeter name:'" + row["actionperimetername"] + "'")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Search for the existing action perimeter & get its id
+# 2) Delete it while having the policy id in the request
+@When('the user sets to delete the following action perimeter for a given policy')
+def step_impl(context):
+ logging.info("the user sets to delete the following action perimeter for a given policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("action perimeter name:'" + row["actionperimetername"] + "' and policy:"+ row["policies"]+"'")
+ policyid = commonfunctions.get_policyid(row['policies'])
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]):
+ response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI + "/" + ids,
+ headers=apis_urls.auth_headers)
+
+ if response.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing subject perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following subject perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following subject perimeter should be existed in the system")
+
+ response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('subjectperimetername', 'subjectperimeterdescription',
+ # 'subjectperimeteremail',
+ # 'subjectperimeterpassword',
+ 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+
+ if len(response.json()[apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys():
+ apipoliciesid = []
+ apipolicies = ""
+ GeneralVariables.assignsubjectperimeterid['value']=ids
+ apisubjectperimetername = response.json()[apis_urls.perimetersubjectAPI][ids]['name']
+ apisubjectperimeterdescription = response.json()[apis_urls.perimetersubjectAPI][ids]['description']
+ # apisubjectperimeteremail = response.json()[apis_urls.perimetersubjectAPI][ids]['email']
+ # apisubjectperimeterpassword = response.json()[apis_urls.perimetersubjectAPI][ids]['password']
+ if (len(response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apisubjectperimetername, apisubjectperimeterdescription,
+ # apisubjectperimeteremail,# apisubjectperimeterpassword,
+ apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('subjectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected subject perimeter name: '" + str(
+ row1["subjectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["subjectperimetername"]) + "'")
+ assert str(row1["subjectperimetername"]) == str(
+ row2["subjectperimetername"]), "subject perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected subject perimeter description: '" + str(
+ row1["subjectperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["subjectperimeterdescription"]) + "'")
+ assert str(row1["subjectperimeterdescription"]) == str(
+ row2["subjectperimeterdescription"]), "subject perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ # logger.info("asserting the expected subject perimeter email: '" + str(
+ # row1["subjectperimeteremail"]) + "' is the same as the actual existing '" + str(
+ # row2["subjectperimeteremail"]) + "'")
+ # assert str(row1["subjectperimeteremail"]) == str(
+ # row2["subjectperimeteremail"]), "subject perimeter email is not correct!"
+ # logger.info("assertion passed!")
+ #
+ # logger.info("asserting the expected subject perimeter password: '" + str(
+ # row1["subjectperimeterpassword"]) + "' is the same as the actual existing '" + str(
+ # row2["subjectperimeterpassword"]) + "'")
+ # assert str(row1["subjectperimeterpassword"]) == str(
+ # row2["subjectperimeterpassword"]), "subject perimeter password is not correct!"
+ # logger.info("assertion passed!")
+
+ if (str(row1["policies"]).find(',') == -1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+
+ logger.info("asserting the expected policies: '" + ','.join(
+ sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join(
+ sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing object perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following object perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following object perimeter should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('objectperimetername', 'objectperimeterdescription', 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys():
+ apipolicies = ""
+ apipoliciesid = []
+ apiobjectperimetername = response.json()[apis_urls.perimeterobjectAPI][ids]['name']
+ apiobjectperimeterdescription = response.json()[apis_urls.perimeterobjectAPI][ids]['description']
+ if (len(response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apiobjectperimetername, apiobjectperimeterdescription, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('objectperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected object perimeter name: '" + str(
+ row1["objectperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["objectperimetername"]) + "'")
+ assert str(row1["objectperimetername"]) == str(
+ row2["objectperimetername"]), "object perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected object perimeter description: '" + str(
+ row1["objectperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["objectperimeterdescription"]) + "'")
+ assert str(row1["objectperimeterdescription"]) == str(
+ row2["objectperimeterdescription"]), "object perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ if (str(row1["policies"]).find(',') == -1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+ logger.info("asserting the expected policies: '" + ','.join(
+ sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join(
+ sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")
+
+# Step Definition Implementation:
+# 1) Get all the existing subject perimeter by get request and put them into a table
+# 2) Sort the table by subject perimeter
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following action perimeter should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following action perimeter should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers)
+ apiresult = Table(
+ names=('actionperimetername', 'actionperimeterdescription', 'policies'),
+ dtype=('S100', 'S100', 'S100'))
+ if len(response.json()[apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys():
+ apipolicies = ""
+ apipoliciesid = []
+ apiactionperimetername = response.json()[apis_urls.perimeteractionAPI][ids]['name']
+ apiactionperimeterdescription = response.json()[apis_urls.perimeteractionAPI][ids]['description']
+ if (len(response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']) != 0):
+ for policies in response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']:
+ apipoliciesid.append(commonfunctions.get_policyname(str(policies)))
+ apipolicies = ",".join(apipoliciesid)
+ else:
+ apipolicies = ""
+ apiresult.add_row(vals=(
+ apiactionperimetername, apiactionperimeterdescription, apipolicies))
+ else:
+ apiresult.add_row(vals=("", "", ""))
+
+ apiresult.sort('actionperimetername')
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected action perimeter name: '" + str(
+ row1["actionperimetername"]) + "' is the same as the actual existing '" + str(
+ row2["actionperimetername"]) + "'")
+ assert str(row1["actionperimetername"]) == str(
+ row2["actionperimetername"]), "action perimeter name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected action perimeter description: '" + str(
+ row1["actionperimeterdescription"]) + "' is the same as the actual existing '" + str(
+ row2["actionperimeterdescription"]) + "'")
+ assert str(row1["actionperimeterdescription"]) == str(
+ row2["actionperimeterdescription"]), "action perimeter description is not correct!"
+ logger.info("assertion passed!")
+
+ if(str(row1["policies"]).find(',')==-1):
+ logger.info("asserting the expected policies: '" + str(
+ row1["policies"]) + "' is the same as the actual existing '" + str(
+ row2["policies"]) + "'")
+ logger.info("policies is not correct!")
+ assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!"
+ else:
+
+ logger.info("asserting the expected policies: '" + ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) + "' is the same as the actual existing '" +
+ ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'")
+ logger.info("policies is not correct!")
+ assert ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) == ','.join(sorted(str(row2["policies"]).split(','),key=str.lower)), " policies is not correct!"
+ logger.info("assertion passed!")
diff --git a/moon_manager/tests/func_tests/features/steps/policy.py b/moon_manager/tests/func_tests/features/steps/policy.py
new file mode 100644
index 00000000..faa7156a
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/policy.py
@@ -0,0 +1,219 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing policies in the system
+# 2) Loop by id and delete them
+@Given('the system has no policies')
+def step_impl(context):
+ logger.info("Given the system has no policies")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.policyAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.policyAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.policyAPI + "/" + ids,
+ headers=headers)
+
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_modelid
+# 2) create the policy data jason then post it
+@Given('the following policy exists')
+def step_impl(context):
+ logger.info("Given the following policy exists")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "policy name: '" + row["policyname"] + "' policy description: '" + row[
+ "policydescription"] + "' and model name:'" + row[
+ "modelname"] + "' and genre '"+row['genre']+"'")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ data = {
+ 'name': row["policyname"],
+ 'description': row["policydescription"],
+ 'model_id': commonfunctions.get_modelid(row['modelname']),
+ 'genre': row['genre']
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.policyAPI, headers=headers,
+ data=json.dumps(data))
+
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_modelid
+# 2) create the policy data jason then post it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following policy')
+def step_impl(context):
+ logger.info("When the user sets to add the following policy")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "policy name: '" + row["policyname"] + "' policy description: '" + row[
+ "policydescription"] + "' and model name:'" + row[
+ "modelname"] + "' and genre '" + row['genre'] + "'")
+ policymodel = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['modelname']) > 20):
+ policymodel=row['modelname']
+ else:
+ policymodel=commonfunctions.get_modelid(row['modelname'])
+
+ data = {
+ 'name': row["policyname"],
+ 'description': row["policydescription"],
+ 'model_id': policymodel,
+ 'genre': row['genre']
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.policyAPI, headers=headers,
+ data=json.dumps(data))
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+
+# Step Definition Implementation:
+# 1) Get model id by calling the common funtion: get_modelid
+# 2) create the policy jason then patch the policy after searching for it's id.
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to update the following policy')
+def step_impl(context):
+ logger.info("When the user sets to update the following policy")
+ model = getattr(context, "model", None)
+ for row in context.table:
+ logger.info(
+ "policy name: '" + row["policyname"] + "' which will be updated to policy name:" + row[
+ "updatedpolicyname"] + "' and policy description: '" + row[
+ "updatedpolicydescription"] + "' model name: '" + row["updatedmodelname"] + "' and genre: '"+row["updatedgenre"]+"'")
+ policymodel = ""
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['updatedmodelname']) > 20):
+ policymodel = row['updatedmodelname']
+ else:
+ policymodel = commonfunctions.get_modelid(row['updatedmodelname'])
+
+ data = {
+ 'name': row["updatedpolicyname"],
+ 'description': row["updatedpolicydescription"],
+ 'model_id': policymodel,
+ 'genre': row['updatedgenre']
+ }
+ response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.policyAPI]).keys():
+ if (response.json()[apis_urls.policyAPI][ids]['name'] == row["policyname"]):
+ print(apis_urls.serverURL + apis_urls.policyAPI + '/' + ids)
+ response = requests.patch(apis_urls.serverURL + apis_urls.policyAPI + '/' + ids, headers=headers,
+ data=json.dumps(data))
+ logger.info(response.json())
+ logger.info(response.status_code)
+ break
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the policy by get request
+# 2) Loop by ids and search for the matching policy by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following policy')
+def step_impl(context):
+ logger.info("When the user sets to delete the following policy")
+
+ model = getattr(context, "model", None)
+ for row in context.table:
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ logger.info("policy name:'" +row["policyname"]+"'")
+ response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ for ids in dict(response.json()[apis_urls.policyAPI]).keys():
+ if (response.json()[apis_urls.policyAPI][ids]['name'] == row["policyname"]):
+ GeneralVariables.assignpolicyid['value']=ids
+ response = requests.delete(apis_urls.serverURL + apis_urls.policyAPI + "/" + ids,
+ headers=headers)
+ break
+
+ if response.status_code==200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing policies by get request and put them into a table
+# 2) Sort the table by policy name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following policy should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following policy should be existed in the system")
+ response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ #print(response)
+ apiresult = Table(
+ names=('policyname', 'policydescription', 'modelname','genre'),
+ dtype=('S100', 'S100', 'S100','S100'))
+ if len(response.json()[apis_urls.policyAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.policyAPI]).keys():
+ apipolicyname = response.json()[apis_urls.policyAPI][ids]['name']
+ apipolicydescription = response.json()[apis_urls.policyAPI][ids]['description']
+ apipolicymodel = commonfunctions.get_modelname(response.json()[apis_urls.policyAPI][ids]['model_id'])
+ apipolicygenre=response.json()[apis_urls.policyAPI][ids]['genre']
+
+ apiresult.add_row(vals=(
+ apipolicyname, apipolicydescription, apipolicymodel,apipolicygenre))
+
+ else:
+ apiresult.add_row(vals=("", "", "",""))
+
+ apiresult.sort('policyname')
+
+ for row1, row2 in zip(context.table, apiresult):
+ logger.info("asserting the expected policy name: '" + str(
+ row1["policyname"]) + "' is the same as the actual existing '" + str(
+ row2["policyname"]) + "'")
+ assert str(row1["policyname"]) == str(row2["policyname"]), "policy name is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected policy description: '" + str(
+ row1["policydescription"]) + "' is the same as the actual existing '" + str(
+ row2["policydescription"]) + "'")
+ assert str(row1["policydescription"]) == str(row2["policydescription"]), "policy description is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected genre: '" + str(
+ row1["genre"]) + "' is the same as the actual existing '" + str(
+ row2["genre"]) + "'")
+ assert str(row1["genre"]) == str(row2["genre"]), "genre is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected model name: '" + str(
+ row1["modelname"]) + "' is the same as the actual existing '" + str(
+ row2["modelname"]) + "'")
+ assert str(row1["modelname"]) == str(row2["modelname"]), "model name is not correct!"
+ logger.info("assertion passed!") \ No newline at end of file
diff --git a/moon_manager/tests/func_tests/features/steps/rules.py b/moon_manager/tests/func_tests/features/steps/rules.py
new file mode 100644
index 00000000..4dd85e2c
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/rules.py
@@ -0,0 +1,495 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from astropy.table import Table, Column
+from common_functions import *
+import numpy as np
+import requests
+import json
+import logging
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation:
+# 1) Get all the existing rules by the policy id
+# 2) Loop by assignment id and delete it
+@Given('the system has no rules')
+def step_impl(context):
+ logger.info("Given the system has no rules")
+
+ response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers)
+ #logger.info(response_policies.json())
+ if len(response_policies.json()[apis_urls.policyAPI]) != 0:
+ apiruleid = []
+ for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys():
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.rulesAPI]['rules']) != 0:
+ for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])):
+ apiruleid.append(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['id'])
+ for ruleid in apiruleid:
+ response = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/" + ruleid, headers=apis_urls.auth_headers)
+
+# Step Definition Implementation:
+# 1) Add rule using the post request
+@Given('the following rule exists')
+def step_impl(context):
+ logger.info("Given the following rule exists")
+ api_responseflag = {'value': False}
+ model = getattr(context, "model", None)
+ for row in context.table:
+ subjectcategoryidslist = []
+ subjectdataidslist = []
+ objectcategoryidslist = []
+ objectdataidslist = []
+ actioncategoryidslist = []
+ actiondataidslist = []
+ ruleidslist = []
+ metaruleids = ""
+ subjectindex = 0
+ objectindex = 0
+ actionindex = 0
+ logger.info(
+ "rule '" + row["rule"] + "' and metarule name:'" + row[
+ "metarulename"] + "' and instructions: '" + row[
+ "instructions"] + "' and policyname:'" + row[
+ "policyname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ ruleparameter = row["rule"].split(",")
+ metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers)
+ for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys():
+ if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]):
+ meta_rule_id = metaruleids
+ subjectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['subject_categories']
+ objectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['object_categories']
+ actioncategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['action_categories']
+ break
+
+ index = 0
+ for categoryid in subjectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.datasubjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ for categoryid in objectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataobjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ for categoryid in actioncategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataactionAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ data = {
+ 'meta_rule_id': meta_rule_id,
+ 'rule': ruleidslist,
+ 'instructions': [{"decision": row['instructions']}],
+ 'enabled': 'True'
+ }
+ rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI,
+ headers=headers,
+ data=json.dumps(data))
+
+
+# Step Definition Implementation:
+# 1) Add subject meta data using the post request
+# 2) If the request code was 200 set the api response flag to true else false
+@When('the user sets to add the following rules')
+def step_impl(context):
+ logger.info("When the user sets to add the following rules")
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ api_responseflag = {'value': False}
+ model = getattr(context, "model", None)
+ for row in context.table:
+ subjectcategoryidslist = []
+ subjectdataidslist = []
+ objectcategoryidslist = []
+ objectdataidslist = []
+ actioncategoryidslist = []
+ actiondataidslist = []
+ ruleidslist = []
+ metaruleids = ""
+ subjectindex = 0
+ objectindex = 0
+ actionindex = 0
+ logger.info(
+ "rule '" + row["rule"] + "' and metarule name:'" + row[
+ "metarulename"] + "' and instructions: '" + row[
+ "instructions"] + "' and policyname:'" + row[
+ "policyname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if(row['policyname']=="" or row['policyname']=="000000000000000000000000000000000000000000000000000"):
+ policyname="Stanford Policy"
+ else:
+ policyname=row['policyname']
+ policies_id = commonfunctions.get_policyid(policyname)
+
+ if(row["metarulename"]=="" or row["metarulename"]=="000000000000000000000000000000000000000000000000000"):
+ mata_rule_name="metarule1"
+ else:
+ mata_rule_name = row['metarulename']
+
+
+ if (row["rule"] != ""):
+ ruleparameter = row["rule"].split(",")
+ metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers)
+ for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys():
+ if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == mata_rule_name):
+ meta_rule_id = metaruleids
+ subjectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['subject_categories']
+ objectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['object_categories']
+ actioncategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['action_categories']
+ break
+
+ index = 0
+ for categoryid in subjectcategorieslist:
+ if (index < len(ruleparameter)):
+ if (len(ruleparameter[index]) < 30):
+ if (ruleparameter[index] != ""):
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.datasubjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']:
+ if (index < len(ruleparameter)):
+ if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)][
+ 'name'] ==
+ ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ else:
+ break
+ else:
+ ruleidslist.append("")
+ index = index + 1
+ else:
+ ruleidslist.append(ruleparameter[index])
+ index = index + 1
+ for categoryid in objectcategorieslist:
+ if (index < len(ruleparameter)):
+ if (len(ruleparameter[index]) < 30):
+ if (ruleparameter[index] != ""):
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataobjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']:
+ if (index < len(ruleparameter)):
+ if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)][
+ 'name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ else:
+ break
+ else:
+ ruleidslist.append("")
+ index = index + 1
+ else:
+ ruleidslist.append(ruleparameter[index])
+ index = index + 1
+
+ for categoryid in actioncategorieslist:
+ if (index < len(ruleparameter)):
+ if (len(ruleparameter[index]) < 30):
+ if (ruleparameter[index] != ""):
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataactionAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']:
+ if (index < len(ruleparameter)):
+ if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)][
+ 'name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ else:
+ break
+ else:
+ ruleidslist.append("")
+ index = index + 1
+ else:
+ ruleidslist.append(ruleparameter[index])
+ index = index + 1
+ if(row["metarulename"]=="" or row["metarulename"] == "000000000000000000000000000000000000000000000000000"):
+ meta_rule_id=row["metarulename"]
+ if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"):
+ policies_id = row["policyname"]
+ data = {
+ 'meta_rule_id': meta_rule_id,
+ 'rule': ruleidslist,
+ 'instructions': [{"decision": row['instructions']}],
+ 'enabled': 'True'
+ }
+ else:
+
+ data = {
+ 'meta_rule_id': commonfunctions.get_metaruleid(mata_rule_name),
+ 'rule': [],
+ 'instructions': [{"decision": row['instructions']}],
+ 'enabled': 'True'
+ }
+ rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI,
+ headers=headers,
+ data=json.dumps(data))
+ logger.info(rulesresponse.json())
+ if rulesresponse.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the meta rule by get request
+# 2) Loop by ids and search for the matching meta rule by name and delete it
+# 3) If the request code was 200 set the api response flag to true else false
+@When('the user sets to delete the following rules')
+def step_impl(context):
+ logger.info("When the user sets to delete the following rules")
+ for row in context.table:
+ subjectcategoryidslist = []
+ subjectdataidslist = []
+ objectcategoryidslist = []
+ objectdataidslist = []
+ actioncategoryidslist = []
+ actiondataidslist = []
+ ruleidslist = []
+ metaruleids = ""
+ subjectindex = 0
+ objectindex = 0
+ actionindex = 0
+ logger.info(
+ "rule '" + row["rule"] + "' and metarule name:'" + row[
+ "metarulename"] + "' and policyname:'" + row[
+ "policyname"] + "'")
+
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ ruleparameter = row["rule"].split(",")
+ metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers)
+ for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys():
+ if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]):
+ meta_rule_id = metaruleids
+ subjectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['subject_categories']
+ objectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['object_categories']
+ actioncategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['action_categories']
+ break
+
+ index = 0
+ for categoryid in subjectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.datasubjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ for categoryid in objectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataobjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ for categoryid in actioncategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataactionAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ rulesresponse = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers)
+
+ if len(rulesresponse.json()[apis_urls.rulesAPI]) != 0:
+ for ids in range(len(rulesresponse.json()[apis_urls.rulesAPI]['rules'])):
+ if (dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])[
+ 'rule'] == ruleidslist):
+ ruleid = dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])['id']
+ rulesresponse = requests.delete(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/" + ruleid,headers=apis_urls.auth_headers)
+
+ if rulesresponse.status_code == 200:
+ GeneralVariables.api_responseflag['value'] = 'True'
+ else:
+ GeneralVariables.api_responseflag['value'] = 'False'
+
+# Step Definition Implementation:
+# 1) Get all the existing rules per a given policy, metarule using get request and put them into a table
+# 2) Sort the table by policy name
+# 3) Loop using both the expected and actual tables and assert the data row by row
+@Then('the following rules should be existed in the system')
+def step_impl(context):
+ logger.info("Then the following rule should be existed in the system")
+ model = getattr(context, "model", None)
+ apiresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'),
+ dtype=('S1000', 'S100', 'S100', 'S100'))
+
+ expectedresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'),
+ dtype=('S1000', 'S100', 'S100', 'S100'))
+
+ for row in context.table:
+ ruleidslist = []
+ apirule = []
+ if (len(row['policyname']) > 25):
+ policies_id = row['policyname']
+ else:
+ policies_id = commonfunctions.get_policyid(row['policyname'])
+
+ ruleparameter = row["rule"].split(",")
+ metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers)
+ for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys():
+ if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]):
+ meta_rule_id = metaruleids
+ subjectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['subject_categories']
+ objectcategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['object_categories']
+ actioncategorieslist = \
+ requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][
+ metaruleids]['action_categories']
+
+ index = 0
+ for categoryid in subjectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.datasubjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][ids]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ for categoryid in objectcategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataobjectAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][ids]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+
+ for categoryid in actioncategorieslist:
+ data_response = requests.get(
+ apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers)
+ if len(data_response.json()[apis_urls.dataactionAPI]) != 0:
+ for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']:
+ if (data_response.json()[apis_urls.dataactionAPI][0]['data'][ids]['name'] == ruleparameter[
+ index]):
+ ruleidslist.append(ids)
+ index = index + 1
+ expectedresult.add_row(vals=(','.join(ruleidslist), meta_rule_id, row['instructions'], policies_id))
+
+ if (row['policyname'] != ""):
+ apipolicyid = commonfunctions.get_policyid(
+ row['policyname'])
+ response = requests.get(
+ apis_urls.serverURL + "policies/" + commonfunctions.get_policyid(
+ row['policyname']) + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers)
+
+ if len(response.json()[apis_urls.rulesAPI]) != 0:
+ for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])):
+ if (dict(response.json()[apis_urls.rulesAPI]['rules'][ids])[
+ 'meta_rule_id'] == commonfunctions.get_metaruleid(row['metarulename'])):
+ apirule = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['rule']
+ #logger.info(dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision'])
+ apiinstructions = dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision']
+ apimetaruleid = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['meta_rule_id']
+ apiresult.add_row(vals=(','.join(apirule), apimetaruleid, apiinstructions, apipolicyid))
+
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ else:
+ apiresult.add_row(vals=("", "", "", ""))
+
+ apiresult.sort('policyname')
+ expectedresult.sort('policyname')
+ for row1, row2 in zip(expectedresult, apiresult):
+ logger.info("asserting the expected rule: '" + str(
+ row1["rule"]) + "' is the same as the actual existing '" + str(
+ row2["rule"]) + "'")
+ assert str(row1["rule"]) == str(row2["rule"]), "rule is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected instructions: '" + str(
+ row1["instructions"]) + "' is the same as the actual existing '" + str(
+ row2["instructions"]) + "'")
+ assert str(row1["instructions"]) == str(row2["instructions"]), "instructions is not correct!"
+ logger.info("assertion passed!")
+
+ logger.info("asserting the expected metarule: '" + str(
+ row1["metarule"]) + "' is the same as the actual existing '" + str(
+ row2["metarule"]) + "'")
+ assert str(row1["metarule"]) == str(row2["metarule"]), "metarule is not correct!"
+ logger.info("assertion passed!")