aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/func_tests/features/steps/common_functions.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/common_functions.py')
-rw-r--r--moon_manager/tests/func_tests/features/steps/common_functions.py279
1 files changed, 279 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/common_functions.py b/moon_manager/tests/func_tests/features/steps/common_functions.py
new file mode 100644
index 00000000..b9b9f0bc
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/common_functions.py
@@ -0,0 +1,279 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from steps.Static_Variables import GeneralVariables
+import requests
+import json
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+class commonfunctions:
+ apis_urls = GeneralVariables()
+
+ def get_subjectcategoryid(self, subjectcategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name'] == subjectcategoryname):
+ return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id']
+
+ def get_objectcategoryid(self, objectcategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name'] == objectcategoryname):
+ return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id']
+
+ def get_actioncategoryid(self, actioncategoryname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name'] == actioncategoryname):
+ return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id']
+
+ def get_metaruleid(self, metarulename):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metarulesAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metarulesAPI]).keys():
+ if (response.json()[self.apis_urls.metarulesAPI][ids]['name'] == metarulename):
+ return ids
+
+ def get_modelid(self, modelname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.modelAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.modelAPI]).keys():
+ if (response.json()[self.apis_urls.modelAPI][ids]['name'] == modelname):
+ return ids
+
+ def get_policyid(self, policyname):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.policyAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.policyAPI]).keys():
+ if (response.json()[self.apis_urls.policyAPI][ids]['name'] == policyname):
+ return ids
+
+ def get_subjectperimeterid(self,subjectperimeter ):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys():
+ if (response.json()[self.apis_urls.perimetersubjectAPI][ids]['name'] == subjectperimeter):
+ return ids
+
+ def get_objectperimeterid(self,objectperimeter ):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys():
+ if (response.json()[self.apis_urls.perimeterobjectAPI][ids]['name'] == objectperimeter):
+ return ids
+
+ def get_actionperimeterid(self, actionperimeter):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys():
+ if (response.json()[self.apis_urls.perimeteractionAPI][ids]['name'] == actionperimeter):
+ return ids
+
+ def get_subjectdataid(self,subjectdataname,subjectcategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid,headers=self.apis_urls.auth_headers)
+ if(len(response_data.json()[self.apis_urls.datasubjectAPI]))!=0:
+ subjectdataidslist = []
+ matcheddataidslist = []
+ dataids=response_data.json()[self.apis_urls.datasubjectAPI][0]['data']
+ for ids in dataids:
+ apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id']
+ subjectdataidslist.append(apisubjectdataid)
+
+ if ((str(subjectdataname)).find(",") != -1):
+ datanameslist = subjectdataname.split(",")
+ for dataname in datanameslist:
+ for data_id in subjectdataidslist:
+ if ((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][
+ 'name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+ else:
+ for data_id in subjectdataidslist:
+ if ((
+ response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']) == subjectdataname):
+ return data_id
+
+ def get_objectdataid(self,objectdataname,objectcategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid,headers=self.apis_urls.auth_headers)
+ if (len(response_data.json()[self.apis_urls.dataobjectAPI])) != 0:
+ objectdataidslist = []
+ matcheddataidslist=[]
+ for ids in response_data.json()[ self.apis_urls.dataobjectAPI][0]['data']:
+ apiobjectdataid = response_data.json()[ self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id']
+ objectdataidslist.append(apiobjectdataid)
+ if ((str(objectdataname)).find(",") != -1):
+ datanameslist = objectdataname.split(",")
+ for dataname in datanameslist:
+ for data_id in objectdataidslist:
+ if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+
+ else:
+ for data_id in objectdataidslist:
+ if ((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']) == objectdataname):
+ return data_id
+
+ def get_actiondataid(self,actiondataname,actioncategoryid,policyid ):
+ response_data = requests.get(
+ self.apis_urls.serverURL + self.apis_urls.policyAPI + "/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid,headers=self.apis_urls.auth_headers)
+ if (len(response_data.json()[self.apis_urls.dataactionAPI])) != 0:
+ actiondataidslist = []
+ matcheddataidslist = []
+ for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']:
+ apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id']
+ actiondataidslist.append(apiactiondataid)
+ if ((str(actiondataname)).find(",") != -1):
+ datanameslist = actiondataname.split(",")
+ for dataname in datanameslist:
+ for data_id in actiondataidslist:
+ if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][
+ 'name']) == dataname):
+ matcheddataidslist.append(data_id)
+ return ",".join(matcheddataidslist)
+ else:
+ for data_id in actiondataidslist:
+ if ((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']) == actiondataname):
+ return data_id
+
+ def get_subjectcategoryname(self, subjectcategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadatasubjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadatasubjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadatasubjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['id'] == subjectcategoryid):
+ return response.json()[self.apis_urls.metadatasubjectcategoryAPI][ids]['name']
+
+ def get_objectcategoryname(self, objectcategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataobjectcategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataobjectcategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataobjectcategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['id'] == objectcategoryid):
+ return response.json()[self.apis_urls.metadataobjectcategoryAPI][ids]['name']
+
+ def get_actioncategoryname(self, actioncategoryid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metadataactioncategoryAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metadataactioncategoryAPI]) != 0:
+ for ids in dict(response.json()[self.apis_urls.metadataactioncategoryAPI]).keys():
+ if (response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['id'] == actioncategoryid):
+ return response.json()[self.apis_urls.metadataactioncategoryAPI][ids]['name']
+
+ def get_metarulename(self, metaruleid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.metarulesAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.metarulesAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.metarulesAPI]).keys():
+ if (id == metaruleid):
+ return response.json()[self.apis_urls.metarulesAPI][id]['name']
+
+ def get_modelname(self, modelid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.modelAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.modelAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.modelAPI]).keys():
+ if (id == modelid):
+ return response.json()[self.apis_urls.modelAPI][id]['name']
+
+ def get_policyname(self, policyid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.policyAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.policyAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.policyAPI]).keys():
+ if (id == policyid):
+ return response.json()[self.apis_urls.policyAPI][id]['name']
+
+ def get_subjectperimetername(self, subjectperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimetersubjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimetersubjectAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimetersubjectAPI]).keys():
+ if (id == subjectperimeterid):
+ return response.json()[self.apis_urls.perimetersubjectAPI][id]['name']
+
+ def get_objectperimetername(self, objectperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeterobjectAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeterobjectAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimeterobjectAPI]).keys():
+ if (id == objectperimeterid):
+ return response.json()[self.apis_urls.perimeterobjectAPI][id]['name']
+
+ def get_actionperimetername(self, actionperimeterid):
+ response = requests.get(self.apis_urls.serverURL + self.apis_urls.perimeteractionAPI,headers=self.apis_urls.auth_headers)
+ if len(response.json()[self.apis_urls.perimeteractionAPI]) != 0:
+ for id in dict(response.json()[self.apis_urls.perimeteractionAPI]).keys():
+ if (id == actionperimeterid):
+ return response.json()[self.apis_urls.perimeteractionAPI][id]['name']
+
+ def get_subjectdataname(self, subjectdataids, subjectcategoryid, policyid):
+ subjectdatanames=[]
+ for subjectdataid in subjectdataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.datasubjectAPI + "/" + subjectcategoryid+"/"+subjectdataid,headers=self.apis_urls.auth_headers)
+
+ subjectdataidslist = []
+ if(response_data.status_code==200):
+ for ids in response_data.json()[self.apis_urls.datasubjectAPI][0]['data']:
+ apisubjectdataid = response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(ids)]['id']
+ subjectdataidslist.append(apisubjectdataid)
+
+ for data_id in subjectdataidslist:
+ if (str((response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)][
+ 'id'])) == subjectdataid):
+ subjectdatanames.append(str(response_data.json()[self.apis_urls.datasubjectAPI][0]['data'][str(data_id)]['name']))
+ else:
+ subjectdataidslist = ""
+ return subjectdatanames
+
+ def get_objectdataname(self, objectdataids, objectcategoryid, policyid):
+ objectdatanames = []
+ for objectdataid in objectdataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataobjectAPI + "/" + objectcategoryid + "/" + objectdataid,headers=self.apis_urls.auth_headers)
+ objectdataidslist = []
+ if (response_data.status_code == 200):
+ for ids in response_data.json()[self.apis_urls.dataobjectAPI][0]['data']:
+ apiobjectdataid = response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(ids)]['id']
+ objectdataidslist.append(apiobjectdataid)
+ for data_id in objectdataidslist:
+ if (str((response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)][
+ 'id'])) == objectdataid):
+ objectdatanames.append(
+ str(response_data.json()[self.apis_urls.dataobjectAPI][0]['data'][str(data_id)]['name']))
+ else:
+ objectdataidslist = ""
+ return objectdatanames
+
+ def get_actiondataname(self, actiondataids, actioncategoryid, policyid):
+ actiondatanames = []
+ for actiondataid in actiondataids:
+ response_data = requests.get(
+ self.apis_urls.serverURL + "policies/" + policyid + "/" + self.apis_urls.dataactionAPI + "/" + actioncategoryid + "/" + actiondataid,headers=self.apis_urls.auth_headers)
+ #logger.info(response_data.json()[self.apis_urls.dataactionAPI][0])
+
+ actiondataidslist = []
+ if (response_data.status_code == 200):
+ for ids in response_data.json()[self.apis_urls.dataactionAPI][0]['data']:
+ apiactiondataid = response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(ids)]['id']
+ actiondataidslist.append(apiactiondataid)
+ logging.info(actiondataidslist)
+ for data_id in actiondataidslist:
+ if (str((response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)][
+ 'id'])) == actiondataid):
+ actiondatanames.append(
+ str(response_data.json()[self.apis_urls.dataactionAPI][0]['data'][str(data_id)]['name']))
+ else:
+ actiondataidslist = ""
+ return actiondatanames \ No newline at end of file