From 7bb53c64da2dcf88894bfd31503accdd81498f3d Mon Sep 17 00:00:00 2001 From: Thomas Duval Date: Wed, 3 Jun 2020 10:06:52 +0200 Subject: Update to new version 5.4 Signed-off-by: Thomas Duval Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea --- .../func_tests/features/steps/common_functions.py | 279 +++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 moon_manager/tests/func_tests/features/steps/common_functions.py (limited to 'moon_manager/tests/func_tests/features/steps/common_functions.py') diff --git a/moon_manager/tests/func_tests/features/steps/common_functions.py b/moon_manager/tests/func_tests/features/steps/common_functions.py new file mode 100644 index 00000000..b9b9f0bc --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/common_functions.py @@ -0,0 +1,279 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from steps.Static_Variables import GeneralVariables +import requests +import json +import logging + + +logger = logging.getLogger(__name__) + +class commonfunctions: + apis_urls = GeneralVariables() + + def get_subjectcategoryid(self, subjectcategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] == subjectcategoryname): + return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] + + def get_objectcategoryid(self, objectcategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] == objectcategoryname): + return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] + + def get_actioncategoryid(self, actioncategoryname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] == actioncategoryname): + return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] + + def get_metaruleid(self, metarulename): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metarulesAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metarulesAPI]).keys(): + if (response.json()[self.apis_urls.metarulesAPI][ids]['name'] == metarulename): + return ids + + def get_modelid(self, modelname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.modelAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.modelAPI]).keys(): + if (response.json()[self.apis_urls.modelAPI][ids]['name'] == modelname): + return ids + + def get_policyid(self, policyname): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.policyAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.policyAPI]).keys(): + if (response.json()[self.apis_urls.policyAPI][ids]['name'] == policyname): + return ids + + def get_subjectperimeterid(self,subjectperimeter ): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys(): + if (response.json()[self.apis_urls.perimetersubjectAPI][ids]['name'] == subjectperimeter): + return ids + + def get_objectperimeterid(self,objectperimeter ): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys(): + if (response.json()[self.apis_urls.perimeterobjectAPI][ids]['name'] == objectperimeter): + return ids + + def get_actionperimeterid(self, actionperimeter): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys(): + if (response.json()[self.apis_urls.perimeteractionAPI][ids]['name'] == actionperimeter): + return ids + + def get_subjectdataid(self,subjectdataname,subjectcategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid,headers=self.apis_urls.auth_headers) + if(len(response_data.json()[self.apis_urls.datasubjectAPI]))!=0: + subjectdataidslist = [] + matcheddataidslist = [] + dataids=response_data.json()[self.apis_urls.datasubjectAPI][0]['data'] + for ids in dataids: + apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id'] + subjectdataidslist.append(apisubjectdataid) + + if ((str(subjectdataname)).find(",") != -1): + datanameslist = subjectdataname.split(",") + for dataname in datanameslist: + for data_id in subjectdataidslist: + if ((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][ + 'name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + else: + for data_id in subjectdataidslist: + if (( + response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']) == subjectdataname): + return data_id + + def get_objectdataid(self,objectdataname,objectcategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid,headers=self.apis_urls.auth_headers) + if (len(response_data.json()[self.apis_urls.dataobjectAPI])) != 0: + objectdataidslist = [] + matcheddataidslist=[] + for ids in response_data.json()[ self.apis_urls.dataobjectAPI][0]['data']: + apiobjectdataid = response_data.json()[ self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id'] + objectdataidslist.append(apiobjectdataid) + if ((str(objectdataname)).find(",") != -1): + datanameslist = objectdataname.split(",") + for dataname in datanameslist: + for data_id in objectdataidslist: + if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + + else: + for data_id in objectdataidslist: + if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == objectdataname): + return data_id + + def get_actiondataid(self,actiondataname,actioncategoryid,policyid ): + response_data = requests.get( + self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid,headers=self.apis_urls.auth_headers) + if (len(response_data.json()[self.apis_urls.dataactionAPI])) != 0: + actiondataidslist = [] + matcheddataidslist = [] + for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']: + apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id'] + actiondataidslist.append(apiactiondataid) + if ((str(actiondataname)).find(",") != -1): + datanameslist = actiondataname.split(",") + for dataname in datanameslist: + for data_id in actiondataidslist: + if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][ + 'name']) == dataname): + matcheddataidslist.append(data_id) + return ",".join(matcheddataidslist) + else: + for data_id in actiondataidslist: + if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']) == actiondataname): + return data_id + + def get_subjectcategoryname(self, subjectcategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] == subjectcategoryid): + return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] + + def get_objectcategoryname(self, objectcategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] == objectcategoryid): + return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] + + def get_actioncategoryname(self, actioncategoryid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0: + for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys(): + if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] == actioncategoryid): + return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] + + def get_metarulename(self, metaruleid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.metarulesAPI]) != 0: + for id in dict(response.json()[self.apis_urls.metarulesAPI]).keys(): + if (id == metaruleid): + return response.json()[self.apis_urls.metarulesAPI][id]['name'] + + def get_modelname(self, modelid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.modelAPI]) != 0: + for id in dict(response.json()[self.apis_urls.modelAPI]).keys(): + if (id == modelid): + return response.json()[self.apis_urls.modelAPI][id]['name'] + + def get_policyname(self, policyid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.policyAPI]) != 0: + for id in dict(response.json()[self.apis_urls.policyAPI]).keys(): + if (id == policyid): + return response.json()[self.apis_urls.policyAPI][id]['name'] + + def get_subjectperimetername(self, subjectperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys(): + if (id == subjectperimeterid): + return response.json()[self.apis_urls.perimetersubjectAPI][id]['name'] + + def get_objectperimetername(self, objectperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys(): + if (id == objectperimeterid): + return response.json()[self.apis_urls.perimeterobjectAPI][id]['name'] + + def get_actionperimetername(self, actionperimeterid): + response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers) + if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0: + for id in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys(): + if (id == actionperimeterid): + return response.json()[self.apis_urls.perimeteractionAPI][id]['name'] + + def get_subjectdataname(self, subjectdataids, subjectcategoryid, policyid): + subjectdatanames=[] + for subjectdataid in subjectdataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid+"/"+subjectdataid,headers=self.apis_urls.auth_headers) + + subjectdataidslist = [] + if(response_data.status_code==200): + for ids in response_data.json()[self.apis_urls.datasubjectAPI][0]['data']: + apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id'] + subjectdataidslist.append(apisubjectdataid) + + for data_id in subjectdataidslist: + if (str((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][ + 'id'])) == subjectdataid): + subjectdatanames.append(str(response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name'])) + else: + subjectdataidslist = "" + return subjectdatanames + + def get_objectdataname(self, objectdataids, objectcategoryid, policyid): + objectdatanames = [] + for objectdataid in objectdataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid + "/" + objectdataid,headers=self.apis_urls.auth_headers) + objectdataidslist = [] + if (response_data.status_code == 200): + for ids in response_data.json()[self.apis_urls.dataobjectAPI][0]['data']: + apiobjectdataid = response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id'] + objectdataidslist.append(apiobjectdataid) + for data_id in objectdataidslist: + if (str((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)][ + 'id'])) == objectdataid): + objectdatanames.append( + str(response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name'])) + else: + objectdataidslist = "" + return objectdatanames + + def get_actiondataname(self, actiondataids, actioncategoryid, policyid): + actiondatanames = [] + for actiondataid in actiondataids: + response_data = requests.get( + self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid + "/" + actiondataid,headers=self.apis_urls.auth_headers) + #logger.info(response_data.json()[self.apis_urls.dataactionAPI][0]) + + actiondataidslist = [] + if (response_data.status_code == 200): + for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']: + apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id'] + actiondataidslist.append(apiactiondataid) + logging.info(actiondataidslist) + for data_id in actiondataidslist: + if (str((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][ + 'id'])) == actiondataid): + actiondatanames.append( + str(response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name'])) + else: + actiondataidslist = "" + return actiondatanames \ No newline at end of file -- cgit 1.2.3-korg