From 7bb53c64da2dcf88894bfd31503accdd81498f3d Mon Sep 17 00:00:00 2001 From: Thomas Duval Date: Wed, 3 Jun 2020 10:06:52 +0200 Subject: Update to new version 5.4 Signed-off-by: Thomas Duval Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea --- .../func_tests/features/steps/Static_Variables.py | 89 +++ .../tests/func_tests/features/steps/__init__.py | 11 + .../tests/func_tests/features/steps/assignments.py | 858 +++++++++++++++++++++ .../func_tests/features/steps/authorization.py | 217 ++++++ .../func_tests/features/steps/common_functions.py | 279 +++++++ .../tests/func_tests/features/steps/data.py | 629 +++++++++++++++ .../tests/func_tests/features/steps/meta_data.py | 394 ++++++++++ .../tests/func_tests/features/steps/meta_rules.py | 335 ++++++++ .../tests/func_tests/features/steps/model.py | 230 ++++++ .../tests/func_tests/features/steps/pdp.py | 248 ++++++ .../tests/func_tests/features/steps/perimeter.py | 727 +++++++++++++++++ .../tests/func_tests/features/steps/policy.py | 219 ++++++ .../tests/func_tests/features/steps/rules.py | 495 ++++++++++++ 13 files changed, 4731 insertions(+) create mode 100644 moon_manager/tests/func_tests/features/steps/Static_Variables.py create mode 100644 moon_manager/tests/func_tests/features/steps/__init__.py create mode 100644 moon_manager/tests/func_tests/features/steps/assignments.py create mode 100644 moon_manager/tests/func_tests/features/steps/authorization.py create mode 100644 moon_manager/tests/func_tests/features/steps/common_functions.py create mode 100644 moon_manager/tests/func_tests/features/steps/data.py create mode 100644 moon_manager/tests/func_tests/features/steps/meta_data.py create mode 100644 moon_manager/tests/func_tests/features/steps/meta_rules.py create mode 100644 moon_manager/tests/func_tests/features/steps/model.py create mode 100644 moon_manager/tests/func_tests/features/steps/pdp.py create mode 100644 moon_manager/tests/func_tests/features/steps/perimeter.py create mode 100644 moon_manager/tests/func_tests/features/steps/policy.py create mode 100644 moon_manager/tests/func_tests/features/steps/rules.py (limited to 'moon_manager/tests/func_tests/features/steps') diff --git a/moon_manager/tests/func_tests/features/steps/Static_Variables.py b/moon_manager/tests/func_tests/features/steps/Static_Variables.py new file mode 100644 index 00000000..471f92fa --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/Static_Variables.py @@ -0,0 +1,89 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +class GeneralVariables: + serverURL="http://127.0.0.1:8000/" + + serverIP="10.237.71.141" + + serverport = "22" + + serverusername="ubuntu" + + serverpassword="ubuntu-007" + + token = "{{TOKEN}}" + + auth_headers = {"X-Api-Key": token} + + actual_authresponse = {'value': False} + + api_responseflag = {'value': False} + + pipelinePort = {'value': ""} + + wrapperPort = {'value': ""} + + projectAPI = "" + + slaveAPI="slave" + + getslavesAPI = "slaves" + + pdpAPI = "pdp" + + modelAPI = "models" + + policyAPI = "policies" + + assignpolicyid={'value': ""} + + assignsubjectperimeterid = {'value': ""} + + assignsubjectcategoryid = {'value': ""} + + assignobjectperimeterid = {'value': ""} + + assignobjectcategoryid = {'value': ""} + + assignactionperimeterid = {'value': ""} + + assignactioncategoryid = {'value': ""} + + metarulesAPI = "meta_rules" + + metadatasubjectcategoryAPI = "subject_categories" + + metadataobjectcategoryAPI = "object_categories" + + metadataactioncategoryAPI = "action_categories" + + perimetersubjectAPI = "subjects" + + perimeterobjectAPI = "objects" + + perimeteractionAPI = "actions" + + datasubjectAPI = "subject_data" + + dataobjectAPI = "object_data" + + dataactionAPI = "action_data" + + assignementssubjectAPI = "subject_assignments" + + assignementsobjectAPI = "object_assignments" + + assignementsactionAPI = "action_assignments" + + rulesAPI = "rules" + diff --git a/moon_manager/tests/func_tests/features/steps/__init__.py b/moon_manager/tests/func_tests/features/steps/__init__.py new file mode 100644 index 00000000..582be686 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/__init__.py @@ -0,0 +1,11 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + diff --git a/moon_manager/tests/func_tests/features/steps/assignments.py b/moon_manager/tests/func_tests/features/steps/assignments.py new file mode 100644 index 00000000..e3f7b5a7 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/assignments.py @@ -0,0 +1,858 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing subject meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get subject assignment id using both the policy id, data id & the category id +# 3) Loop by assignment id and delete it +@Given('the system has no subject assignments') +def step_impl(context): + logger.info("Given the system has no subject assignments") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + # subjectcategoryidslist = [] + # subjectdataidslist = [] + # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + # if (modelid != None and modelid != ""): + # metaruleslist = \ + # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid]['meta_rules'] + # for metarule_ids in metaruleslist: + # categorieslist = \ + # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, + # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + # metarule_ids]['subject_categories'] + # for categoryid in categorieslist: + # if (categoryid not in subjectcategoryidslist): + # subjectcategoryidslist.append(categoryid) + # + # response_perimeters = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI, + # headers=apis_urls.auth_headers).json()[ + # apis_urls.perimetersubjectAPI] + # for perimeterid in dict(response_perimeters).keys(): + # for categoryid in subjectcategoryidslist: + # response_assignment = requests.get( + # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementssubjectAPI + "/" + + # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers) + # if len(response_assignment.json()[apis_urls.assignementssubjectAPI]) != 0: + # for ids in dict(response_assignment.json()[apis_urls.assignementssubjectAPI]).keys(): + # assignmentsid = response_assignment.json()[apis_urls.assignementssubjectAPI][str(ids)][ + # 'assignments'] + # for dataid in assignmentsid: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementssubjectAPI , headers=headers) + +# Step Definition Implementation: +# 1) Get all the existing object meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get object assignment id using both the policy id, data id & the category id +# 3) Loop by assignment id and delete it +@Given('the system has no object assignments') +def step_impl(context): + logger.info("Given the system has no object assignments") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + # objectcategoryidslist = [] + # objectdataidslist = [] + # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + # if (modelid != None and modelid != ""): + # metaruleslist = \ + # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[ + # apis_urls.modelAPI][modelid][ + # 'meta_rules'] + # for metarule_ids in metaruleslist: + # categorieslist = \ + # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, + # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + # metarule_ids]['object_categories'] + # for categoryid in categorieslist: + # if (categoryid not in objectcategoryidslist): + # objectcategoryidslist.append(categoryid) + # + # response_perimeters = \ + # requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI, + # headers=apis_urls.auth_headers).json()[ + # apis_urls.perimeterobjectAPI] + # for perimeterid in dict(response_perimeters).keys(): + # for categoryid in objectcategoryidslist: + # response_assignment = requests.get( + # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsobjectAPI + "/" + + # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers) + # if len(response_assignment.json()[apis_urls.assignementsobjectAPI]) != 0: + # for ids in dict(response_assignment.json()[apis_urls.assignementsobjectAPI]).keys(): + # assignmentsid = response_assignment.json()[apis_urls.assignementsobjectAPI][str(ids)][ + # 'assignments'] + # for dataid in assignmentsid: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsobjectAPI , headers=headers) + +# Step Definition Implementation: +# 1) Get all the existing action meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get action assignment id using both the policy id, data id & the category id +# 3) Loop by assignment id and delete it +@Given('the system has no action assignments') +def step_impl(context): + logger.info("Given the system has no action assignments") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + # actioncategoryidslist = [] + # actiondataidslist = [] + # modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + # if (modelid != None and modelid != ""): + # metaruleslist = \ + # requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers).json()[ + # apis_urls.modelAPI][modelid][ + # 'meta_rules'] + # for metarule_ids in metaruleslist: + # categorieslist = \ + # requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, + # headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + # metarule_ids]['action_categories'] + # for categoryid in categorieslist: + # if (categoryid not in actioncategoryidslist): + # actioncategoryidslist.append(categoryid) + # + # response_perimeters = \ + # requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI, + # headers=apis_urls.auth_headers).json()[ + # apis_urls.perimeteractionAPI] + # for perimeterid in dict(response_perimeters).keys(): + # for categoryid in actioncategoryidslist: + # response_assignment = requests.get( + # apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsactionAPI + "/" + + # perimeterid + "/" + categoryid, headers=apis_urls.auth_headers) + # if len(response_assignment.json()[apis_urls.assignementsactionAPI]) != 0: + # for ids in dict(response_assignment.json()[apis_urls.assignementsactionAPI]).keys(): + # assignmentsid = response_assignment.json()[apis_urls.assignementsactionAPI][str(ids)][ + # 'assignments'] + # for dataid in assignmentsid: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.assignementsactionAPI , headers=headers) + +# Step Definition Implementation: +# 1) Post subject assignment using the policy id, subject perimeter id, subject category, list of subject data ids +@Given('the following subject assignment exists') +def step_impl(context): + logger.info("Given the following subject assignment exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject data: '" + row[ + "subjectdata"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataname = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_subjectperimeterid(row['subjectperimetername']) + categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory']) + dataids = commonfunctions.get_subjectdataid(row['subjectdata'], categories_id, policies_id) + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementssubjectAPI, + headers=headers, data=json.dumps(data)) + + GeneralVariables.assignpolicyid['value'] = policies_id + GeneralVariables.assignsubjectperimeterid['value'] = perimeter_id + GeneralVariables.assignsubjectcategoryid['value'] = categories_id + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Post object assignment using the policy id, object perimeter id, object category, list of object data ids +@Given('the following object assignment exists') +def step_impl(context): + logger.info("Given the following object assignment exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row["objectperimetername"] + "' object data: '" + row[ + "objectdata"] + "' and object category: '" + row[ + "objectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataname = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_objectperimeterid(row['objectperimetername']) + categories_id = commonfunctions.get_objectcategoryid(row['objectcategory']) + dataids = commonfunctions.get_objectdataid(row['objectdata'], categories_id, policies_id) + + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'policy_id': policies_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsobjectAPI, + headers=headers, data=json.dumps(data)) + + GeneralVariables.assignpolicyid['value'] = policies_id + GeneralVariables.assignobjectperimeterid['value'] = perimeter_id + GeneralVariables.assignobjectcategoryid['value'] = categories_id + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids +@Given('the following action assignment exists') +def step_impl(context): + logger.info("Given the following action assignment exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action perimeter name: '" + row["actionperimetername"] + "' action data: '" + row[ + "actiondata"] + "' and action category: '" + row[ + "actioncategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataname = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_actionperimeterid(row['actionperimetername']) + categories_id = commonfunctions.get_actioncategoryid(row['actioncategory']) + dataids = commonfunctions.get_actiondataid(row['actiondata'], categories_id, policies_id) + + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'policy_id': policies_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsactionAPI, + headers=headers, data=json.dumps(data)) + + GeneralVariables.assignpolicyid['value'] = policies_id + GeneralVariables.assignactionperimeterid['value'] = perimeter_id + GeneralVariables.assignactioncategoryid['value'] = categories_id + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following subject assignment') +def step_impl(context): + logger.info("When the user sets to add the following subject assignment") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject data: '" + row[ + "subjectdata"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataids = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"): + policyname = "Stanford Policy" + else: + policyname = row['policyname'] + policies_id = commonfunctions.get_policyid(policyname) + + if (row["subjectperimetername"] == "" or row[ + "subjectperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimetername = "WilliamsJoeseph" + else: + perimetername = row["subjectperimetername"] + perimeter_id = commonfunctions.get_subjectperimeterid(perimetername) + + if (row["subjectcategory"] == "" or row[ + "subjectcategory"] == "000000000000000000000000000000000000000000000000000"): + categoriesname = "Affiliation:" + else: + categoriesname = row['subjectcategory'] + categories_id = commonfunctions.get_subjectcategoryid(categoriesname) + + if (row["subjectdata"] == "" or row["subjectdata"] == "000000000000000000000000000000000000000000000000000"): + dataids = "Professor" + else: + dataids = row['subjectdata'] + dataids = commonfunctions.get_subjectdataid(dataids, categories_id, policies_id) + + if (dataids == None): + dataids = "" + + if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"): + policies_id = row["policyname"] + if (row["subjectperimetername"] == "" or row[ + "subjectperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimeter_id = row["subjectperimetername"] + if (row["subjectcategory"] == "" or row[ + "subjectcategory"] == "000000000000000000000000000000000000000000000000000"): + categories_id = row["subjectcategory"] + if (row["subjectdata"] == "" or row["subjectdata"] == "000000000000000000000000000000000000000000000000000"): + dataids = row['subjectdata'] + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementssubjectAPI, + headers=headers, data=json.dumps(data)) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following object assignment') +def step_impl(context): + logger.info("When the user sets to add the following object assignment") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row["objectperimetername"] + "' object data: '" + row[ + "objectdata"] + "' and object category: '" + row[ + "objectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataids = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"): + policyname = "Stanford Policy" + else: + policyname = row['policyname'] + policies_id = commonfunctions.get_policyid(policyname) + + if (row["objectperimetername"] == "" or row[ + "objectperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimetername = "StudentsGradesSheet" + else: + perimetername = row["objectperimetername"] + perimeter_id = commonfunctions.get_objectperimeterid(perimetername) + + if (row["objectcategory"] == "" or row[ + "objectcategory"] == "000000000000000000000000000000000000000000000000000"): + categoriesname = "Clearance:" + else: + categoriesname = row['objectcategory'] + categories_id = commonfunctions.get_objectcategoryid(categoriesname) + + if (row["objectdata"] == "" or row["objectdata"] == "000000000000000000000000000000000000000000000000000"): + dataids = "Confidential" + else: + dataids = row['objectdata'] + dataids = commonfunctions.get_objectdataid(dataids, categories_id, policies_id) + + if (dataids == None): + dataids = "" + + if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"): + policies_id = row["policyname"] + if (row["objectperimetername"] == "" or row[ + "objectperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimeter_id = row["objectperimetername"] + if (row["objectcategory"] == "" or row[ + "objectcategory"] == "000000000000000000000000000000000000000000000000000"): + categories_id = row["objectcategory"] + if (row["objectdata"] == "" or row["objectdata"] == "000000000000000000000000000000000000000000000000000"): + dataids = row['objectdata'] + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsobjectAPI, + headers=headers, data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Post action assignment using the policy id, action perimeter id, action category, list of action data ids +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following action assignment') +def step_impl(context): + logger.info("When the user sets to add the following action assignment") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action perimeter name: '" + row["actionperimetername"] + "' action data: '" + row[ + "actiondata"] + "' and action category: '" + row[ + "actioncategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + categoriesname = "" + dataids = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policyname'] == "" or row['policyname'] == "000000000000000000000000000000000000000000000000000"): + policyname = "Stanford Policy" + else: + policyname = row['policyname'] + policies_id = commonfunctions.get_policyid(policyname) + + if (row["actionperimetername"] == "" or row[ + "actionperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimetername = "Read" + else: + perimetername = row["actionperimetername"] + perimeter_id = commonfunctions.get_actionperimeterid(perimetername) + + if (row["actioncategory"] == "" or row[ + "actioncategory"] == "000000000000000000000000000000000000000000000000000"): + categoriesname = "Action-Class:" + else: + categoriesname = row['actioncategory'] + categories_id = commonfunctions.get_actioncategoryid(categoriesname) + + if (row["actiondata"] == "" or row["actiondata"] == "000000000000000000000000000000000000000000000000000"): + dataids = "Severe" + else: + dataids = row['actiondata'] + dataids = commonfunctions.get_actiondataid(dataids, categories_id, policies_id) + + if (dataids == None): + dataids = "" + + if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"): + policies_id = row["policyname"] + if (row["actionperimetername"] == "" or row[ + "actionperimetername"] == "000000000000000000000000000000000000000000000000000"): + perimeter_id = row["actionperimetername"] + if (row["actioncategory"] == "" or row[ + "actioncategory"] == "000000000000000000000000000000000000000000000000000"): + categories_id = row["actioncategory"] + if (row["actiondata"] == "" or row["actiondata"] == "000000000000000000000000000000000000000000000000000"): + dataids = row['actiondata'] + data = { + 'id': perimeter_id, + 'category_id': categories_id, + 'policy_id': policies_id, + 'data_id': dataids, + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.assignementsactionAPI, + headers=headers, data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Delete subject assignment by policy id,subject perimeter id, subject data id, subject category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following subject assignment') +def step_impl(context): + logging.info("When the user sets to delete the following subject assignment") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject data list: '" + row[ + "subjectdata"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + dataid = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_subjectperimeterid(row['subjectperimetername']) + categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory']) + dataid = commonfunctions.get_subjectdataid(row["subjectdata"], categories_id, policies_id) + + response_assignment = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementssubjectAPI + "/" + + perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers) + logging.info(response_assignment.json()[apis_urls.assignementssubjectAPI]) + if len(response_assignment.json()[apis_urls.assignementssubjectAPI]) != 0: + for ids in dict(response_assignment.json()[apis_urls.assignementssubjectAPI]).keys(): + assignmentsidlist = response_assignment.json()[apis_urls.assignementssubjectAPI][str(ids)][ + 'assignments'] + if dataid in assignmentsidlist: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementssubjectAPI + "/" + + perimeter_id + "/" + categories_id + "/" + dataid, headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + + +# Step Definition Implementation: +# 1) Delete object assignment by policy id, object perimeter id, object data id, object category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following object assignment') +def step_impl(context): + logging.info("When the user sets to delete the following object assignment") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row["objectperimetername"] + "' object data list: '" + row[ + "objectdata"] + "' and object category: '" + row[ + "objectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + datalistids = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_objectperimeterid(row['objectperimetername']) + categories_id = commonfunctions.get_objectcategoryid(row['objectcategory']) + dataid = commonfunctions.get_objectdataid(row["objectdata"], categories_id, policies_id) + + response_assignment = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsobjectAPI + "/" + + perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers) + if len(response_assignment.json()[apis_urls.assignementsobjectAPI]) != 0: + for ids in dict(response_assignment.json()[apis_urls.assignementsobjectAPI]).keys(): + assignmentsidlist = response_assignment.json()[apis_urls.assignementsobjectAPI][str(ids)][ + 'assignments'] + if dataid in assignmentsidlist: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsobjectAPI + "/" + + perimeter_id + "/" + categories_id + "/" + dataid, headers=headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + + +# Step Definition Implementation: +# 1) Delete action assignment by policy id, action perimeter id, action data id, action category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following action assignment') +def step_impl(context): + logging.info("When the user sets to delete the following action assignment") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action perimeter name: '" + row["actionperimetername"] + "' action data list: '" + row[ + "actiondata"] + "' and action category: '" + row[ + "actioncategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + perimeter_id = "" + datalistids = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policies_id = commonfunctions.get_policyid(row['policyname']) + perimeter_id = commonfunctions.get_actionperimeterid(row['actionperimetername']) + categories_id = commonfunctions.get_actioncategoryid(row['actioncategory']) + dataid = commonfunctions.get_actiondataid(row["actiondata"], categories_id, policies_id) + + response_assignment = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsactionAPI + "/" + + perimeter_id + "/" + categories_id, headers=apis_urls.auth_headers) + if len(response_assignment.json()[apis_urls.assignementsactionAPI]) != 0: + for ids in dict(response_assignment.json()[apis_urls.assignementsactionAPI]).keys(): + assignmentsidlist = response_assignment.json()[apis_urls.assignementsactionAPI][str(ids)][ + 'assignments'] + if dataid in assignmentsidlist: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.assignementsactionAPI + "/" + + perimeter_id + "/" + categories_id + "/" + dataid, headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing subject assignment per a given policy, subject perimeter and subject category by get request and put them into a table +# 2) Sort the table by subject perimeter name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following subject assignment should be existed in the system') +def step_impl(context): + logger.info("Then the following subject assignment should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('subjectperimetername', 'subjectcategory', 'subjectdata', 'policyname'), + dtype=('S100', 'S100', 'S100', 'S100')) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject data list: '" + row[ + "subjectdata"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + if (row['policyname'] == "" or row['subjectperimetername'] == ""): + response = requests.get( + apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[ + 'value'] + "/" + apis_urls.assignementssubjectAPI + "/" + + GeneralVariables.assignsubjectperimeterid['value'] + "/" + + GeneralVariables.assignsubjectcategoryid['value'], headers=apis_urls.auth_headers) + else: + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.assignementssubjectAPI + "/" + + commonfunctions.get_subjectperimeterid(row['subjectperimetername']) + "/" + + commonfunctions.get_subjectcategoryid(row['subjectcategory']), headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.assignementssubjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.assignementssubjectAPI]).keys(): + apipolicies = "" + apisubjectname = commonfunctions.get_subjectperimetername( + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['subject_id']) + apisubjectcategory = commonfunctions.get_subjectcategoryname( + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['category_id']) + apiassignments = commonfunctions.get_subjectdataname( + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['assignments'], + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['category_id'], + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['policy_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.assignementssubjectAPI][str(ids)]['policy_id']) + if ((row['policyname'] == "" or row['subjectperimetername'] == "") and "".join(apiassignments)==""): + apiresult.add_row(vals=("", "", "", "")) + else: + apiresult.add_row(vals=( + apisubjectname, apisubjectcategory, apiassignments, apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('subjectperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected subject perimeter name: '" + str( + row1["subjectperimetername"]) + "' is the same as the actual existing '" + str( + row2["subjectperimetername"]) + "'") + assert str(row1["subjectperimetername"]) == str( + row2["subjectperimetername"]), "subject perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject data description: '" + str( + row1["subjectcategory"]) + "' is the same as the actual existing '" + str( + row2["subjectcategory"]) + "'") + assert str(row1["subjectcategory"]) == str( + row2["subjectcategory"]), "subject category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject data password: '" + str( + row1["subjectdata"]) + "' is the same as the actual existing '" + str( + row2["subjectdata"]) + "'") + assert str(row1["subjectdata"]) == str( + row2["subjectdata"]), "subject data list is not correct!" + logger.info("assertion passed!") + + #logger.info("asserting the expected policies: '" + str( + # row1["policyname"]) + "' is the same as the actual existing '" + str( + # row2["policyname"]) + "'") + #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + #logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing object assignment per a given policy, object perimeter and object category by get request and put them into a table +# 2) Sort the table by object perimeter name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following object assignment should be existed in the system') +def step_impl(context): + logger.info("Then the following object assignment should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('objectperimetername', 'objectcategory', 'objectdata', 'policyname'), + dtype=('S100', 'S100', 'S400', 'S100')) + for row in context.table: + if (row['policyname'] == "" or row['objectperimetername'] == ""): + response = requests.get( + apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[ + 'value'] + "/" + apis_urls.assignementsobjectAPI + "/" + + GeneralVariables.assignobjectperimeterid['value'] + "/" + + GeneralVariables.assignobjectcategoryid['value'], headers=apis_urls.auth_headers) + else: + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.assignementsobjectAPI + "/" + + commonfunctions.get_objectperimeterid(row['objectperimetername']) + "/" + + commonfunctions.get_objectcategoryid(row['objectcategory']), headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.assignementsobjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.assignementsobjectAPI]).keys(): + apipolicies = "" + apiobjectname = commonfunctions.get_objectperimetername( + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['object_id']) + apiobjectcategory = commonfunctions.get_objectcategoryname( + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['category_id']) + apiassignments = commonfunctions.get_objectdataname( + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['assignments'], + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['category_id'], + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['policy_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.assignementsobjectAPI][str(ids)]['policy_id']) + if ((row['policyname'] == "" or row['objectperimetername'] == "") and "".join(apiassignments) == ""): + apiresult.add_row(vals=("", "", "", "")) + else: + apiresult.add_row(vals=( + apiobjectname, apiobjectcategory, ",".join(apiassignments), apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('objectperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected object perimeter name: '" + str( + row1["objectperimetername"]) + "' is the same as the actual existing '" + str( + row2["objectperimetername"]) + "'") + assert str(row1["objectperimetername"]) == str( + row2["objectperimetername"]), "object perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object data description: '" + str( + row1["objectcategory"]) + "' is the same as the actual existing '" + str( + row2["objectcategory"]) + "'") + assert str(row1["objectcategory"]) == str( + row2["objectcategory"]), "object category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object data password: '" + str( + row1["objectdata"]) + "' is the same as the actual existing '" + str( + row2["objectdata"]) + "'") + assert str(row1["objectdata"]) == str( + row2["objectdata"]), "object data list is not correct!" + logger.info("assertion passed!") + + #logger.info("asserting the expected policies: '" + str( + # row1["policyname"]) + "' is the same as the actual existing '" + str( + # row2["policyname"]) + "'") + #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + #logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing action assignment per a given policy, action perimeter and action category by get request and put them into a table +# 2) Sort the table by action perimeter name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following action assignment should be existed in the system') +def step_impl(context): + logger.info("Then the following action assignment should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('actionperimetername', 'actioncategory', 'actiondata', 'policyname'), + dtype=('S100', 'S100', 'S100', 'S100')) + for row in context.table: + if (row['policyname'] == "" or row['actionperimetername'] == ""): + response = requests.get( + apis_urls.serverURL + "policies/" + GeneralVariables.assignpolicyid[ + 'value'] + "/" + apis_urls.assignementsactionAPI + "/" + + GeneralVariables.assignactionperimeterid['value'] + "/" + + GeneralVariables.assignactioncategoryid['value'], headers=apis_urls.auth_headers) + else: + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.assignementsactionAPI + "/" + + commonfunctions.get_actionperimeterid(row['actionperimetername']) + "/" + + commonfunctions.get_actioncategoryid(row['actioncategory']), headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.assignementsactionAPI]) != 0: + for ids in dict(response.json()[apis_urls.assignementsactionAPI]).keys(): + apipolicies = "" + apiactionname = commonfunctions.get_actionperimetername( + response.json()[apis_urls.assignementsactionAPI][str(ids)]['action_id']) + apiactioncategory = commonfunctions.get_actioncategoryname( + response.json()[apis_urls.assignementsactionAPI][str(ids)]['category_id']) + apiassignments = commonfunctions.get_actiondataname( + response.json()[apis_urls.assignementsactionAPI][str(ids)]['assignments'], + response.json()[apis_urls.assignementsactionAPI][str(ids)]['category_id'], + response.json()[apis_urls.assignementsactionAPI][str(ids)]['policy_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.assignementsactionAPI][str(ids)]['policy_id']) + logger.info(apiassignments) + if ((row['policyname'] == "" or row['actionperimetername'] == "") and "".join(apiassignments) == ""): + apiresult.add_row(vals=("", "", "", "")) + else: + apiresult.add_row(vals=( + apiactionname, apiactioncategory, apiassignments, apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('actionperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected action perimeter name: '" + str( + row1["actionperimetername"]) + "' is the same as the actual existing '" + str( + row2["actionperimetername"]) + "'") + assert str(row1["actionperimetername"]) == str( + row2["actionperimetername"]), "action perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action data description: '" + str( + row1["actioncategory"]) + "' is the same as the actual existing '" + str( + row2["actioncategory"]) + "'") + assert str(row1["actioncategory"]) == str( + row2["actioncategory"]), "action category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action data password: '" + str( + row1["actiondata"]) + "' is the same as the actual existing '" + str( + row2["actiondata"]) + "'") + assert str(row1["actiondata"]) == str( + row2["actiondata"]), "action data list is not correct!" + logger.info("assertion passed!") + + #logger.info("asserting the expected policies: '" + str( + # row1["policyname"]) + "' is the same as the actual existing '" + str( + # row2["policyname"]) + "'") + #assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + #logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/authorization.py b/moon_manager/tests/func_tests/features/steps/authorization.py new file mode 100644 index 00000000..5fa0ebe7 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/authorization.py @@ -0,0 +1,217 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from common_functions import * +import requests +import json +import logging +import paramiko + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) Launch Moon Manager +# 3) Set the token in the global variables +@Given('the manager is configured') +def step_impl(context): + logger.info("\n") + logger.info("******************** Scenario: " + context.scenario.name + " ********************") + logger.info("Given the manager is configured") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + # client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + logger.info("before ") + stdin, stdout, stderr = client.exec_command( + + "sudo nohup hug -m moon_manager.server &" + " /usr/bin/python3 " + ) + #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager add_user alaa00 admin") + #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager get_key alaa00 admin ") + #logger.info(stdout.readlines()) + #GeneralVariables.auth_headers['X-Api-Key'] = str(stdout.readlines()) + #logger.info("token: " + str(GeneralVariables.auth_headers['X-Api-Key'])) + #logger.info("after ") + # client.close() + +# Step Definition Implementation: Incomplete Step +# 1) Get all the moon slaves +# 2) Loop on the slave by id and delete them +@Given('no slave is created') +def step_impl(context): + logger.info("\n") + logger.info("******************** Scenario: " + context.scenario.name + " ********************") + logger.info("Given no slave is created") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + response = requests.get(apis_urls.serverURL + apis_urls.getslavesAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.getslavesAPI]) != 0: + for ids in dict(response.json()[apis_urls.getslavesAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.slaveAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Create a slave by post request +# 2) Get the wrapper port id from the slave posting request & set it to the wrapperPort global variable +@Given('the slave is created') +def step_impl(context): + logger.info("Given the slave is created") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + data = { + 'name': "default", + 'description': "description", + 'address': "111", + } + response = requests.post(apis_urls.serverURL + apis_urls.slaveAPI, headers=headers, + data=json.dumps(data)) + slaveid = list(response.json()[apis_urls.getslavesAPI])[0] + GeneralVariables.wrapperPort['value'] = str(response.json()[apis_urls.getslavesAPI][slaveid]['extra']['port']) + +# Step Definition Implementation: Incomplete Step +# 1) Check the Pipeline is up and running +@Given('the pipeline is running') +def step_impl(context): + logger.info("Given the pipeline is running") + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the wrapperPort +@Given('the following authorization request is granted through pipeline') +def step_impl(context): + logger.info("Given the following authorization request is granted through pipeline") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the wrapperPort +@Given('the following authorization request is granted through wrapper') +def step_impl(context): + logger.info("Given the following authorization request is granted through wrapper") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the pipelinePort +# 3) set the actual_authresponse global variable with the curl response +@When('the following authorization request is sent through pipeline') +def step_impl(context): + logger.info("Given the following authorization request is sent through pipeline") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the pipelinePort +# 3) set the actual_authresponse global variable with the curl response +@When('the following authorization request is sent through wrapper') +def step_impl(context): + logger.info("Given the following authorization request is sent through wrapper") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Untested Step +# 1) Assert that the actual authresponse is the same as the expected. +@Then('the authorization response should be the following') +def step_impl(context): + logger.info("Then the authorization response should be the following") + for row in context.table: + logger.info("asserting the expected api response: '" + row["auth_response"] + "' and the actual response: '" + + GeneralVariables.actual_authresponse['value'] + "'") + assert row["auth_response"] == GeneralVariables.actual_authresponse[ + 'value'], "Validation is not correct, Expected: " + \ + row[ + "auth_response"] + " but the API response was: " + \ + GeneralVariables.actual_authresponse['value'] + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/common_functions.py b/moon_manager/tests/func_tests/features/steps/common_functions.py new file mode 100644 index 00000000..b9b9f0bc --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/common_functions.py @@ -0,0 +1,279 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from steps.Static_Variables import GeneralVariables +import requests +import json +import logging + + +logger = logging.getLogger(__name__) + +class commonfunctions: + apis_urls = GeneralVariables() + + def get_subjectcategoryid(self, subjectcategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] == subjectcategoryname): + return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] + + def get_objectcategoryid(self, objectcategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] == objectcategoryname): + return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] + + def get_actioncategoryid(self, actioncategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] == actioncategoryname): + return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] + + def get_metaruleid(self, metarulename): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metarulesAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metarulesAPI]).keys(): + if (response.json()[self.apis_urls.metarulesAPI][ids]['name'] == metarulename): + return ids + + def get_modelid(self, modelname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.modelAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.modelAPI]).keys(): + if (response.json()[self.apis_urls.modelAPI][ids]['name'] == modelname): + return ids + + def get_policyid(self, policyname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.policyAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.policyAPI]).keys(): + if (response.json()[self.apis_urls.policyAPI][ids]['name'] == policyname): + return ids + + def get_subjectperimeterid(self,subjectperimeter ): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys(): + if (response.json()[self.apis_urls.perimetersubjectAPI][ids]['name'] == subjectperimeter): + return ids + + def get_objectperimeterid(self,objectperimeter ): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys(): + if (response.json()[self.apis_urls.perimeterobjectAPI][ids]['name'] == objectperimeter): + return ids + + def get_actionperimeterid(self, actionperimeter): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys(): + if (response.json()[self.apis_urls.perimeteractionAPI][ids]['name'] == actionperimeter): + return ids + + def get_subjectdataid(self,subjectdataname,subjectcategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid,headers=self.apis_urls.auth_headers) + if(len(response_data.json()[self.apis_urls.datasubjectAPI]))!=0: + subjectdataidslist = [] + matcheddataidslist = [] + dataids=response_data.json()[self.apis_urls.datasubjectAPI][0]['data'] + for ids in dataids: + apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id'] + subjectdataidslist.append(apisubjectdataid) + + if ((str(subjectdataname)).find(",") != -1): + datanameslist = subjectdataname.split(",") + for dataname in datanameslist: + for data_id in subjectdataidslist: + if ((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][ + 'name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + else: + for data_id in subjectdataidslist: + if (( + response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']) == subjectdataname): + return data_id + + def get_objectdataid(self,objectdataname,objectcategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid,headers=self.apis_urls.auth_headers) + if (len(response_data.json()[self.apis_urls.dataobjectAPI])) != 0: + objectdataidslist = [] + matcheddataidslist=[] + for ids in response_data.json()[ self.apis_urls.dataobjectAPI][0]['data']: + apiobjectdataid = response_data.json()[ self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id'] + objectdataidslist.append(apiobjectdataid) + if ((str(objectdataname)).find(",") != -1): + datanameslist = objectdataname.split(",") + for dataname in datanameslist: + for data_id in objectdataidslist: + if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + + else: + for data_id in objectdataidslist: + if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == objectdataname): + return data_id + + def get_actiondataid(self,actiondataname,actioncategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid,headers=self.apis_urls.auth_headers) + if (len(response_data.json()[self.apis_urls.dataactionAPI])) != 0: + actiondataidslist = [] + matcheddataidslist = [] + for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']: + apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id'] + actiondataidslist.append(apiactiondataid) + if ((str(actiondataname)).find(",") != -1): + datanameslist = actiondataname.split(",") + for dataname in datanameslist: + for data_id in actiondataidslist: + if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][ + 'name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + else: + for data_id in actiondataidslist: + if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']) == actiondataname): + return data_id + + def get_subjectcategoryname(self, subjectcategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] == subjectcategoryid): + return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] + + def get_objectcategoryname(self, objectcategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] == objectcategoryid): + return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] + + def get_actioncategoryname(self, actioncategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] == actioncategoryid): + return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] + + def get_metarulename(self, metaruleid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metarulesAPI]) != 0: + for id in dict(response.json()[self.apis_urls.metarulesAPI]).keys(): + if (id == metaruleid): + return response.json()[self.apis_urls.metarulesAPI][id]['name'] + + def get_modelname(self, modelid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.modelAPI]) != 0: + for id in dict(response.json()[self.apis_urls.modelAPI]).keys(): + if (id == modelid): + return response.json()[self.apis_urls.modelAPI][id]['name'] + + def get_policyname(self, policyid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.policyAPI]) != 0: + for id in dict(response.json()[self.apis_urls.policyAPI]).keys(): + if (id == policyid): + return response.json()[self.apis_urls.policyAPI][id]['name'] + + def get_subjectperimetername(self, subjectperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys(): + if (id == subjectperimeterid): + return response.json()[self.apis_urls.perimetersubjectAPI][id]['name'] + + def get_objectperimetername(self, objectperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys(): + if (id == objectperimeterid): + return response.json()[self.apis_urls.perimeterobjectAPI][id]['name'] + + def get_actionperimetername(self, actionperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys(): + if (id == actionperimeterid): + return response.json()[self.apis_urls.perimeteractionAPI][id]['name'] + + def get_subjectdataname(self, subjectdataids, subjectcategoryid, policyid): + subjectdatanames=[] + for subjectdataid in subjectdataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid+"/"+subjectdataid,headers=self.apis_urls.auth_headers) + + subjectdataidslist = [] + if(response_data.status_code==200): + for ids in response_data.json()[self.apis_urls.datasubjectAPI][0]['data']: + apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id'] + subjectdataidslist.append(apisubjectdataid) + + for data_id in subjectdataidslist: + if (str((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][ + 'id'])) == subjectdataid): + subjectdatanames.append(str(response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name'])) + else: + subjectdataidslist = "" + return subjectdatanames + + def get_objectdataname(self, objectdataids, objectcategoryid, policyid): + objectdatanames = [] + for objectdataid in objectdataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid + "/" + objectdataid,headers=self.apis_urls.auth_headers) + objectdataidslist = [] + if (response_data.status_code == 200): + for ids in response_data.json()[self.apis_urls.dataobjectAPI][0]['data']: + apiobjectdataid = response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id'] + objectdataidslist.append(apiobjectdataid) + for data_id in objectdataidslist: + if (str((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)][ + 'id'])) == objectdataid): + objectdatanames.append( + str(response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name'])) + else: + objectdataidslist = "" + return objectdatanames + + def get_actiondataname(self, actiondataids, actioncategoryid, policyid): + actiondatanames = [] + for actiondataid in actiondataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid + "/" + actiondataid,headers=self.apis_urls.auth_headers) + #logger.info(response_data.json()[self.apis_urls.dataactionAPI][0]) + + actiondataidslist = [] + if (response_data.status_code == 200): + for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']: + apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id'] + actiondataidslist.append(apiactiondataid) + logging.info(actiondataidslist) + for data_id in actiondataidslist: + if (str((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][ + 'id'])) == actiondataid): + actiondatanames.append( + str(response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name'])) + else: + actiondataidslist = "" + return actiondatanames \ No newline at end of file diff --git a/moon_manager/tests/func_tests/features/steps/data.py b/moon_manager/tests/func_tests/features/steps/data.py new file mode 100644 index 00000000..67d743c2 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/data.py @@ -0,0 +1,629 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing subject meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get subject data using both the policy id & the category id +# 3) Loop by data id and delete it +@Given('the system has no subject data') +def step_impl(context): + logger.info("Given the system has no subject data") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + subjectcategoryidslist = [] + modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + if (modelid != None and modelid != ""): + metaruleslist = \ + requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][ + 'meta_rules'] + for metarule_ids in metaruleslist: + categorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metarule_ids]['subject_categories'] + for categoryid in categorieslist: + if (categoryid not in subjectcategoryidslist): + subjectcategoryidslist.append(categoryid) + + for categoryid in subjectcategoryidslist: + response_data = requests.get( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.datasubjectAPI + "/" + categoryid,headers=apis_urls.auth_headers) + for ids in response_data.json()[apis_urls.datasubjectAPI][0]['data']: + data_id = response_data.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['id'] + requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.datasubjectAPI + "/" + categoryid + "/" + data_id, + headers=headers) + +# Step Definition Implementation: +# 1) Post subject data using the policy id & the category id +@Given('the following subject data exists') +def step_impl(context): + logger.info("Given the following subject data exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject data name: '" + row["subjectdataname"] + "' subject data description: '" + row[ + "subjectdatadescription"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['subjectcategory']) > 25): + categories_id = row['subjectcategory'] + else: + categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory']) + + data = { + 'name': row["subjectdataname"], + 'description': row["subjectdatadescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.datasubjectAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get all the existing object meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get object data using both the policy id & the category id +# 3) Loop by data id and delete it +@Given('the system has no object data') +def step_impl(context): + logger.info("Given the system has no object data") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + objectcategoryidslist = [] + modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + if (modelid != None and modelid != ""): + metaruleslist = \ + requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][ + 'meta_rules'] + for metarule_ids in metaruleslist: + for categoryid in \ + (requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)).json()[apis_urls.metarulesAPI][ + metarule_ids][ + 'object_categories']: + if (categoryid not in objectcategoryidslist): + objectcategoryidslist.append(categoryid) + + for categoryid in objectcategoryidslist: + response_data = requests.get( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataobjectAPI + "/" + categoryid,headers=apis_urls.auth_headers) + for ids in response_data.json()[apis_urls.dataobjectAPI][0]['data']: + data_id = response_data.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['id'] + requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataobjectAPI + "/" + categoryid + "/" + data_id, + headers=headers) + +# Step Definition Implementation: +# 1) Post object data using the policy id & the category id +@Given('the following object data exists') +def step_impl(context): + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject data name: '" + row["objectdataname"] + "' object data description: '" + row[ + "objectdatadescription"] + "' and object category: '" + row[ + "objectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['objectcategory']) > 25): + categories_id = row['objectcategory'] + else: + categories_id = commonfunctions.get_objectcategoryid(row['objectcategory']) + + data = { + 'name': row["objectdataname"], + 'description': row["objectdatadescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataobjectAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get all the existing action meta data in the system by getting the policies then their models then the model attached meta rules and then the categories +# 2) Get action data using both the policy id & the category id +# 3) Loop by data id and delete it +@Given('the system has no action data') +def step_impl(context): + logger.info("Given the system has no action data") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + actioncategoryidslist = [] + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI,headers=apis_urls.auth_headers) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + actioncategoryidslist = [] + modelid = response_policies.json()[apis_urls.policyAPI][policies_ids]['model_id'] + if (modelid != None and modelid != ""): + metaruleslist = \ + requests.get(apis_urls.serverURL + apis_urls.modelAPI,headers=apis_urls.auth_headers).json()[apis_urls.modelAPI][modelid][ + 'meta_rules'] + for metarule_ids in metaruleslist: + for categoryid in \ + (requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers)).json()[apis_urls.metarulesAPI][ + metarule_ids][ + 'action_categories']: + if (categoryid not in actioncategoryidslist): + actioncategoryidslist.append(categoryid) + + for categoryid in actioncategoryidslist: + response_data = requests.get( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataactionAPI + "/" + categoryid,headers=apis_urls.auth_headers) + for ids in response_data.json()[apis_urls.dataactionAPI][0]['data']: + data_id = response_data.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['id'] + requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + policies_ids + "/" + apis_urls.dataactionAPI + "/" + categoryid + "/" + data_id, + headers=headers) + +# Step Definition Implementation: +# 1) Post action data using the policy id & the category id +@Given('the following action data exists') +def step_impl(context): + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject data name: '" + row["actiondataname"] + "' action data description: '" + row[ + "actiondatadescription"] + "' and action category: '" + row[ + "actioncategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['actioncategory']) > 25): + categories_id = row['actioncategory'] + else: + categories_id = commonfunctions.get_actioncategoryid(row['actioncategory']) + + data = { + 'name': row["actiondataname"], + 'description': row["actiondatadescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataactionAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Add subject data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following subject data') +def step_impl(context): + logger.info("When the user sets to add the following subject data") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject data name: '" + row["subjectdataname"] + "' subject data description: '" + row[ + "subjectdatadescription"] + "' and subject category: '" + row[ + "subjectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['subjectcategory']) > 25): + categories_id = row['subjectcategory'] + else: + categories_id = commonfunctions.get_subjectcategoryid(row['subjectcategory']) + + data = { + 'name': row["subjectdataname"], + 'description': row["subjectdatadescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.datasubjectAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Delete subject data by policy id, subject data id, subject category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following subject data') +def step_impl(context): + logging.info("When the user sets to delete the following subject data") + + model = getattr(context, "model", None) + for row in context.table: + + logger.info("subject data name:'" + row["subjectdataname"] + "' and subject category name:'" + row[ + "subjectcategory"] + "' and policy name:'" + row["policyname"] + "'") + + policies_id = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_data = requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[ + "policyname"]) + "/" + apis_urls.datasubjectAPI + "/" + commonfunctions.get_subjectcategoryid( + row["subjectcategory"]) + "/" + commonfunctions.get_subjectdataid(row["subjectdataname"], + commonfunctions.get_subjectcategoryid( + row["subjectcategory"]), + commonfunctions.get_policyid( + row["policyname"])), + headers=headers) + + if response_data.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Add object data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following object data') +def step_impl(context): + logger.info("When the user sets to add the following object data") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object data name: '" + row["objectdataname"] + "' object data description: '" + row[ + "objectdatadescription"] + "' and object category: '" + row[ + "objectcategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_list = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['objectcategory']) > 25): + categories_id = row['objectcategory'] + else: + categories_id = commonfunctions.get_objectcategoryid(row['objectcategory']) + + data = { + 'name': row["objectdataname"], + 'description': row["objectdatadescription"], + } + + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataobjectAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Delete object data by policy id, object data id, object category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following object data') +def step_impl(context): + logging.info("When the user sets to delete the following object data") + model = getattr(context, "model", None) + for row in context.table: + + logger.info("object data name:'" + row["objectdataname"] + "' and object category name:'" + row[ + "objectcategory"] + "' and policy name:'" + row["policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_data = requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[ + "policyname"]) + "/" + apis_urls.dataobjectAPI + "/" + commonfunctions.get_objectcategoryid( + row["objectcategory"]) + "/" + commonfunctions.get_objectdataid(row["objectdataname"], + commonfunctions.get_objectcategoryid( + row["objectcategory"]), + commonfunctions.get_policyid( + row["policyname"])), + headers=headers) + + if response_data.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Add action data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following action data') +def step_impl(context): + logger.info("When the user sets to add the following action data") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action data name: '" + row["actiondataname"] + "' action data description: '" + row[ + "actiondatadescription"] + "' and action category: '" + row[ + "actioncategory"] + "' and policies: '" + row['policyname'] + "'") + + policies_id = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + if (len(row['actioncategory']) > 25): + categories_id = row['actioncategory'] + else: + categories_id = commonfunctions.get_actioncategoryid(row['actioncategory']) + + data = { + 'name': row["actiondataname"], + 'description': row["actiondatadescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.dataactionAPI + "/" + str( + categories_id), headers=headers, data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Delete action data by policy id, action data id, action category id +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following action data') +def step_impl(context): + logging.info("When the user sets to delete the following action data") + model = getattr(context, "model", None) + for row in context.table: + + logger.info("action data name:'" + row["actiondataname"] + "' and action category name:'" + row[ + "actioncategory"] + "' and policy name:'" + row["policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response_data = requests.delete( + apis_urls.serverURL + apis_urls.policyAPI + "/" + commonfunctions.get_policyid(row[ + "policyname"]) + "/" + apis_urls.dataactionAPI + "/" + commonfunctions.get_actioncategoryid( + row["actioncategory"]) + "/" + commonfunctions.get_actiondataid(row["actiondataname"], + commonfunctions.get_actioncategoryid( + row["actioncategory"]), + commonfunctions.get_policyid( + row["policyname"])), + headers=headers) + + if response_data.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing subject data by get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following subject data should be existed in the system') +def step_impl(context): + logger.info("Then the following subject data should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('subjectdataname', 'subjectdatadescription', 'subjectcategory', 'policyname'), + dtype=('S100', 'S100', 'S100', 'S100')) + for row in context.table: + if (row['policyname'] != ""): + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.datasubjectAPI + "/" + + commonfunctions.get_subjectcategoryid(row['subjectcategory']),headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in response.json()[apis_urls.datasubjectAPI][0]['data']: + apipolicies = "" + apisubjectdataname = response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] + apisubjectdatadescription = response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)][ + 'description'] + apisubjectcategory = commonfunctions.get_subjectcategoryname( + response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['category_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['policy_id']) + apiresult.add_row(vals=( + apisubjectdataname, apisubjectdatadescription, apisubjectcategory, apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('policyname') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected subject data name: '" + str( + row1["subjectdataname"]) + "' is the same as the actual existing '" + str( + row2["subjectdataname"]) + "'") + assert str(row1["subjectdataname"]) == str(row2["subjectdataname"]), "subject data name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject data description: '" + str( + row1["subjectdatadescription"]) + "' is the same as the actual existing '" + str( + row2["subjectdatadescription"]) + "'") + assert str(row1["subjectdatadescription"]) == str( + row2["subjectdatadescription"]), "subject data description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject data password: '" + str( + row1["subjectcategory"]) + "' is the same as the actual existing '" + str( + row2["subjectcategory"]) + "'") + assert str(row1["subjectcategory"]) == str( + row2["subjectcategory"]), "subject category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected policies: '" + str( + row1["policyname"]) + "' is the same as the actual existing '" + str( + row2["policyname"]) + "'") + assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing object data by get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following object data should be existed in the system') +def step_impl(context): + logger.info("Then the following object data should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('objectdataname', 'objectdatadescription', 'objectcategory', 'policyname'), + dtype=('S100', 'S100', 'S100', 'S100')) + + for row in context.table: + if (row['policyname'] != ""): + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.dataobjectAPI + "/" + + commonfunctions.get_objectcategoryid(row['objectcategory']),headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in response.json()[apis_urls.dataobjectAPI][0]['data']: + apipolicies = "" + apiobjectdataname = response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] + apiobjectdatadescription = response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)][ + 'description'] + apiobjectcategory = commonfunctions.get_objectcategoryname( + response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['category_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['policy_id']) + + apiresult.add_row(vals=( + apiobjectdataname, apiobjectdatadescription, apiobjectcategory, apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('policyname') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected object data name: '" + str( + row1["objectdataname"]) + "' is the same as the actual existing '" + str( + row2["objectdataname"]) + "'") + assert str(row1["objectdataname"]) == str(row2["objectdataname"]), "subject data name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object data description: '" + str( + row1["objectdatadescription"]) + "' is the same as the actual existing '" + str( + row2["objectdatadescription"]) + "'") + assert str(row1["objectdatadescription"]) == str( + row2["objectdatadescription"]), "object data description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object data category: '" + str( + row1["objectcategory"]) + "' is the same as the actual existing '" + str( + row2["objectcategory"]) + "'") + assert str(row1["objectcategory"]) == str( + row2["objectcategory"]), "object category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected policies: '" + str( + row1["policyname"]) + "' is the same as the actual existing '" + str( + row2["policyname"]) + "'") + assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing action data by get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following action data should be existed in the system') +def step_impl(context): + logger.info("Then the following action data should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('actiondataname', 'actiondatadescription', 'actioncategory', 'policyname'), + dtype=('S100', 'S100', 'S100', 'S100')) + for row in context.table: + if (row['policyname'] != ""): + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.dataactionAPI + "/" + + commonfunctions.get_actioncategoryid(row['actioncategory']),headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.dataactionAPI]) != 0: + for ids in response.json()[apis_urls.dataactionAPI][0]['data']: + apipolicies = "" + apiactiondataname = response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] + apiactiondatadescription = response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)][ + 'description'] + apiactioncategory = commonfunctions.get_actioncategoryname( + response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['category_id']) + apipolicies = commonfunctions.get_policyname( + response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['policy_id']) + + apiresult.add_row(vals=( + apiactiondataname, apiactiondatadescription, apiactioncategory, apipolicies)) + else: + apiresult.add_row(vals=("", "", "", "")) + + else: + apiresult.add_row(vals=("", "", "", "")) + apiresult.sort('policyname') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected action data name: '" + str( + row1["actiondataname"]) + "' is the same as the actual existing '" + str( + row2["actiondataname"]) + "'") + assert str(row1["actiondataname"]) == str(row2["actiondataname"]), "action data name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action data description: '" + str( + row1["actiondatadescription"]) + "' is the same as the actual existing '" + str( + row2["actiondatadescription"]) + "'") + assert str(row1["actiondatadescription"]) == str( + row2["actiondatadescription"]), "action data description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action data category: '" + str( + row1["actioncategory"]) + "' is the same as the actual existing '" + str( + row2["actioncategory"]) + "'") + assert str(row1["actioncategory"]) == str( + row2["actioncategory"]), "action category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected policies: '" + str( + row1["policyname"]) + "' is the same as the actual existing '" + str( + row2["policyname"]) + "'") + assert str(row1["policyname"]) == str(row2["policyname"]), " policies is not correct!" + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/meta_data.py b/moon_manager/tests/func_tests/features/steps/meta_data.py new file mode 100644 index 00000000..b2a6d02c --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/meta_data.py @@ -0,0 +1,394 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +api_subjectcategory = {'name': "", 'description': ""} +api_objectcategory = {'name': "", 'description': ""} +api_actioncategory = {'name': "", 'description': ""} + +logger = logging.getLogger(__name__) + + +# Step Definition Implementation: +# 1) Get all the existing subject meta data in the system +# 2) Loop by id and delete them +@Given('the system has no subject categories') +def step_impl(context): + logger.info("Given the system has no subject categories") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Get all the existing action meta data in the system +# 2) Loop by id and delete them +@Given('the system has no action categories') +def step_impl(context): + logger.info("Given the system has no action categories") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI + "/" + ids, + headers=headers) + + +# Step Definition Implementation: +# 1) Get all the existing object meta data in the system +# 2) Loop by id and delete them +@Given('the system has no object categories') +def step_impl(context): + logger.info("Given the system has no object categories") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI + "/" + ids, + headers=headers) + + + +# Step Definition Implementation: +# 1) Insert subject meta data using the post request +@Given('the following meta data subject category exists') +def step_impl(context): + logger.info("Given the following meta data subject category exists") + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + data = { + 'name': row["subjectmetadataname"], + 'description': row["subjectmetadatadescription"], + } + logger.info( + "subject category name: '" + row["subjectmetadataname"] + "' and subject category description: '" + row[ + "subjectmetadatadescription"] + "'") + response = requests.post(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Insert object meta data using the post request +@Given('the following meta data object category exists') +def step_impl(context): + logger.info("Given the following meta data object category exists") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + data = { + 'name': row["objectmetadataname"], + 'description': row["objectmetadatadescription"], + } + logger.info( + "object category name: '" + row["objectmetadataname"] + "' and object category description: '" + row[ + "objectmetadatadescription"] + "'") + response = requests.post(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Insert action meta data using the post request +@Given('the following meta data action category exists') +def step_impl(context): + logger.info("Given the following meta data action category exists") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + data = { + 'name': row["actionmetadataname"], + 'description': row["actionmetadatadescription"], + } + logger.info( + "action category name: '" + row["actionmetadataname"] + "' and action category description: '" + row[ + "actionmetadatadescription"] + "'") + response = requests.post(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Add subject meta data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following meta data subject category') +def step_impl(context): + logger.info("When the user sets to add the following meta data subject category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + data = { + 'name': row["subjectmetadataname"], + 'description': row["subjectmetadatadescription"], + } + logger.info( + "subject category name: '" + row["subjectmetadataname"] + "' and subject category description: '" + row[ + "subjectmetadatadescription"] + "'") + + response = requests.post(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + + +# Step Definition Implementation: +# 1) Get all the subject meta data by get request +# 2) Loop by ids and search for the matching subject meta data by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following meta data subject category') +def step_impl(context): + logger.info("When the user sets to delete the following meta data subject category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + logger.info("subject category name: '" + row["subjectmetadataname"] + "'") + + response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, + headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys(): + if (response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['name'] == row["subjectmetadataname"]): + response = requests.delete(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI + "/" + ids, + headers=headers) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Add object meta data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following meta data object category') +def step_impl(context): + logger.info("When the user sets to add the following meta data object category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + data = { + 'name': row["objectmetadataname"], + 'description': row["objectmetadatadescription"], + } + logger.info( + "object category Name: '" + row["objectmetadataname"] + "' and object category description: '" + row[ + "objectmetadatadescription"] + "''") + response = requests.post(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=headers, + data=json.dumps(data)) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the object meta data by get request +# 2) Loop by ids and search for the matching object meta data by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following meta data object category') +def step_impl(context): + logger.info("When the user sets to delete the following meta data object category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + logger.info("object category name: '" + row["objectmetadataname"] + "'") + + response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, + headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys(): + if (response.json()[apis_urls.metadataobjectcategoryAPI][ids]['name'] == row["objectmetadataname"]): + response = requests.delete(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI + "/" + ids, + headers=headers) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Add subject meta data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following meta data action category') +def step_impl(context): + logger.info("When the user sets to add the following meta data action category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + data = { + 'name': row["actionmetadataname"], + 'description': row["actionmetadatadescription"], + } + logger.info( + "action category name: '" + row["actionmetadataname"] + "' and action category description: '" + row[ + "actionmetadatadescription"] + "'") + + response = requests.post(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=headers, + data=json.dumps(data)) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the action meta data by get request +# 2) Loop by ids and search for the matching action meta data by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following meta data action category') +def step_impl(context): + logger.info("When the user sets to delete the following meta data action category") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("action category name: '" + row["actionmetadataname"] + "'") + + response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, + headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys(): + # logger.info(ids) + if (response.json()[apis_urls.metadataactioncategoryAPI][ids]['name'] == row["actionmetadataname"]): + response = requests.delete(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI + "/" + ids, + headers=headers) + # logger.info(response.status_code) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing subject meta data by get request and put them into a table +# 2) Loop using both the expected and actual tables and assert the data row by row +@Then('the following meta data subject category should be existed in the system') +def step_impl(context): + logger.info("Then the following meta data subject category should be existed in the system") + + model = getattr(context, "model", None) + response = requests.get(apis_urls.serverURL + apis_urls.metadatasubjectcategoryAPI, headers=apis_urls.auth_headers) + apiresult = Table(names=('subjectcategoryname', 'subjectcategorydescription'), dtype=('S100', 'S100')) + if len(response.json()[apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadatasubjectcategoryAPI]).keys(): + apisubjectcategoryname = response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['name'] + apisubjectcategorydescription = response.json()[apis_urls.metadatasubjectcategoryAPI][ids]['description'] + apiresult.add_row(vals=(apisubjectcategoryname, apisubjectcategorydescription)) + else: + apiresult.add_row(vals=("", "")) + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected subject category name: '" + str( + row1["subjectmetadataname"]) + "' is the same as the actual existing '" + str( + row2["subjectcategoryname"]) + "'") + assert str(row1["subjectmetadataname"]) == str( + row2["subjectcategoryname"]), "subject category name is not correct!" + logger.info("assertion passed!") + logger.info("asserting the expected subject category description: '" + str( + row1["subjectmetadatadescription"]) + "' is the same as the actual existing '" + str( + row2["subjectcategorydescription"]) + "'") + assert str(row1["subjectmetadatadescription"]) == str( + row2["subjectcategorydescription"]), "Subject meta-data category description is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing object meta data by get request and put them into a table +# 2) Loop using both the expected and actual tables and assert the data row by row +@Then('the following meta data object category should be existed in the system') +def step_impl(context): + model = getattr(context, "model", None) + logger.info("Then the following meta data object category should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.metadataobjectcategoryAPI, headers=apis_urls.auth_headers) + apiresult = Table(names=('objectcategoryname', 'objectcategorydescription'), dtype=('S100', 'S100')) + + if len(response.json()[apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadataobjectcategoryAPI]).keys(): + apiobjectcategoryname = response.json()[apis_urls.metadataobjectcategoryAPI][ids]['name'] + apiobjectcategorydescription = response.json()[apis_urls.metadataobjectcategoryAPI][ids]['description'] + apiresult.add_row(vals=(apiobjectcategoryname, apiobjectcategorydescription)) + else: + apiresult.add_row(vals=("", "")) + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected object category description: '" + str( + row1["objectmetadataname"]) + "' is the same as the actual existing '" + str( + row2["objectcategoryname"]) + "'") + assert str(row1["objectmetadataname"]) == str( + row2["objectcategoryname"]), "object category name is not correct!" + logger.info("assertion passed!") + logger.info("asserting the expected object category description: '" + str( + row1["objectmetadatadescription"]) + "' is the same as the actual existing '" + str( + row2["objectcategorydescription"]) + "'") + assert str(row1["objectmetadatadescription"]) == str( + row2["objectcategorydescription"]), "object meta-data category description is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing action meta data by get request and put them into a table +# 2) Loop using both the expected and actual tables and assert the data row by row +@Then('the following meta data action category should be existed in the system') +def step_impl(context): + logger.info("Then the following meta data action category should be existed in the system") + + model = getattr(context, "model", None) + response = requests.get(apis_urls.serverURL + apis_urls.metadataactioncategoryAPI, headers=apis_urls.auth_headers) + apiresult = Table(names=('actioncategoryname', 'actioncategorydescription'), dtype=('S100', 'S100')) + if len(response.json()[apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[apis_urls.metadataactioncategoryAPI]).keys(): + apiactioncategoryname = response.json()[apis_urls.metadataactioncategoryAPI][ids]['name'] + apiactioncategorydescription = response.json()[apis_urls.metadataactioncategoryAPI][ids]['description'] + apiresult.add_row(vals=(apiactioncategoryname, apiactioncategorydescription)) + else: + apiresult.add_row(vals=("", "")) + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected action category description: '" + str( + row1["actionmetadataname"]) + "' is the same as the actual existing '" + str( + row2["actioncategoryname"]) + "'") + + assert str(row1["actionmetadataname"]) == str( + row2["actioncategoryname"]), "action category name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action category description: '" + str( + row1["actionmetadatadescription"]) + "' is the same as the actual existing '" + str( + row2["actioncategorydescription"]) + "'") + + assert str(row1["actionmetadatadescription"]) == str( + row2["actioncategorydescription"]), "action meta-data category description is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# Assert the saved api response flag with the expected flag +@Then('the system should reply the following') +def step_impl(context): + logger.info("Then the system should reply the following:") + model = getattr(context, "model", None) + for row in context.table: + logger.info("asserting the expected api response: '" + row["flag"] + "' and the actual response: '" + + GeneralVariables.api_responseflag['value'] + "'") + assert row["flag"] == GeneralVariables.api_responseflag['value'], "Validation is not correct, Expected: " + row[ + "flag"] + " but the API response was: " + GeneralVariables.api_responseflag['value'] + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/meta_rules.py b/moon_manager/tests/func_tests/features/steps/meta_rules.py new file mode 100644 index 00000000..f56d4d4c --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/meta_rules.py @@ -0,0 +1,335 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import numpy as np +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing meta rule in the system +# 2) Loop by id and delete them +@Given('the system has no meta-rules') +def step_impl(context): + logger.info("Given the system has no meta-rules") + + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.metarulesAPI]) != 0: + for ids in dict(response.json()[apis_urls.metarulesAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.metarulesAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid +# 2) create the meta rule data jason then post it +@Given('the following meta rule exists') +def step_impl(context): + logger.info("Given the following meta rule exists") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "meta-rule name: '" + row["metarulename"] + "' and meta-rule description: '" + row[ + "metaruledescription"] + "' and subject categories:'" + row[ + "subjectmetadata"] + "' and object categories:'" + row["objectmetadata"] + "' and action categories:'" + + row["actionmetadata"] + "'") + subjectcategoryids = [] + objectcategoryids = [] + actioncategoryids = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row["subjectmetadata"]) < 40 and str(row["subjectmetadata"])!=""): + if(str(row["subjectmetadata"]).find(",")!=-1): + for category in row["subjectmetadata"].split(","): + subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category)) + else: + subjectcategoryids.append(commonfunctions.get_subjectcategoryid(row["subjectmetadata"])) + else: + if(str(row["subjectmetadata"])==""): + subjectcategoryids=[] + else: + subjectcategoryids.append(row["subjectmetadata"]) + + if (len(row["objectmetadata"]) < 40 and str(row["objectmetadata"])!=""): + if(str(row["objectmetadata"]).find(",")!=-1): + for category in row["objectmetadata"].split(","): + objectcategoryids.append(commonfunctions.get_objectcategoryid(category)) + else: + objectcategoryids.append(commonfunctions.get_objectcategoryid(row["objectmetadata"])) + else: + if (str(row["objectmetadata"]) == ""): + objectcategoryids = [] + else: + objectcategoryids.append(row["objectmetadata"]) + + if (len(row["actionmetadata"]) < 40 and str(row["actionmetadata"])!=""): + if(str(row["actionmetadata"]).find(",")!=-1): + for category in row["actionmetadata"].split(","): + actioncategoryids.append(commonfunctions.get_actioncategoryid(category)) + else: + actioncategoryids.append(commonfunctions.get_actioncategoryid(row["actionmetadata"])) + else: + if(str(row["actionmetadata"]) == ""): + actioncategoryids = [] + else: + actioncategoryids.append(row["actionmetadata"]) + + data = { + 'name': row["metarulename"], + 'description': row["metaruledescription"], + 'subject_categories': subjectcategoryids, + 'object_categories': objectcategoryids, + 'action_categories': actioncategoryids + } + response = requests.post(apis_urls.serverURL + apis_urls.metarulesAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid +# 2) create the meta rule data jason then post it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following meta-rule') +def step_impl(context): + logger.info("When the user sets to add the following meta-rule") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "meta-rule name: '" + row["metarulename"] + "' and meta-rule description: '" + row[ + "metaruledescription"] + "' and subject categories:'" + row[ + "subjectmetadata"] + "' and object categories:'" + row["objectmetadata"] + "' and action categories:'" + + row["actionmetadata"] + "'") + + subjectcategoryids = [] + objectcategoryids = [] + actioncategoryids = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row["subjectmetadata"]) < 40 and str(row["subjectmetadata"])!=""): + if (str(row["subjectmetadata"]).find(",") != -1): + for category in row["subjectmetadata"].split(","): + subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category)) + else: + subjectcategoryids.append(commonfunctions.get_subjectcategoryid(row["subjectmetadata"])) + else: + subjectcategoryids.append(row["subjectmetadata"]) + + if (len(row["objectmetadata"]) < 40 and str(row["objectmetadata"])!=""): + if (str(row["objectmetadata"]).find(",") != -1): + for category in row["objectmetadata"].split(","): + objectcategoryids.append(commonfunctions.get_objectcategoryid(category)) + else: + objectcategoryids.append(commonfunctions.get_objectcategoryid(row["objectmetadata"])) + else: + objectcategoryids.append(row["objectmetadata"]) + + if (len(row["actionmetadata"]) < 40 and str(row["actionmetadata"])!=""): + if (str(row["actionmetadata"]).find(",") != -1): + for category in row["actionmetadata"].split(","): + actioncategoryids.append(commonfunctions.get_actioncategoryid(category)) + else: + actioncategoryids.append(commonfunctions.get_actioncategoryid(row["actionmetadata"])) + else: + actioncategoryids.append(row["actionmetadata"]) + + + data = { + 'name': row["metarulename"], + 'description': row["metaruledescription"], + 'subject_categories': subjectcategoryids, + 'object_categories': objectcategoryids, + 'action_categories': actioncategoryids + } + + response = requests.post(apis_urls.serverURL + apis_urls.metarulesAPI, headers=headers, + data=json.dumps(data)) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get subject, object, action categories ids list by calling the common funtion: get_subjectcategoryid, get_objectcategoryid and get_actioncategoryid +# 2) create the meta rule data jason then patch the meta rule after searching for it's id. +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following meta-rule') +def step_impl(context): + logger.info("When the user sets to update the following meta-rule") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "meta-rule name: '" + row["metarulename"] + "' which will be updated to metarule name:" + row[ + "updatedmetarulename"] + "' and meta-rule description: '" + row[ + "updatedmetaruledescription"] + "' and subject categories:'" + row[ + "updatedsubjectmetadata"] + "' and object categories:'" + row[ + "updatedobjectmetadata"] + "' and action categories:'" + + row["updatedactionmetadata"] + "'") + + subjectcategoryids = [] + objectcategoryids = [] + actioncategoryids = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row["updatedsubjectmetadata"]) > 40): + subjectcategoryids.append(row["updatedsubjectmetadata"]) + else: + for category in row["updatedsubjectmetadata"].split(","): + subjectcategoryids.append(commonfunctions.get_subjectcategoryid(category)) + + if (len(row["updatedobjectmetadata"]) > 40): + objectcategoryids.append(row["updatedobjectmetadata"]) + else: + for category in row["updatedobjectmetadata"].split(","): + objectcategoryids.append(commonfunctions.get_objectcategoryid(category)) + + if (len(row["updatedactionmetadata"]) > 40): + actioncategoryids.append(row["updatedactionmetadata"]) + else: + for category in row["updatedactionmetadata"].split(","): + actioncategoryids.append(commonfunctions.get_actioncategoryid(category)) + + data = { + 'name': row["updatedmetarulename"], + 'description': row["updatedmetaruledescription"], + 'subject_categories': subjectcategoryids, + 'object_categories': objectcategoryids, + 'action_categories': actioncategoryids + } + + response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.metarulesAPI]).keys(): + if (response.json()[apis_urls.metarulesAPI][ids]['name'] == row["metarulename"]): + response = requests.patch(apis_urls.serverURL + apis_urls.metarulesAPI + '/' + ids, headers=headers, + data=json.dumps(data)) + break + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the meta rule by get request +# 2) Loop by ids and search for the matching meta rule by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following meta-rule') +def step_impl(context): + logger.info("When the user sets to delete the following meta-rule") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info( + "meta-rule name: '" + row["metarulename"] + "'") + response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.metarulesAPI]).keys(): + if (response.json()[apis_urls.metarulesAPI][ids]['name'] == row["metarulename"]): + response = requests.delete(apis_urls.serverURL + apis_urls.metarulesAPI + "/" + ids, + headers=headers) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing action meta data by get request and put them into a table +# 2) Sort the table by meta rule name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following meta-rules should be existed in the system') +def step_impl(context): + logger.info("Then the following meta-rules should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI,headers=apis_urls.auth_headers) + apimetarulesubjectcategoryname = "" + apimetaruleobjectcategoryname = "" + apimetaruleactioncategoryname = "" + apiresult = Table( + names=('metarulename', 'metaruledescription', 'subjectmetadata', 'actionmetadata', 'objectmetadata'), + dtype=('S10', 'S100', 'S100', 'S100', 'S100')) + if len(response.json()[apis_urls.metarulesAPI]) != 0: + for ids in dict(response.json()[apis_urls.metarulesAPI]).keys(): + apimetarulesubjectcategoryname = "" + apimetaruleobjectcategoryname = "" + apimetaruleactioncategoryname = "" + apimetarulename = response.json()[apis_urls.metarulesAPI][ids]['name'] + apimetaruledescription = response.json()[apis_urls.metarulesAPI][ids]['description'] + for categoryid in response.json()[apis_urls.metarulesAPI][ids]['subject_categories']: + if (len(apimetarulesubjectcategoryname) > 2): + apimetarulesubjectcategoryname = apimetarulesubjectcategoryname + ',' + commonfunctions.get_subjectcategoryname( + categoryid) + else: + apimetarulesubjectcategoryname = commonfunctions.get_subjectcategoryname(categoryid) + for categoryid in response.json()[apis_urls.metarulesAPI][ids]['object_categories']: + if (len(apimetaruleobjectcategoryname) > 2): + apimetaruleobjectcategoryname = apimetaruleobjectcategoryname + ',' + commonfunctions.get_objectcategoryname( + categoryid) + else: + apimetaruleobjectcategoryname = commonfunctions.get_objectcategoryname(categoryid) + for categoryid in response.json()[apis_urls.metarulesAPI][ids]['action_categories']: + if (len(apimetaruleactioncategoryname) > 2): + apimetaruleactioncategoryname = apimetaruleactioncategoryname + ',' + commonfunctions.get_actioncategoryname( + categoryid) + else: + apimetaruleactioncategoryname = commonfunctions.get_actioncategoryname(categoryid) + + apiresult.add_row(vals=( + apimetarulename, apimetaruledescription, apimetarulesubjectcategoryname, apimetaruleactioncategoryname, + apimetaruleobjectcategoryname)) + + else: + apiresult.add_row(vals=("", "", "", "", "")) + + apiresult.sort('metarulename') + + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected meta rule name: '" + str( + row1["metarulename"]) + "' is the same as the actual existing '" + str( + row2["metarulename"]) + "'") + assert str(row1["metarulename"]) == str(row2["metarulename"]), "meta-rule name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected meta rule description: '" + str( + row1["metaruledescription"]) + "' is the same as the actual existing '" + str( + row2["metaruledescription"]) + "'") + assert str(row1["metaruledescription"]) == str( + row2["metaruledescription"]), "meta-rule description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject categories: '" + str( + row1["subjectmetadata"]) + "' is the same as the actual existing '" + str( + row2["subjectmetadata"]) + "'") + assert str(row1["subjectmetadata"]) == str(row2["subjectmetadata"]), "subject category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object categories: '" + str( + row1["objectmetadata"]) + "' is the same as the actual existing '" + str( + row2["objectmetadata"]) + "'") + assert str(row1["objectmetadata"]) == str(row2["objectmetadata"]), "object category is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action categories: '" + str( + row1["actionmetadata"]) + "' is the same as the actual existing '" + str( + row2["actionmetadata"]) + "'") + assert str(row1["actionmetadata"]) == str(row2["actionmetadata"]), "action category is not correct!" + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/model.py b/moon_manager/tests/func_tests/features/steps/model.py new file mode 100644 index 00000000..36b16746 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/model.py @@ -0,0 +1,230 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing models in the system +# 2) Loop by id and delete them +@Given('the system has no models') +def step_impl(context): + logger.info("Given the system has no models") + + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.modelAPI]) != 0: + for ids in dict(response.json()[apis_urls.modelAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.modelAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Get meta rule ids list by calling the common funtion: get_metaruleid +# 2) create the model data jason then post it +@Given('the following model exists') +def step_impl(context): + logger.info("Given the following model exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "model name: '" + row["modelname"] + "' model description: '" + row[ + "modeldescription"] + "' and meta-rules:'" + row[ + "metarule"]+"'") + + metarulesids = [] + + if (len(row["metarule"]) > 35): + metarulesids.append(row["metarule"]) + else: + for metarule in row["metarule"].split(","): + metarulesids.append(commonfunctions.get_metaruleid(metarule)) + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + data = { + 'name': row["modelname"], + 'description': row["modeldescription"], + 'meta_rules': metarulesids + } + response = requests.post(apis_urls.serverURL + apis_urls.modelAPI, headers=headers, + data=json.dumps(data)) + + +# Step Definition Implementation: +# 1) Get meta rule ids list by calling the common funtion: get_metaruleid +# 2) create the model data jason then post it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following model') +def step_impl(context): + logger.info("When the user sets to add the following model") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "model name: '" + row["modelname"] + "' model description: '" + row[ + "modeldescription"] + "' and meta-rules:'" + row[ + "metarule"] + "'") + + metarules = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if(row["metarule"]!=""): + if (len(row["metarule"]) > 35): + metarules.append(row["metarule"]) + else: + for metarule in row["metarule"].split(","): + metarules.append(commonfunctions.get_metaruleid(metarule)) + + data = { + 'name': row["modelname"], + 'description': row["modeldescription"], + 'meta_rules': metarules, + } + else: + data = { + 'name': row["modelname"], + 'description': row["modeldescription"], + 'meta_rules': "", + } + response = requests.post(apis_urls.serverURL + apis_urls.modelAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + + +# Step Definition Implementation: +# 1) Get meta rule ids list by calling the common funtion: get_modelid +# 2) create the model jason then patch the model after searching for it's id. +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following model') +def step_impl(context): + logging.info("When the user sets to update the following model") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "model name: '" + row["modelname"] + "' which will be updated to model name:" + row[ + "updatedmodelname"] + "' and model description: '" + row[ + "updatedmodeldescription"] + "' meta-rules: '"+row["updatedmetarule"] + "'") + + metarules = [] + data={} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + if(row["updatedmetarule"]!=""): + if (len(row["updatedmetarule"]) > 35): + metarules.append(row["updatedmetarule"]) + else: + for metarule in row["updatedmetarule"].split(","): + metarules.append(commonfunctions.get_metaruleid(metarule)) + data = { + 'name': row["updatedmodelname"], + 'description': row["updatedmodeldescription"], + 'meta_rules': metarules, + } + else: + data = { + 'name': row["updatedmodelname"], + 'description': row["updatedmodeldescription"], + 'meta_rules': "", + } + response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.modelAPI]).keys(): + if (response.json()[apis_urls.modelAPI][ids]['name'] == row["modelname"]): + response = requests.patch(apis_urls.serverURL + apis_urls.modelAPI+'/'+ids, headers=headers, + data=json.dumps(data)) + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the model by get request +# 2) Loop by ids and search for the matching model by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following model') +def step_impl(context): + logging.info("When the user sets to delete the following model") + model = getattr(context, "model", None) + for row in context.table: + logger.info("model name: '" + row["modelname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("policy name:'" + row["modelname"] + "'") + response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.modelAPI]).keys(): + if (response.json()[apis_urls.modelAPI][ids]['name'] == row["modelname"]): + response = requests.delete(apis_urls.serverURL + apis_urls.modelAPI + "/" + ids, + headers=headers) + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing models by get request and put them into a table +# 2) Sort the table by model name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following model should be existed in the system') +def step_impl(context): + logger.info("Then the following model should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.modelAPI, headers=apis_urls.auth_headers) + apimetarulesname="" + apiresult = Table( + names=('modelname', 'modeldescription', 'metarule'), + dtype=('S100', 'S100', 'S100')) + if len(response.json()[apis_urls.modelAPI]) != 0: + for ids in dict(response.json()[apis_urls.modelAPI]).keys(): + apimetarulesname = [] + apimodelname = response.json()[apis_urls.modelAPI][ids]['name'] + apimodeldescription = response.json()[apis_urls.modelAPI][ids]['description'] + for metaruleid in response.json()[apis_urls.modelAPI][ids]['meta_rules']: + apimetarulesname.append(commonfunctions.get_metarulename(metaruleid)) + apiresult.add_row(vals=( + apimodelname, apimodeldescription, ",".join(apimetarulesname))) + else: + apiresult.add_row(vals=("", "", "")) + + apiresult.sort('modelname') + + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected model name: '" + str( + row1["modelname"]) + "' is the same as the actual existing '" + str( + row2["modelname"]) + "'") + assert str(row1["modelname"]) == str(row2["modelname"]), "model name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected model description: '" + str( + row1["modeldescription"]) + "' is the same as the actual existing '" + str( + row2["modeldescription"]) + "'") + assert str(row1["modeldescription"]) == str(row2["modeldescription"]), "model description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected meta rules: '" + str( + row1["metarule"]) + "' is the same as the actual existing '" + str( + row2["metarule"]) + "'") + assert str(row1["metarule"]) == str(row2["metarule"]), "metarule is not correct!" + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/pdp.py b/moon_manager/tests/func_tests/features/steps/pdp.py new file mode 100644 index 00000000..bf839658 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/pdp.py @@ -0,0 +1,248 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing pdps in the system +# 2) Loop by id and delete them +@Given('the system has no pdps') +def step_impl(context): + logger.info("Given the system has no pdps") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI, headers=apis_urls.auth_headers) + pdpjason=apis_urls.pdpAPI+"s" + if len(response.json()[pdpjason]) != 0: + for ids in dict(response.json()[pdpjason]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Get model id by calling the common funtion: get_policyid +# 2) create the pdp data jason then post it +@Given('the following pdp exists') +def step_impl(context): + logger.info("Given the following pdp exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[ + "pdpdescription"] + "' and keystone project:'" + row[ + "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'") + policies_list = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['security_pipeline']) > 25): + policies_list = row['security_pipeline'] + else: + for policy in row["security_pipeline"].split(","): + policies_list.append(commonfunctions.get_policyid(policy)) + + data = { + 'name': row["pdpname"], + 'description': row["pdpdescription"], + 'vim_project_id': row['keystone_project_id'], + 'security_pipeline': policies_list + } + response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get policy id by calling the common funtion: get_policyid +# 2) create the pdp jason then patch the policy after searching for it's id. +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following pdp') +def step_impl(context): + logger.info("When the user sets to add the following pdp") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "pdp name: '" + row["pdpname"] + "' pdp description: '" + row[ + "pdpdescription"] + "' and keystone project:'" + row[ + "keystone_project_id"] + "' and security pipeline '" + row['security_pipeline'] + "'") + + policies_list = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + if (row["security_pipeline"] != ""): + if (len(row['security_pipeline']) > 25): + policies_list = row['security_pipeline'] + else: + for policy in row["security_pipeline"].split(","): + policies_list.append(commonfunctions.get_policyid(policy)) + data = { + 'name': row["pdpname"], + 'description': row["pdpdescription"], + 'vim_project_id': row['keystone_project_id'], + 'security_pipeline': policies_list + } + else: + data = { + 'name': row["pdpname"], + 'description': row["pdpdescription"], + 'vim_project_id': row['keystone_project_id'], + 'security_pipeline': "" + } + response = requests.post(apis_urls.serverURL + apis_urls.pdpAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get model id by calling the common funtion: get_policyid +# 2) create the pdp data jason then patch it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following pdp') +def step_impl(context): + logger.info("When the user sets to update the following pdp") + + model = getattr(context, "model", None) + policies_list=[] + for row in context.table: + logger.info( + "pdp name: '" + row["pdpname"] + "' which will be updated to pdp name:" + row[ + "updatedpdpname"] + "' and pdp description: '" + row[ + "updatedpdpdescription"] + "' keystone_project: '" + row["updatedkeystone_project_id"] + "' security pipeline: '"+row["updatedsecurity_pipeline"]+"'") + + policies_list = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['updatedsecurity_pipeline']) > 25): + policies_list = row['updatedsecurity_pipeline'] + else: + for policy in row["updatedsecurity_pipeline"].split(","): + policies_list.append(commonfunctions.get_policyid(policy)) + + data = { + 'name': row["updatedpdpname"], + 'description': row["updatedpdpdescription"], + 'vim_project_id': row['updatedkeystone_project_id'], + 'security_pipeline': policies_list + } + + response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers) + logger.info(response.json()) + pdpjason = apis_urls.pdpAPI + "s" + for ids in dict(response.json()[pdpjason]).keys(): + logger.info(str(response.json()[pdpjason][ids]['name'])) + if (response.json()[pdpjason][ids]['name'] == row["pdpname"]): + logger.info(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids) + response = requests.patch(apis_urls.serverURL + apis_urls.pdpAPI+ '/' + ids, headers=headers, + data=json.dumps(data)) + logger.info(response.json()) + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + break + +# Step Definition Implementation: +# 1) Get all the pdps by get request +# 2) Loop by ids and search for the matching pdp by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following pdp') +def step_impl(context): + logging.info("When the user sets to delete the following pdp") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("pdp name:'" + row["pdpname"] + "'") + + response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers) + pdpjason=apis_urls.pdpAPI+"s" + for ids in dict(response.json()[pdpjason]).keys(): + if (response.json()[pdpjason][ids]['name'] == row["pdpname"]): + response = requests.delete(apis_urls.serverURL + apis_urls.pdpAPI + "/" + ids, + headers=headers) + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing pdps by get request and put them into a table +# 2) Sort the table by pdp name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following pdp should be existed in the system') +def step_impl(context): + logger.info("Then the following pdp should be existed in the system") + + response = requests.get(apis_urls.serverURL + apis_urls.pdpAPI,headers=apis_urls.auth_headers) + apiresult = Table( + names=('pdpname', 'pdpdescription', 'keystone_project_id','security_pipeline'), + dtype=('S10', 'S100', 'S100','S100')) + pdp_jason=apis_urls.pdpAPI+"s" + if len(response.json()[pdp_jason]) != 0: + for ids in dict(response.json()[pdp_jason]).keys(): + apipdppolicies = "" + apipdpname = response.json()[pdp_jason][ids]['name'] + apipdpdescription = response.json()[pdp_jason][ids]['description'] + apipdpprojectid = response.json()[pdp_jason][ids]['vim_project_id'] + for policies in response.json()[pdp_jason][ids]['security_pipeline']: + if(len(apipdppolicies)>2): + apipdppolicies = apipdppolicies +','+ commonfunctions.get_policyname(policies) + else: + apipdppolicies=commonfunctions.get_policyname(policies) + + apiresult.add_row(vals=( + apipdpname, apipdpdescription, apipdpprojectid,apipdppolicies)) + + else: + apiresult.add_row(vals=("", "", "","")) + + apiresult.sort('pdpname') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected pdp name: '" + str( + row1["pdpname"]) + "' is the same as the actual existing '" + str( + row2["pdpname"]) + "'") + assert str(row1["pdpname"]) == str(row2["pdpname"]), "pdp name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected pdp description: '" + str( + row1["pdpdescription"]) + "' is the same as the actual existing '" + str( + row2["pdpdescription"]) + "'") + + assert str(row1["pdpdescription"]) == str(row2["pdpdescription"]), "pdp description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected keystone project id description: '" + str( + row1["keystone_project_id"]) + "' is the same as the actual existing '" + str( + row2["keystone_project_id"]) + "'") + assert str(row1["keystone_project_id"]) == str(row2["keystone_project_id"]), "project id is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected security pipeline description: '" + str( + row1["security_pipeline"]) + "' is the same as the actual existing '" + str( + row2["security_pipeline"]) + "'") + assert str(row1["security_pipeline"]) == str(row2["security_pipeline"]), "security_pipeline policies is not correct!" + logger.info("assertion passed!") + diff --git a/moon_manager/tests/func_tests/features/steps/perimeter.py b/moon_manager/tests/func_tests/features/steps/perimeter.py new file mode 100644 index 00000000..a4a53120 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/perimeter.py @@ -0,0 +1,727 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing subject preimeters in the system +# 2) Loop by id to unlink the policies attached +# 3) Then delete the perimeter itself +@Given('the system has no subject perimeter') +def step_impl(context): + logger.info("Given the system has no subject perimeter") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.perimetersubjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys(): + policies_list = response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list'] + for policy in policies_list: + response_delete_policies = requests.delete( + apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimetersubjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + + # exit(0) + +# Step Definition Implementation: +# 1) Post subject perimeter using the policy id +@Given('the following subject perimeter exists') +def step_impl(context): + logger.info("Given the following subject perimeter exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[ + "subjectperimeterdescription"] # "' and subject perimeter email:'" + row[ + # "subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword'] + + "' and policies '" + row['policies'] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + policyid="" + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + data = { + 'name': row["subjectperimetername"], + 'description': row["subjectperimeterdescription"], + # 'email': row['subjectperimeteremail'], + # 'password': row['subjectperimeterpassword'], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get all the existing object preimeters in the system +# 2) Loop by id to unlink the policies attached +# 3) Then delete the perimeter itself +@Given('the system has no object perimeter') +def step_impl(context): + logger.info("Given the system has no object perimeter") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.perimeterobjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys(): + policies_list = response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list'] + for policy in policies_list: + response_delete_policies = requests.delete( + apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeterobjectAPI + "/" + ids, + headers=headers) + response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + +# Step Definition Implementation: +# 1) Post object perimeter using the policy id +@Given('the following object perimeter exists') +def step_impl(context): + logger.info("Given the following object perimeter exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[ + "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + + data = { + 'name': row["objectperimetername"], + 'description': row["objectperimeterdescription"], + + } + response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI, + headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Get all the existing action preimeters in the system +# 2) Loop by id to unlink the policies attached +# 3) Then delete the perimeter itself +@Given('the system has no action perimeter') +def step_impl(context): + logger.info("Given the system has no action perimeter") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.perimeteractionAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys(): + policies_list = response.json()[apis_urls.perimeteractionAPI][ids]['policy_list'] + for policy in policies_list: + response_delete_policies = requests.delete( + apis_urls.serverURL + "policies/" + policy + "/" + apis_urls.perimeteractionAPI + "/" + ids, + headers=headers) + response_delete = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids, + headers=apis_urls.auth_headers) + + +# Step Definition Implementation: +# 1) Post action perimeter using the policy id +@Given('the following action perimeter exists') +def step_impl(context): + logger.info("Given the following action perimeter exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[ + "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + policyid="" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + data = { + 'name': row["actionperimetername"], + 'description': row["actionperimeterdescription"], + + } + response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI, + headers=headers, + data=json.dumps(data)) + +# Step Definition Implementation: +# 1) Insert subject perimeter using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following subject perimeter') +def step_impl(context): + logger.info("When the user sets to add the following subject perimeter") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "subject perimeter name: '" + row["subjectperimetername"] + "' subject perimeter description: '" + row[ + "subjectperimeterdescription"] + + # "' and subject perimeter email:'" + row["subjectperimeteremail"] + "' and subject perimeter password '" + row['subjectperimeterpassword'] + + "' and policies '" + row['policies'] + "'") + + policyid = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + data = { + 'name': row["subjectperimetername"], + 'description': row["subjectperimeterdescription"], + # 'email': row['subjectperimeteremail'], + # 'password': row['subjectperimeterpassword'], + } + response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing subject perimeter & get its id +# 2) create the new perimeter jason and patch it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following subject perimeter') +def step_impl(context): + logger.info("When the user sets to update the following subject perimeter") + model = getattr(context, "model", None) + policies_list = [] + for row in context.table: + logger.info( + "subject perimeter name: '" + row[ + 'subjectperimetername'] + "' which will be updated to subject perimeter name:'" + row[ + "updatedsubjectperimetername"] + "' subject perimeter description: '" + row[ + "updatedsubjectperimeterdescription"] + + # "' and subject perimeter email:'" + row["updatedsubjectperimeteremail"] + "' and subject perimeter password '" + row['updatedsubjectperimeterpassword'] + "' and policies '" + row['policies'] + "'") + + policyid = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid=commonfunctions.get_policyid(row['policies']) + else: + policyid="" + data = { + 'name': row["updatedsubjectperimetername"], + 'description': row["updatedsubjectperimeterdescription"], + # 'email': row['subjectperimeteremail'], + # 'password': row['subjectperimeterpassword'], + } + response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys(): + if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]): + #print(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + '/' + ids) + response = requests.patch(apis_urls.serverURL + apis_urls.perimetersubjectAPI + '/' + ids, + headers=headers,data=json.dumps(data)) + print(response) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing subject perimeter & get its id +# 2) Delete it without having the policy id in the request +@When('the user sets to delete the following subject perimeter') +def step_impl(context): + logging.info("When the user sets to delete the following subject perimeter") + + model = getattr(context, "model", None) + for row in context.table: + headers = { + 'Content-Type': 'application/json', + } + logger.info("subject perimeter name:'" + row["subjectperimetername"] + "'") + response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys(): + if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]): + response = requests.delete(apis_urls.serverURL + apis_urls.perimetersubjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing subject perimeter & get its id +# 2) Delete it while having the policy id in the request +@When('the user sets to delete the following subject perimeter for a given policy') +def step_impl(context): + logging.info("the user sets to delete the following subject perimeter for a given policy") + + model = getattr(context, "model", None) + for row in context.table: + headers = { + 'Content-Type': 'application/json', + } + logger.info("subject perimeter name:'" + row["subjectperimetername"] + "' and policy:"+ row["policies"]+"'") + policyid = commonfunctions.get_policyid(row['policies']) + response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys(): + if (response.json()[apis_urls.perimetersubjectAPI][ids]['name'] == row["subjectperimetername"]): + response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimetersubjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + logger.info(response.json()) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Insert object perimeter using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following object perimeter') +def step_impl(context): + logger.info("When the user sets to add the following object perimeter") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row["objectperimetername"] + "' object perimeter description: '" + row[ + "objectperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + policies_list = [] + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + else: + policyid="" + data = { + 'name': row["objectperimetername"], + 'description': row["objectperimeterdescription"], + } + response = requests.post(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing object perimeter & get its id +# 2) create the new perimeter jason and patch it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following object perimeter') +def step_impl(context): + logger.info("When the user sets to update the following object perimeter") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "object perimeter name: '" + row[ + 'objectperimetername'] + "' which will be updated to object perimeter name:" + row[ + "updatedobjectperimetername"] + "' object perimeter description: '" + row[ + "updatedobjectperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + else: + policyid="" + data = { + 'name': row["updatedobjectperimetername"], + 'description': row["updatedobjectperimeterdescription"], + } + response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys(): + if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]): + response = requests.patch(apis_urls.serverURL + apis_urls.perimeterobjectAPI + '/' + ids, + headers=headers,data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing object perimeter & get its id +# 2) Delete it without having the policy id in the request +@When('the user sets to delete the following object perimeter') +def step_impl(context): + logging.info("When the user sets to delete the following object perimeter") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("object perimeter name:'" + row["objectperimetername"] + "'") + + response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys(): + if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]): + response = requests.delete(apis_urls.serverURL + apis_urls.perimeterobjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing object perimeter & get its id +# 2) Delete it while having the policy id in the request +@When('the user sets to delete the following object perimeter for a given policy') +def step_impl(context): + logging.info("the user sets to delete the following object perimeter for a given policy") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("object perimeter name:'" + row["objectperimetername"] + "' and policy:"+ row["policies"]+"'") + policyid = commonfunctions.get_policyid(row['policies']) + response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys(): + if (response.json()[apis_urls.perimeterobjectAPI][ids]['name'] == row["objectperimetername"]): + response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeterobjectAPI + "/" + ids, + headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Insert action perimeter using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following action perimeter') +def step_impl(context): + logger.info("When the user sets to add the following action perimeter") + + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "action perimeter name: '" + row["actionperimetername"] + "' action perimeter description: '" + row[ + "actionperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + policyid="" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + else: + policyid="" + data = { + 'name': row["actionperimetername"], + 'description': row["actionperimeterdescription"], + + } + response = requests.post( + apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing action perimeter & get its id +# 2) create the new perimeter jason and patch it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following action perimeter') +def step_impl(context): + logger.info("When the user sets to update the following action perimeter") + + model = getattr(context, "model", None) + + for row in context.table: + + logger.info( + "action perimeter name: '" + row[ + 'actionperimetername'] + "' which will be updated to action perimeter name:" + row[ + "updatedactionperimetername"] + "' action perimeter description: '" + row[ + "updatedactionperimeterdescription"] + "' and policies '" + row['policies'] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (row['policies'] != ""): + policyid = commonfunctions.get_policyid(row['policies']) + else: + policyid="" + data = { + 'name': row["updatedactionperimetername"], + 'description': row["updatedactionperimeterdescription"], + } + response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys(): + if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]): + response = requests.patch( + apis_urls.serverURL + apis_urls.perimeteractionAPI + '/' + ids, + headers=headers,data=json.dumps(data)) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing action perimeter & get its id +# 2) Delete it without having the policy id in the request +@When('the user sets to delete the following action perimeter') +def step_impl(context): + logging.info("When the user sets to delete the following action perimeter") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("action perimeter name:'" + row["actionperimetername"] + "'") + response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys(): + if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]): + response = requests.delete(apis_urls.serverURL + apis_urls.perimeteractionAPI + "/" + ids, + headers=apis_urls.auth_headers) + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Search for the existing action perimeter & get its id +# 2) Delete it while having the policy id in the request +@When('the user sets to delete the following action perimeter for a given policy') +def step_impl(context): + logging.info("the user sets to delete the following action perimeter for a given policy") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("action perimeter name:'" + row["actionperimetername"] + "' and policy:"+ row["policies"]+"'") + policyid = commonfunctions.get_policyid(row['policies']) + response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys(): + if (response.json()[apis_urls.perimeteractionAPI][ids]['name'] == row["actionperimetername"]): + response = requests.delete(apis_urls.serverURL + "policies/" + policyid + "/" + apis_urls.perimeteractionAPI + "/" + ids, + headers=apis_urls.auth_headers) + + if response.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing subject perimeter by get request and put them into a table +# 2) Sort the table by subject perimeter +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following subject perimeter should be existed in the system') +def step_impl(context): + logger.info("Then the following subject perimeter should be existed in the system") + + response = requests.get(apis_urls.serverURL + apis_urls.perimetersubjectAPI,headers=apis_urls.auth_headers) + apiresult = Table( + names=('subjectperimetername', 'subjectperimeterdescription', + # 'subjectperimeteremail', + # 'subjectperimeterpassword', + 'policies'), + dtype=('S100', 'S100', 'S100')) + + if len(response.json()[apis_urls.perimetersubjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimetersubjectAPI]).keys(): + apipoliciesid = [] + apipolicies = "" + GeneralVariables.assignsubjectperimeterid['value']=ids + apisubjectperimetername = response.json()[apis_urls.perimetersubjectAPI][ids]['name'] + apisubjectperimeterdescription = response.json()[apis_urls.perimetersubjectAPI][ids]['description'] + # apisubjectperimeteremail = response.json()[apis_urls.perimetersubjectAPI][ids]['email'] + # apisubjectperimeterpassword = response.json()[apis_urls.perimetersubjectAPI][ids]['password'] + if (len(response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']) != 0): + for policies in response.json()[apis_urls.perimetersubjectAPI][ids]['policy_list']: + apipoliciesid.append(commonfunctions.get_policyname(str(policies))) + apipolicies = ",".join(apipoliciesid) + else: + apipolicies = "" + apiresult.add_row(vals=( + apisubjectperimetername, apisubjectperimeterdescription, + # apisubjectperimeteremail,# apisubjectperimeterpassword, + apipolicies)) + else: + apiresult.add_row(vals=("", "", "")) + + apiresult.sort('subjectperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected subject perimeter name: '" + str( + row1["subjectperimetername"]) + "' is the same as the actual existing '" + str( + row2["subjectperimetername"]) + "'") + assert str(row1["subjectperimetername"]) == str( + row2["subjectperimetername"]), "subject perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected subject perimeter description: '" + str( + row1["subjectperimeterdescription"]) + "' is the same as the actual existing '" + str( + row2["subjectperimeterdescription"]) + "'") + assert str(row1["subjectperimeterdescription"]) == str( + row2["subjectperimeterdescription"]), "subject perimeter description is not correct!" + logger.info("assertion passed!") + + # logger.info("asserting the expected subject perimeter email: '" + str( + # row1["subjectperimeteremail"]) + "' is the same as the actual existing '" + str( + # row2["subjectperimeteremail"]) + "'") + # assert str(row1["subjectperimeteremail"]) == str( + # row2["subjectperimeteremail"]), "subject perimeter email is not correct!" + # logger.info("assertion passed!") + # + # logger.info("asserting the expected subject perimeter password: '" + str( + # row1["subjectperimeterpassword"]) + "' is the same as the actual existing '" + str( + # row2["subjectperimeterpassword"]) + "'") + # assert str(row1["subjectperimeterpassword"]) == str( + # row2["subjectperimeterpassword"]), "subject perimeter password is not correct!" + # logger.info("assertion passed!") + + if (str(row1["policies"]).find(',') == -1): + logger.info("asserting the expected policies: '" + str( + row1["policies"]) + "' is the same as the actual existing '" + str( + row2["policies"]) + "'") + logger.info("policies is not correct!") + assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!" + else: + + logger.info("asserting the expected policies: '" + ','.join( + sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" + + ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'") + logger.info("policies is not correct!") + assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join( + sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing object perimeter by get request and put them into a table +# 2) Sort the table by subject perimeter +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following object perimeter should be existed in the system') +def step_impl(context): + logger.info("Then the following object perimeter should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.perimeterobjectAPI,headers=apis_urls.auth_headers) + apiresult = Table( + names=('objectperimetername', 'objectperimeterdescription', 'policies'), + dtype=('S100', 'S100', 'S100')) + if len(response.json()[apis_urls.perimeterobjectAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimeterobjectAPI]).keys(): + apipolicies = "" + apipoliciesid = [] + apiobjectperimetername = response.json()[apis_urls.perimeterobjectAPI][ids]['name'] + apiobjectperimeterdescription = response.json()[apis_urls.perimeterobjectAPI][ids]['description'] + if (len(response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']) != 0): + for policies in response.json()[apis_urls.perimeterobjectAPI][ids]['policy_list']: + apipoliciesid.append(commonfunctions.get_policyname(str(policies))) + apipolicies = ",".join(apipoliciesid) + else: + apipolicies = "" + apiresult.add_row(vals=( + apiobjectperimetername, apiobjectperimeterdescription, apipolicies)) + else: + apiresult.add_row(vals=("", "", "")) + + apiresult.sort('objectperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected object perimeter name: '" + str( + row1["objectperimetername"]) + "' is the same as the actual existing '" + str( + row2["objectperimetername"]) + "'") + assert str(row1["objectperimetername"]) == str( + row2["objectperimetername"]), "object perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected object perimeter description: '" + str( + row1["objectperimeterdescription"]) + "' is the same as the actual existing '" + str( + row2["objectperimeterdescription"]) + "'") + assert str(row1["objectperimeterdescription"]) == str( + row2["objectperimeterdescription"]), "object perimeter description is not correct!" + logger.info("assertion passed!") + + if (str(row1["policies"]).find(',') == -1): + logger.info("asserting the expected policies: '" + str( + row1["policies"]) + "' is the same as the actual existing '" + str( + row2["policies"]) + "'") + logger.info("policies is not correct!") + assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!" + else: + logger.info("asserting the expected policies: '" + ','.join( + sorted(str(row1["policies"]).split(','), key=str.lower)) + "' is the same as the actual existing '" + + ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'") + logger.info("policies is not correct!") + assert ','.join(sorted(str(row1["policies"]).split(','), key=str.lower)) == ','.join( + sorted(str(row2["policies"]).split(','), key=str.lower)), " policies is not correct!" + logger.info("assertion passed!") + +# Step Definition Implementation: +# 1) Get all the existing subject perimeter by get request and put them into a table +# 2) Sort the table by subject perimeter +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following action perimeter should be existed in the system') +def step_impl(context): + logger.info("Then the following action perimeter should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.perimeteractionAPI,headers=apis_urls.auth_headers) + apiresult = Table( + names=('actionperimetername', 'actionperimeterdescription', 'policies'), + dtype=('S100', 'S100', 'S100')) + if len(response.json()[apis_urls.perimeteractionAPI]) != 0: + for ids in dict(response.json()[apis_urls.perimeteractionAPI]).keys(): + apipolicies = "" + apipoliciesid = [] + apiactionperimetername = response.json()[apis_urls.perimeteractionAPI][ids]['name'] + apiactionperimeterdescription = response.json()[apis_urls.perimeteractionAPI][ids]['description'] + if (len(response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']) != 0): + for policies in response.json()[apis_urls.perimeteractionAPI][ids]['policy_list']: + apipoliciesid.append(commonfunctions.get_policyname(str(policies))) + apipolicies = ",".join(apipoliciesid) + else: + apipolicies = "" + apiresult.add_row(vals=( + apiactionperimetername, apiactionperimeterdescription, apipolicies)) + else: + apiresult.add_row(vals=("", "", "")) + + apiresult.sort('actionperimetername') + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected action perimeter name: '" + str( + row1["actionperimetername"]) + "' is the same as the actual existing '" + str( + row2["actionperimetername"]) + "'") + assert str(row1["actionperimetername"]) == str( + row2["actionperimetername"]), "action perimeter name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected action perimeter description: '" + str( + row1["actionperimeterdescription"]) + "' is the same as the actual existing '" + str( + row2["actionperimeterdescription"]) + "'") + assert str(row1["actionperimeterdescription"]) == str( + row2["actionperimeterdescription"]), "action perimeter description is not correct!" + logger.info("assertion passed!") + + if(str(row1["policies"]).find(',')==-1): + logger.info("asserting the expected policies: '" + str( + row1["policies"]) + "' is the same as the actual existing '" + str( + row2["policies"]) + "'") + logger.info("policies is not correct!") + assert str(row1["policies"]) == str(row2["policies"]), " policies is not correct!" + else: + + logger.info("asserting the expected policies: '" + ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) + "' is the same as the actual existing '" + + ','.join(sorted(str(row2["policies"]).split(','), key=str.lower)) + "'") + logger.info("policies is not correct!") + assert ','.join(sorted(str(row1["policies"]).split(','),key=str.lower)) == ','.join(sorted(str(row2["policies"]).split(','),key=str.lower)), " policies is not correct!" + logger.info("assertion passed!") diff --git a/moon_manager/tests/func_tests/features/steps/policy.py b/moon_manager/tests/func_tests/features/steps/policy.py new file mode 100644 index 00000000..faa7156a --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/policy.py @@ -0,0 +1,219 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing policies in the system +# 2) Loop by id and delete them +@Given('the system has no policies') +def step_impl(context): + logger.info("Given the system has no policies") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.policyAPI]) != 0: + for ids in dict(response.json()[apis_urls.policyAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.policyAPI + "/" + ids, + headers=headers) + + +# Step Definition Implementation: +# 1) Get model id by calling the common funtion: get_modelid +# 2) create the policy data jason then post it +@Given('the following policy exists') +def step_impl(context): + logger.info("Given the following policy exists") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "policy name: '" + row["policyname"] + "' policy description: '" + row[ + "policydescription"] + "' and model name:'" + row[ + "modelname"] + "' and genre '"+row['genre']+"'") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + data = { + 'name': row["policyname"], + 'description': row["policydescription"], + 'model_id': commonfunctions.get_modelid(row['modelname']), + 'genre': row['genre'] + } + response = requests.post(apis_urls.serverURL + apis_urls.policyAPI, headers=headers, + data=json.dumps(data)) + + +# Step Definition Implementation: +# 1) Get model id by calling the common funtion: get_modelid +# 2) create the policy data jason then post it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following policy') +def step_impl(context): + logger.info("When the user sets to add the following policy") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "policy name: '" + row["policyname"] + "' policy description: '" + row[ + "policydescription"] + "' and model name:'" + row[ + "modelname"] + "' and genre '" + row['genre'] + "'") + policymodel = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['modelname']) > 20): + policymodel=row['modelname'] + else: + policymodel=commonfunctions.get_modelid(row['modelname']) + + data = { + 'name': row["policyname"], + 'description': row["policydescription"], + 'model_id': policymodel, + 'genre': row['genre'] + } + response = requests.post(apis_urls.serverURL + apis_urls.policyAPI, headers=headers, + data=json.dumps(data)) + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + + +# Step Definition Implementation: +# 1) Get model id by calling the common funtion: get_modelid +# 2) create the policy jason then patch the policy after searching for it's id. +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to update the following policy') +def step_impl(context): + logger.info("When the user sets to update the following policy") + model = getattr(context, "model", None) + for row in context.table: + logger.info( + "policy name: '" + row["policyname"] + "' which will be updated to policy name:" + row[ + "updatedpolicyname"] + "' and policy description: '" + row[ + "updatedpolicydescription"] + "' model name: '" + row["updatedmodelname"] + "' and genre: '"+row["updatedgenre"]+"'") + policymodel = "" + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['updatedmodelname']) > 20): + policymodel = row['updatedmodelname'] + else: + policymodel = commonfunctions.get_modelid(row['updatedmodelname']) + + data = { + 'name': row["updatedpolicyname"], + 'description': row["updatedpolicydescription"], + 'model_id': policymodel, + 'genre': row['updatedgenre'] + } + response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.policyAPI]).keys(): + if (response.json()[apis_urls.policyAPI][ids]['name'] == row["policyname"]): + print(apis_urls.serverURL + apis_urls.policyAPI + '/' + ids) + response = requests.patch(apis_urls.serverURL + apis_urls.policyAPI + '/' + ids, headers=headers, + data=json.dumps(data)) + logger.info(response.json()) + logger.info(response.status_code) + break + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the policy by get request +# 2) Loop by ids and search for the matching policy by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following policy') +def step_impl(context): + logger.info("When the user sets to delete the following policy") + + model = getattr(context, "model", None) + for row in context.table: + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + logger.info("policy name:'" +row["policyname"]+"'") + response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + for ids in dict(response.json()[apis_urls.policyAPI]).keys(): + if (response.json()[apis_urls.policyAPI][ids]['name'] == row["policyname"]): + GeneralVariables.assignpolicyid['value']=ids + response = requests.delete(apis_urls.serverURL + apis_urls.policyAPI + "/" + ids, + headers=headers) + break + + if response.status_code==200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing policies by get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following policy should be existed in the system') +def step_impl(context): + logger.info("Then the following policy should be existed in the system") + response = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + #print(response) + apiresult = Table( + names=('policyname', 'policydescription', 'modelname','genre'), + dtype=('S100', 'S100', 'S100','S100')) + if len(response.json()[apis_urls.policyAPI]) != 0: + for ids in dict(response.json()[apis_urls.policyAPI]).keys(): + apipolicyname = response.json()[apis_urls.policyAPI][ids]['name'] + apipolicydescription = response.json()[apis_urls.policyAPI][ids]['description'] + apipolicymodel = commonfunctions.get_modelname(response.json()[apis_urls.policyAPI][ids]['model_id']) + apipolicygenre=response.json()[apis_urls.policyAPI][ids]['genre'] + + apiresult.add_row(vals=( + apipolicyname, apipolicydescription, apipolicymodel,apipolicygenre)) + + else: + apiresult.add_row(vals=("", "", "","")) + + apiresult.sort('policyname') + + for row1, row2 in zip(context.table, apiresult): + logger.info("asserting the expected policy name: '" + str( + row1["policyname"]) + "' is the same as the actual existing '" + str( + row2["policyname"]) + "'") + assert str(row1["policyname"]) == str(row2["policyname"]), "policy name is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected policy description: '" + str( + row1["policydescription"]) + "' is the same as the actual existing '" + str( + row2["policydescription"]) + "'") + assert str(row1["policydescription"]) == str(row2["policydescription"]), "policy description is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected genre: '" + str( + row1["genre"]) + "' is the same as the actual existing '" + str( + row2["genre"]) + "'") + assert str(row1["genre"]) == str(row2["genre"]), "genre is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected model name: '" + str( + row1["modelname"]) + "' is the same as the actual existing '" + str( + row2["modelname"]) + "'") + assert str(row1["modelname"]) == str(row2["modelname"]), "model name is not correct!" + logger.info("assertion passed!") \ No newline at end of file diff --git a/moon_manager/tests/func_tests/features/steps/rules.py b/moon_manager/tests/func_tests/features/steps/rules.py new file mode 100644 index 00000000..4dd85e2c --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/rules.py @@ -0,0 +1,495 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import numpy as np +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing rules by the policy id +# 2) Loop by assignment id and delete it +@Given('the system has no rules') +def step_impl(context): + logger.info("Given the system has no rules") + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + #logger.info(response_policies.json()) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + apiruleid = [] + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + response = requests.get( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.rulesAPI]['rules']) != 0: + for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])): + apiruleid.append(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['id']) + for ruleid in apiruleid: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/" + ruleid, headers=apis_urls.auth_headers) + +# Step Definition Implementation: +# 1) Add rule using the post request +@Given('the following rule exists') +def step_impl(context): + logger.info("Given the following rule exists") + api_responseflag = {'value': False} + model = getattr(context, "model", None) + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and instructions: '" + row[ + "instructions"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + data = { + 'meta_rule_id': meta_rule_id, + 'rule': ruleidslist, + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI, + headers=headers, + data=json.dumps(data)) + + +# Step Definition Implementation: +# 1) Add subject meta data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following rules') +def step_impl(context): + logger.info("When the user sets to add the following rules") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + api_responseflag = {'value': False} + model = getattr(context, "model", None) + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and instructions: '" + row[ + "instructions"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if(row['policyname']=="" or row['policyname']=="000000000000000000000000000000000000000000000000000"): + policyname="Stanford Policy" + else: + policyname=row['policyname'] + policies_id = commonfunctions.get_policyid(policyname) + + if(row["metarulename"]=="" or row["metarulename"]=="000000000000000000000000000000000000000000000000000"): + mata_rule_name="metarule1" + else: + mata_rule_name = row['metarulename'] + + + if (row["rule"] != ""): + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == mata_rule_name): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)][ + 'name'] == + ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + for categoryid in objectcategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)][ + 'name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + + for categoryid in actioncategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)][ + 'name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + if(row["metarulename"]=="" or row["metarulename"] == "000000000000000000000000000000000000000000000000000"): + meta_rule_id=row["metarulename"] + if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"): + policies_id = row["policyname"] + data = { + 'meta_rule_id': meta_rule_id, + 'rule': ruleidslist, + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + else: + + data = { + 'meta_rule_id': commonfunctions.get_metaruleid(mata_rule_name), + 'rule': [], + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI, + headers=headers, + data=json.dumps(data)) + logger.info(rulesresponse.json()) + if rulesresponse.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the meta rule by get request +# 2) Loop by ids and search for the matching meta rule by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following rules') +def step_impl(context): + logger.info("When the user sets to delete the following rules") + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + rulesresponse = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + + if len(rulesresponse.json()[apis_urls.rulesAPI]) != 0: + for ids in range(len(rulesresponse.json()[apis_urls.rulesAPI]['rules'])): + if (dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])[ + 'rule'] == ruleidslist): + ruleid = dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])['id'] + rulesresponse = requests.delete( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/" + ruleid,headers=apis_urls.auth_headers) + + if rulesresponse.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing rules per a given policy, metarule using get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following rules should be existed in the system') +def step_impl(context): + logger.info("Then the following rule should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'), + dtype=('S1000', 'S100', 'S100', 'S100')) + + expectedresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'), + dtype=('S1000', 'S100', 'S100', 'S100')) + + for row in context.table: + ruleidslist = [] + apirule = [] + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + expectedresult.add_row(vals=(','.join(ruleidslist), meta_rule_id, row['instructions'], policies_id)) + + if (row['policyname'] != ""): + apipolicyid = commonfunctions.get_policyid( + row['policyname']) + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.rulesAPI]) != 0: + for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])): + if (dict(response.json()[apis_urls.rulesAPI]['rules'][ids])[ + 'meta_rule_id'] == commonfunctions.get_metaruleid(row['metarulename'])): + apirule = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['rule'] + #logger.info(dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision']) + apiinstructions = dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision'] + apimetaruleid = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['meta_rule_id'] + apiresult.add_row(vals=(','.join(apirule), apimetaruleid, apiinstructions, apipolicyid)) + + else: + apiresult.add_row(vals=("", "", "", "")) + + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('policyname') + expectedresult.sort('policyname') + for row1, row2 in zip(expectedresult, apiresult): + logger.info("asserting the expected rule: '" + str( + row1["rule"]) + "' is the same as the actual existing '" + str( + row2["rule"]) + "'") + assert str(row1["rule"]) == str(row2["rule"]), "rule is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected instructions: '" + str( + row1["instructions"]) + "' is the same as the actual existing '" + str( + row2["instructions"]) + "'") + assert str(row1["instructions"]) == str(row2["instructions"]), "instructions is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected metarule: '" + str( + row1["metarule"]) + "' is the same as the actual existing '" + str( + row2["metarule"]) + "'") + assert str(row1["metarule"]) == str(row2["metarule"]), "metarule is not correct!" + logger.info("assertion passed!") -- cgit 1.2.3-korg