aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/func_tests/features/steps/authorization.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/authorization.py')
-rw-r--r--moon_manager/tests/func_tests/features/steps/authorization.py217
1 files changed, 217 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/authorization.py b/moon_manager/tests/func_tests/features/steps/authorization.py
new file mode 100644
index 00000000..5fa0ebe7
--- /dev/null
+++ b/moon_manager/tests/func_tests/features/steps/authorization.py
@@ -0,0 +1,217 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from behave import *
+from Static_Variables import GeneralVariables
+from common_functions import *
+import requests
+import json
+import logging
+import paramiko
+
+apis_urls = GeneralVariables()
+commonfunctions = commonfunctions()
+
+logger = logging.getLogger(__name__)
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) Launch Moon Manager
+# 3) Set the token in the global variables
+@Given('the manager is configured')
+def step_impl(context):
+ logger.info("\n")
+ logger.info("******************** Scenario: " + context.scenario.name + " ********************")
+ logger.info("Given the manager is configured")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ # client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ logger.info("before ")
+ stdin, stdout, stderr = client.exec_command(
+
+ "sudo nohup hug -m moon_manager.server &"
+ " /usr/bin/python3 "
+ )
+ #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager add_user alaa00 admin")
+ #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager get_key alaa00 admin ")
+ #logger.info(stdout.readlines())
+ #GeneralVariables.auth_headers['X-Api-Key'] = str(stdout.readlines())
+ #logger.info("token: " + str(GeneralVariables.auth_headers['X-Api-Key']))
+ #logger.info("after ")
+ # client.close()
+
+# Step Definition Implementation: Incomplete Step
+# 1) Get all the moon slaves
+# 2) Loop on the slave by id and delete them
+@Given('no slave is created')
+def step_impl(context):
+ logger.info("\n")
+ logger.info("******************** Scenario: " + context.scenario.name + " ********************")
+ logger.info("Given no slave is created")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ response = requests.get(apis_urls.serverURL + apis_urls.getslavesAPI, headers=apis_urls.auth_headers)
+ if len(response.json()[apis_urls.getslavesAPI]) != 0:
+ for ids in dict(response.json()[apis_urls.getslavesAPI]).keys():
+ response = requests.delete(apis_urls.serverURL + apis_urls.slaveAPI + "/" + ids,
+ headers=headers)
+
+# Step Definition Implementation:
+# 1) Create a slave by post request
+# 2) Get the wrapper port id from the slave posting request & set it to the wrapperPort global variable
+@Given('the slave is created')
+def step_impl(context):
+ logger.info("Given the slave is created")
+ api_responseflag = {'value': False}
+ headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token}
+ data = {
+ 'name': "default",
+ 'description': "description",
+ 'address': "111",
+ }
+ response = requests.post(apis_urls.serverURL + apis_urls.slaveAPI, headers=headers,
+ data=json.dumps(data))
+ slaveid = list(response.json()[apis_urls.getslavesAPI])[0]
+ GeneralVariables.wrapperPort['value'] = str(response.json()[apis_urls.getslavesAPI][slaveid]['extra']['port'])
+
+# Step Definition Implementation: Incomplete Step
+# 1) Check the Pipeline is up and running
+@Given('the pipeline is running')
+def step_impl(context):
+ logger.info("Given the pipeline is running")
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the wrapperPort
+@Given('the following authorization request is granted through pipeline')
+def step_impl(context):
+ logger.info("Given the following authorization request is granted through pipeline")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the wrapperPort
+@Given('the following authorization request is granted through wrapper')
+def step_impl(context):
+ logger.info("Given the following authorization request is granted through wrapper")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the pipelinePort
+# 3) set the actual_authresponse global variable with the curl response
+@When('the following authorization request is sent through pipeline')
+def step_impl(context):
+ logger.info("Given the following authorization request is sent through pipeline")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Incomplete Step
+# 1) Connect to the server
+# 2) execute the authorization curl command using the pipelinePort
+# 3) set the actual_authresponse global variable with the curl response
+@When('the following authorization request is sent through wrapper')
+def step_impl(context):
+ logger.info("Given the following authorization request is sent through wrapper")
+ api_responseflag = {'value': False}
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ client.set_missing_host_key_policy(paramiko.WarningPolicy)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername,
+ password=apis_urls.serverpassword)
+
+ for row in context.table:
+ logger.info("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ stdin, stdout, stderr = client.exec_command("curl http://" + str(
+ apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[
+ "keystone_project_id"]) + "/" + str(
+ row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" +
+ str(row["actionperimetername"]))
+ logger.info(stdout.readlines())
+ GeneralVariables.actual_authresponse['value'] = str(stdout.readlines())
+
+# Step Definition Implementation: Untested Step
+# 1) Assert that the actual authresponse is the same as the expected.
+@Then('the authorization response should be the following')
+def step_impl(context):
+ logger.info("Then the authorization response should be the following")
+ for row in context.table:
+ logger.info("asserting the expected api response: '" + row["auth_response"] + "' and the actual response: '" +
+ GeneralVariables.actual_authresponse['value'] + "'")
+ assert row["auth_response"] == GeneralVariables.actual_authresponse[
+ 'value'], "Validation is not correct, Expected: " + \
+ row[
+ "auth_response"] + " but the API response was: " + \
+ GeneralVariables.actual_authresponse['value']
+ logger.info("assertion passed!")