diff options
Diffstat (limited to 'moon_manager/tests/func_tests/features/steps/authorization.py')
-rw-r--r-- | moon_manager/tests/func_tests/features/steps/authorization.py | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/moon_manager/tests/func_tests/features/steps/authorization.py b/moon_manager/tests/func_tests/features/steps/authorization.py new file mode 100644 index 00000000..5fa0ebe7 --- /dev/null +++ b/moon_manager/tests/func_tests/features/steps/authorization.py @@ -0,0 +1,217 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from behave import * +from Static_Variables import GeneralVariables +from common_functions import * +import requests +import json +import logging +import paramiko + +apis_urls = GeneralVariables() +commonfunctions = commonfunctions() + +logger = logging.getLogger(__name__) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) Launch Moon Manager +# 3) Set the token in the global variables +@Given('the manager is configured') +def step_impl(context): + logger.info("\n") + logger.info("******************** Scenario: " + context.scenario.name + " ********************") + logger.info("Given the manager is configured") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + # client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + logger.info("before ") + stdin, stdout, stderr = client.exec_command( + + "sudo nohup hug -m moon_manager.server &" + " /usr/bin/python3 " + ) + #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager add_user alaa00 admin") + #stdin, stdout, stderr = client.exec_command(" sudo /usr/local/bin/moon_manager get_key alaa00 admin ") + #logger.info(stdout.readlines()) + #GeneralVariables.auth_headers['X-Api-Key'] = str(stdout.readlines()) + #logger.info("token: " + str(GeneralVariables.auth_headers['X-Api-Key'])) + #logger.info("after ") + # client.close() + +# Step Definition Implementation: Incomplete Step +# 1) Get all the moon slaves +# 2) Loop on the slave by id and delete them +@Given('no slave is created') +def step_impl(context): + logger.info("\n") + logger.info("******************** Scenario: " + context.scenario.name + " ********************") + logger.info("Given no slave is created") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + response = requests.get(apis_urls.serverURL + apis_urls.getslavesAPI, headers=apis_urls.auth_headers) + if len(response.json()[apis_urls.getslavesAPI]) != 0: + for ids in dict(response.json()[apis_urls.getslavesAPI]).keys(): + response = requests.delete(apis_urls.serverURL + apis_urls.slaveAPI + "/" + ids, + headers=headers) + +# Step Definition Implementation: +# 1) Create a slave by post request +# 2) Get the wrapper port id from the slave posting request & set it to the wrapperPort global variable +@Given('the slave is created') +def step_impl(context): + logger.info("Given the slave is created") + api_responseflag = {'value': False} + headers = {"Content-Type": "application/json", "X-Api-Key": apis_urls.token} + data = { + 'name': "default", + 'description': "description", + 'address': "111", + } + response = requests.post(apis_urls.serverURL + apis_urls.slaveAPI, headers=headers, + data=json.dumps(data)) + slaveid = list(response.json()[apis_urls.getslavesAPI])[0] + GeneralVariables.wrapperPort['value'] = str(response.json()[apis_urls.getslavesAPI][slaveid]['extra']['port']) + +# Step Definition Implementation: Incomplete Step +# 1) Check the Pipeline is up and running +@Given('the pipeline is running') +def step_impl(context): + logger.info("Given the pipeline is running") + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the wrapperPort +@Given('the following authorization request is granted through pipeline') +def step_impl(context): + logger.info("Given the following authorization request is granted through pipeline") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the wrapperPort +@Given('the following authorization request is granted through wrapper') +def step_impl(context): + logger.info("Given the following authorization request is granted through wrapper") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the pipelinePort +# 3) set the actual_authresponse global variable with the curl response +@When('the following authorization request is sent through pipeline') +def step_impl(context): + logger.info("Given the following authorization request is sent through pipeline") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.pipelinePort['value'] + "/authz/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Incomplete Step +# 1) Connect to the server +# 2) execute the authorization curl command using the pipelinePort +# 3) set the actual_authresponse global variable with the curl response +@When('the following authorization request is sent through wrapper') +def step_impl(context): + logger.info("Given the following authorization request is sent through wrapper") + api_responseflag = {'value': False} + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=apis_urls.serverIP, port=apis_urls.serverport, username=apis_urls.serverusername, + password=apis_urls.serverpassword) + + for row in context.table: + logger.info("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + stdin, stdout, stderr = client.exec_command("curl http://" + str( + apis_urls.serverIP) + ":" + GeneralVariables.wrapperPort['value'] + "/authz/" + str(row[ + "keystone_project_id"]) + "/" + str( + row["subjectperimetername"]) + "/" + str(row["objectperimetername"]) + "/" + + str(row["actionperimetername"])) + logger.info(stdout.readlines()) + GeneralVariables.actual_authresponse['value'] = str(stdout.readlines()) + +# Step Definition Implementation: Untested Step +# 1) Assert that the actual authresponse is the same as the expected. +@Then('the authorization response should be the following') +def step_impl(context): + logger.info("Then the authorization response should be the following") + for row in context.table: + logger.info("asserting the expected api response: '" + row["auth_response"] + "' and the actual response: '" + + GeneralVariables.actual_authresponse['value'] + "'") + assert row["auth_response"] == GeneralVariables.actual_authresponse[ + 'value'], "Validation is not correct, Expected: " + \ + row[ + "auth_response"] + " but the API response was: " + \ + GeneralVariables.actual_authresponse['value'] + logger.info("assertion passed!") |