diff options
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/rules.py')
-rw-r--r-- | moon_manager/tests/func_tests/features/steps/rules.py | 495 |
1 files changed, 495 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/rules.py b/moon_manager/tests/func_tests/features/steps/rules.py new file mode 100644 index 00000000..4dd85e2c --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/rules.py @@ -0,0 +1,495 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from astropy.table import Table, Column +from common_functions import * +import numpy as np +import requests +import json +import logging + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: +# 1) Get all the existing rules by the policy id +# 2) Loop by assignment id and delete it +@Given('the system has no rules') +def step_impl(context): + logger.info("Given the system has no rules") + + response_policies = requests.get(apis_urls.serverURL + apis_urls.policyAPI, headers=apis_urls.auth_headers) + #logger.info(response_policies.json()) + if len(response_policies.json()[apis_urls.policyAPI]) != 0: + apiruleid = [] + for policies_ids in dict(response_policies.json()[apis_urls.policyAPI]).keys(): + response = requests.get( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.rulesAPI]['rules']) != 0: + for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])): + apiruleid.append(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['id']) + for ruleid in apiruleid: + response = requests.delete( + apis_urls.serverURL + "policies/" + policies_ids + "/" + apis_urls.rulesAPI + "/" + ruleid, headers=apis_urls.auth_headers) + +# Step Definition Implementation: +# 1) Add rule using the post request +@Given('the following rule exists') +def step_impl(context): + logger.info("Given the following rule exists") + api_responseflag = {'value': False} + model = getattr(context, "model", None) + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and instructions: '" + row[ + "instructions"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + data = { + 'meta_rule_id': meta_rule_id, + 'rule': ruleidslist, + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI, + headers=headers, + data=json.dumps(data)) + + +# Step Definition Implementation: +# 1) Add subject meta data using the post request +# 2) If the request code was 200 set the api response flag to true else false +@When('the user sets to add the following rules') +def step_impl(context): + logger.info("When the user sets to add the following rules") + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + api_responseflag = {'value': False} + model = getattr(context, "model", None) + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and instructions: '" + row[ + "instructions"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if(row['policyname']=="" or row['policyname']=="000000000000000000000000000000000000000000000000000"): + policyname="Stanford Policy" + else: + policyname=row['policyname'] + policies_id = commonfunctions.get_policyid(policyname) + + if(row["metarulename"]=="" or row["metarulename"]=="000000000000000000000000000000000000000000000000000"): + mata_rule_name="metarule1" + else: + mata_rule_name = row['metarulename'] + + + if (row["rule"] != ""): + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == mata_rule_name): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)][ + 'name'] == + ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + for categoryid in objectcategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)][ + 'name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + + for categoryid in actioncategorieslist: + if (index < len(ruleparameter)): + if (len(ruleparameter[index]) < 30): + if (ruleparameter[index] != ""): + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (index < len(ruleparameter)): + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)][ + 'name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + else: + break + else: + ruleidslist.append("") + index = index + 1 + else: + ruleidslist.append(ruleparameter[index]) + index = index + 1 + if(row["metarulename"]=="" or row["metarulename"] == "000000000000000000000000000000000000000000000000000"): + meta_rule_id=row["metarulename"] + if (row["policyname"] == "" or row["policyname"] == "000000000000000000000000000000000000000000000000000"): + policies_id = row["policyname"] + data = { + 'meta_rule_id': meta_rule_id, + 'rule': ruleidslist, + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + else: + + data = { + 'meta_rule_id': commonfunctions.get_metaruleid(mata_rule_name), + 'rule': [], + 'instructions': [{"decision": row['instructions']}], + 'enabled': 'True' + } + rulesresponse = requests.post(apis_urls.serverURL + "policies/" + str(policies_id) + "/" + apis_urls.rulesAPI, + headers=headers, + data=json.dumps(data)) + logger.info(rulesresponse.json()) + if rulesresponse.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the meta rule by get request +# 2) Loop by ids and search for the matching meta rule by name and delete it +# 3) If the request code was 200 set the api response flag to true else false +@When('the user sets to delete the following rules') +def step_impl(context): + logger.info("When the user sets to delete the following rules") + for row in context.table: + subjectcategoryidslist = [] + subjectdataidslist = [] + objectcategoryidslist = [] + objectdataidslist = [] + actioncategoryidslist = [] + actiondataidslist = [] + ruleidslist = [] + metaruleids = "" + subjectindex = 0 + objectindex = 0 + actionindex = 0 + logger.info( + "rule '" + row["rule"] + "' and metarule name:'" + row[ + "metarulename"] + "' and policyname:'" + row[ + "policyname"] + "'") + + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + break + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][str(ids)]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + rulesresponse = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + + if len(rulesresponse.json()[apis_urls.rulesAPI]) != 0: + for ids in range(len(rulesresponse.json()[apis_urls.rulesAPI]['rules'])): + if (dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])[ + 'rule'] == ruleidslist): + ruleid = dict(rulesresponse.json()[apis_urls.rulesAPI]['rules'][ids])['id'] + rulesresponse = requests.delete( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.rulesAPI + "/" + ruleid,headers=apis_urls.auth_headers) + + if rulesresponse.status_code == 200: + GeneralVariables.api_responseflag['value'] = 'True' + else: + GeneralVariables.api_responseflag['value'] = 'False' + +# Step Definition Implementation: +# 1) Get all the existing rules per a given policy, metarule using get request and put them into a table +# 2) Sort the table by policy name +# 3) Loop using both the expected and actual tables and assert the data row by row +@Then('the following rules should be existed in the system') +def step_impl(context): + logger.info("Then the following rule should be existed in the system") + model = getattr(context, "model", None) + apiresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'), + dtype=('S1000', 'S100', 'S100', 'S100')) + + expectedresult = Table(names=('rule', 'metarule', 'instructions', 'policyname'), + dtype=('S1000', 'S100', 'S100', 'S100')) + + for row in context.table: + ruleidslist = [] + apirule = [] + if (len(row['policyname']) > 25): + policies_id = row['policyname'] + else: + policies_id = commonfunctions.get_policyid(row['policyname']) + + ruleparameter = row["rule"].split(",") + metarules_response = requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers) + for metaruleids in dict(metarules_response.json()[apis_urls.metarulesAPI]).keys(): + if (metarules_response.json()[apis_urls.metarulesAPI][metaruleids]['name'] == row["metarulename"]): + meta_rule_id = metaruleids + subjectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['subject_categories'] + objectcategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['object_categories'] + actioncategorieslist = \ + requests.get(apis_urls.serverURL + apis_urls.metarulesAPI, headers=apis_urls.auth_headers).json()[apis_urls.metarulesAPI][ + metaruleids]['action_categories'] + + index = 0 + for categoryid in subjectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.datasubjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.datasubjectAPI]) != 0: + for ids in data_response.json()[apis_urls.datasubjectAPI][0]['data']: + if (data_response.json()[apis_urls.datasubjectAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in objectcategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataobjectAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataobjectAPI]) != 0: + for ids in data_response.json()[apis_urls.dataobjectAPI][0]['data']: + if (data_response.json()[apis_urls.dataobjectAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + + for categoryid in actioncategorieslist: + data_response = requests.get( + apis_urls.serverURL + "policies/" + policies_id + "/" + apis_urls.dataactionAPI + "/" + categoryid, headers=apis_urls.auth_headers) + if len(data_response.json()[apis_urls.dataactionAPI]) != 0: + for ids in data_response.json()[apis_urls.dataactionAPI][0]['data']: + if (data_response.json()[apis_urls.dataactionAPI][0]['data'][ids]['name'] == ruleparameter[ + index]): + ruleidslist.append(ids) + index = index + 1 + expectedresult.add_row(vals=(','.join(ruleidslist), meta_rule_id, row['instructions'], policies_id)) + + if (row['policyname'] != ""): + apipolicyid = commonfunctions.get_policyid( + row['policyname']) + response = requests.get( + apis_urls.serverURL + "policies/" + commonfunctions.get_policyid( + row['policyname']) + "/" + apis_urls.rulesAPI + "/", headers=apis_urls.auth_headers) + + if len(response.json()[apis_urls.rulesAPI]) != 0: + for ids in range(len(response.json()[apis_urls.rulesAPI]['rules'])): + if (dict(response.json()[apis_urls.rulesAPI]['rules'][ids])[ + 'meta_rule_id'] == commonfunctions.get_metaruleid(row['metarulename'])): + apirule = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['rule'] + #logger.info(dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision']) + apiinstructions = dict(dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['instructions'][0])['decision'] + apimetaruleid = dict(response.json()[apis_urls.rulesAPI]['rules'][ids])['meta_rule_id'] + apiresult.add_row(vals=(','.join(apirule), apimetaruleid, apiinstructions, apipolicyid)) + + else: + apiresult.add_row(vals=("", "", "", "")) + + else: + apiresult.add_row(vals=("", "", "", "")) + + apiresult.sort('policyname') + expectedresult.sort('policyname') + for row1, row2 in zip(expectedresult, apiresult): + logger.info("asserting the expected rule: '" + str( + row1["rule"]) + "' is the same as the actual existing '" + str( + row2["rule"]) + "'") + assert str(row1["rule"]) == str(row2["rule"]), "rule is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected instructions: '" + str( + row1["instructions"]) + "' is the same as the actual existing '" + str( + row2["instructions"]) + "'") + assert str(row1["instructions"]) == str(row2["instructions"]), "instructions is not correct!" + logger.info("assertion passed!") + + logger.info("asserting the expected metarule: '" + str( + row1["metarule"]) + "' is the same as the actual existing '" + str( + row2["metarule"]) + "'") + assert str(row1["metarule"]) == str(row2["metarule"]), "metarule is not correct!" + logger.info("assertion passed!") |