aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/unit_python/api/test_perimeter.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/unit_python/api/test_perimeter.py')
-rw-r--r--moon_manager/tests/unit_python/api/test_perimeter.py936
1 files changed, 606 insertions, 330 deletions
diff --git a/moon_manager/tests/unit_python/api/test_perimeter.py b/moon_manager/tests/unit_python/api/test_perimeter.py
index ff7b09d7..c741adf7 100644
--- a/moon_manager/tests/unit_python/api/test_perimeter.py
+++ b/moon_manager/tests/unit_python/api/test_perimeter.py
@@ -1,19 +1,39 @@
-# import moon_manager
-# import moon_manager.api
-import json
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
import api.utilities as utilities
from helpers import data_builder as builder
import helpers.policy_helper as policy_helper
from uuid import uuid4
+import pytest
+from moon_utilities import exceptions
+
+def get_subjects(subject_id=None):
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
-def get_subjects(client):
- req = client.get("/subjects")
+ if not subject_id:
+ req = hug.test.get(perimeter, 'subjects/', headers=auth_headers)
+ else:
+ req = hug.test.get(perimeter, 'subjects/{}'.format(subject_id), headers=auth_headers)
subjects = utilities.get_json(req.data)
return req, subjects
-def add_subjects(client, policy_id, name, perimeter_id=None, data=None):
+def add_subjects(policy_id, name, perimeter_id=None, data=None, auth_headers=None):
+ from moon_manager.api import perimeter
if not data:
name = name + uuid4().hex
data = {
@@ -23,44 +43,44 @@ def add_subjects(client, policy_id, name, perimeter_id=None, data=None):
"email": "{}@moon".format(name)
}
if not perimeter_id:
- req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id),
+ body=data, headers=auth_headers)
else:
- req = client.post("/policies/{}/subjects/{}".format(policy_id, perimeter_id),
- data=json.dumps(
- data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/subjects/{}".format(policy_id, perimeter_id),
+ body=data, headers=auth_headers)
subjects = utilities.get_json(req.data)
return req, subjects
-def delete_subjects_without_perimeter_id(client):
- req = client.delete("/subjects/{}".format(""))
+def delete_subjects_without_perimeter_id(auth_headers=None):
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/subjects/{}".format(""), headers=auth_headers)
return req
def test_perimeter_get_subject():
- client = utilities.register_client()
- req, subjects = get_subjects(client)
- assert req.status_code == 200
+ req, subjects = get_subjects()
+ assert req.status == hug.HTTP_200
assert isinstance(subjects, dict)
assert "subjects" in subjects
def test_perimeter_add_subject():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- req, subjects = add_subjects(client, policy_id, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
value = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value["name"]
assert value["email"]
def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
name = "testuser"
@@ -71,13 +91,15 @@ def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- add_subjects(client, policy_id1, data['name'], perimeter_id=perimeter_id, data=data)
+ add_subjects(policy_id1, data['name'], perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
policies2 = policy_helper.add_policies()
policy_id2 = list(policies2.keys())[0]
- req, subjects = add_subjects(client, policy_id2, data['name'],
- perimeter_id=perimeter_id, data=data)
+ req, subjects = add_subjects(policy_id2, data['name'],
+ perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
value = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value["name"]
assert value["email"]
assert len(value['policy_list']) == 2
@@ -86,20 +108,25 @@ def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
def test_perimeter_add_same_subject_perimeter_id_with_different_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
perimeter_id = uuid4().hex
- add_subjects(client, policy_id1, "testuser", perimeter_id=perimeter_id)
+ add_subjects(policy_id1, "testuser", perimeter_id=perimeter_id, auth_headers=auth_headers)
policies2 = policy_helper.add_policies()
policy_id2 = list(policies2.keys())[0]
- req, subjects = add_subjects(client, policy_id2, "testuser", perimeter_id=perimeter_id)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, subjects = add_subjects(policy_id2, "testuser", perimeter_id=perimeter_id,
+ auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_add_same_subject_name_with_new_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
perimeter_id = uuid4().hex
@@ -110,8 +137,8 @@ def test_perimeter_add_same_subject_name_with_new_policy_id():
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
- data=data)
+ req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
policies2 = policy_helper.add_policies()
policy_id2 = list(policies2.keys())[0]
value = list(subjects["subjects"].values())[0]
@@ -121,9 +148,9 @@ def test_perimeter_add_same_subject_name_with_new_policy_id():
"password": "password for {}".format(value['name']),
"email": "{}@moon".format(value['name'])
}
- req, subjects = add_subjects(client, policy_id2, None, data=data)
+ req, subjects = add_subjects(policy_id2, None, data=data, auth_headers=auth_headers)
value = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value["name"]
assert value["email"]
assert len(value['policy_list']) == 2
@@ -132,7 +159,8 @@ def test_perimeter_add_same_subject_name_with_new_policy_id():
def test_perimeter_add_same_subject_name_with_same_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
perimeter_id = uuid4().hex
@@ -143,8 +171,8 @@ def test_perimeter_add_same_subject_name_with_same_policy_id():
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
- data=data)
+ req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id,
+ data=data, auth_headers=auth_headers)
value = list(subjects["subjects"].values())[0]
data = {
"name": value['name'],
@@ -152,31 +180,46 @@ def test_perimeter_add_same_subject_name_with_same_policy_id():
"password": "password for {}".format(value['name']),
"email": "{}@moon".format(value['name'])
}
- req, subjects = add_subjects(client, policy_id1, None, data=data)
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, subjects = add_subjects(policy_id1, None, data=data, auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
name = "testuser" + uuid4().hex
data = {
- "name": name,
+ "name": name + uuid4().hex,
"description": "description of {}".format(name),
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req, subjects = add_subjects(client, policy_id, name, data=data)
+ subj_id = "b34e5a29-5494-4cc5-9356-daa244b8c254"
+ req, subjects = get_subjects(subj_id)
+ if subjects['subjects']:
+ for __policy_id in subjects['subjects'][subj_id]['policy_list']:
+ req = hug.test.delete(perimeter,
+ "/policies/{}/subjects/{}".format(__policy_id, subj_id),
+ headers=auth_headers)
+ req, subjects = add_subjects(policy_id, name, data=data, auth_headers=auth_headers)
perimeter_id = list(subjects["subjects"].values())[0]['id']
- req, subjects = add_subjects(client, policy_id, name, perimeter_id=perimeter_id, data=data)
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, subjects = add_subjects(policy_id, name, perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_add_subject_invalid_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
name = "testuser"
@@ -186,103 +229,121 @@ def test_perimeter_add_subject_invalid_policy_id():
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req, subjects = add_subjects(client, policy_id + "0", "testuser", data)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, subjects = add_subjects( policy_id + "0", "testuser", data, auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
-def test_perimeter_add_subject_policy_id_none():
- client = utilities.register_client()
- name = "testuser"
- data = {
- "name": name + uuid4().hex,
- "description": "description of {}".format(name),
- "password": "password for {}".format(name),
- "email": "{}@moon".format(name)
- }
- req, subjects = add_subjects(client, None, "testuser", data)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+def test_perimeter_add_subject_blank_data():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ with pytest.raises(exceptions.ValidationKeyError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id), body={'test':"aa"},
+ headers=auth_headers)
+ assert "Invalid Key :name not found" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == 'Invalid Key :name not found'
def test_perimeter_add_subject_with_forbidden_char_in_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "<a>",
"description": "description of {}".format(""),
"password": "password for {}".format(""),
"email": "{}@moon".format("")
}
- req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+ subj_id = "a34e5a29-5494-4cc5-9356-daa244b8c888"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(subj_id), body=data,
+ headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
def test_perimeter_update_subject_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- req, subjects = add_subjects(client, policy_id, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
value1 = list(subjects["subjects"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update"
}
- req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] == value2['description']
def test_perimeter_update_subject_description():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- req, subjects = add_subjects(client, policy_id, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
value1 = list(subjects["subjects"].values())[0]
perimeter_id = value1['id']
data = {
'description': value1['description'] + "update",
}
- req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_subject_description_and_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- req, subjects = add_subjects(client, policy_id, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
value1 = list(subjects["subjects"].values())[0]
perimeter_id = value1['id']
data = {
'description': value1['description'] + "update",
'name': value1['name'] + "update"
}
- req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ from moon_manager.api import perimeter
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["subjects"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_subject_wrong_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -290,64 +351,83 @@ def test_perimeter_update_subject_wrong_id():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, subjects = add_subjects(client, policy_id=policy_id1, name='testuser', data=data)
+ req, subjects = add_subjects(policy_id=policy_id1, name='testuser', data=data,
+ auth_headers=auth_headers)
value1 = list(subjects["subjects"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update",
'description': value1['description'] + "update"
}
- req = client.patch("/subjects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id + "wrong"),
+ body=data, headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_update_subject_name_with_existed_one():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name1 = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
perimeter_id1 = uuid4().hex
- req, subjects = add_subjects(client, policy_id=policy_id1, name=name1,
- perimeter_id=perimeter_id1)
+ req, subjects = add_subjects(policy_id=policy_id1, name=name1,
+ perimeter_id=perimeter_id1, auth_headers=auth_headers)
value1 = list(subjects["subjects"].values())[0]
perimeter_id2 = uuid4().hex
name2 = 'testuser' + uuid4().hex
- req, subjects = add_subjects(client, policy_id=policy_id1, name=name2,
- perimeter_id=perimeter_id2)
+ req, subjects = add_subjects(policy_id=policy_id1, name=name2,
+ perimeter_id=perimeter_id2, auth_headers=auth_headers)
data = {
'name': value1['name'],
}
- req = client.patch("/subjects/{}".format(perimeter_id2), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 409
+ with pytest.raises(exceptions.SubjectExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id2), body=data,
+ headers=auth_headers)
+ assert "409: Subject Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
def test_perimeter_delete_subject():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- req, subjects = add_subjects(client, policy_id, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
subject_id = list(subjects["subjects"].values())[0]["id"]
- req = client.delete("/policies/{}/subjects/{}".format(policy_id, subject_id))
- assert req.status_code == 200
+ req = hug.test.delete(perimeter, "/policies/{}/subjects/{}".format(policy_id, subject_id),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_200
def test_perimeter_delete_subjects_without_perimeter_id():
- client = utilities.register_client()
- req = delete_subjects_without_perimeter_id(client)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Subject Unknown"
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ with pytest.raises(exceptions.SubjectUnknown) as exception_info:
+ req = delete_subjects_without_perimeter_id(auth_headers)
+ assert "400: Subject Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Subject Unknown"
-def get_objects(client):
- req = client.get("/objects")
+def get_objects():
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(perimeter, "/objects", headers=auth_headers)
objects = utilities.get_json(req.data)
return req, objects
-def add_objects(client, name, policyId=None, data=None, perimeter_id=None):
+def add_objects(name, policyId=None, data=None, perimeter_id=None, auth_headers=None):
+ from moon_manager.api import perimeter
if not policyId:
subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
subject_category_name="subject_category1" + uuid4().hex,
@@ -361,59 +441,70 @@ def add_objects(client, name, policyId=None, data=None, perimeter_id=None):
"description": "description of {}".format(name),
}
if not perimeter_id:
- req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(policyId), body=data,
+ headers=auth_headers)
else:
- req = client.post("/policies/{}/objects/{}".format(policyId, perimeter_id),
- data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/objects/{}".format(policyId, perimeter_id),
+ body=data, headers=auth_headers)
+
objects = utilities.get_json(req.data)
return req, objects
-def delete_objects_without_perimeter_id(client):
- req = client.delete("/objects/{}".format(""))
+def delete_objects_without_perimeter_id(auth_headers=None):
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/objects/{}".format(""), headers=auth_headers)
return req
def test_perimeter_get_object():
- client = utilities.register_client()
- req, objects = get_objects(client)
- assert req.status_code == 200
+
+ req, objects = get_objects()
+ assert req.status == hug.HTTP_200
assert isinstance(objects, dict)
assert "objects" in objects
def test_perimeter_add_object():
- client = utilities.register_client()
- req, objects = add_objects(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects("testuser", auth_headers=auth_headers)
value = list(objects["objects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value['name']
def test_perimeter_add_object_with_wrong_policy_id():
- client = utilities.register_client()
- req, objects = add_objects(client, "testuser", policyId='wrong')
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, objects = add_objects("testuser", policyId='wrong', auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
def test_perimeter_add_object_with_policy_id_none():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "testuser" + uuid4().hex,
"description": "description of {}".format("testuser"),
}
- req = client.post("/policies/{}/objects/".format(None), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(None), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
def test_perimeter_add_same_object_name_with_new_policy_id():
- client = utilities.register_client()
- req, objects = add_objects(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects("testuser", auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -421,16 +512,17 @@ def test_perimeter_add_same_object_name_with_new_policy_id():
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
value2 = list(objects["objects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value1['id'] == value2['id']
assert value1['name'] == value2['name']
def test_perimeter_add_same_object_perimeter_id_with_new_policy_id():
- client = utilities.register_client()
- req, objects = add_objects(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects( "testuser", auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -438,17 +530,18 @@ def test_perimeter_add_same_object_perimeter_id_with_new_policy_id():
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
- perimeter_id=value1['id'])
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'],auth_headers=auth_headers)
value2 = list(objects["objects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value1['id'] == value2['id']
assert value1['name'] == value2['name']
def test_perimeter_add_same_object_perimeter_id_with_different_name():
- client = utilities.register_client()
- req, objects = add_objects(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects( "testuser", auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -456,14 +549,17 @@ def test_perimeter_add_same_object_perimeter_id_with_different_name():
"name": value1['name'] + 'different',
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
- perimeter_id=value1['id'])
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_add_same_object_name_with_same_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -471,16 +567,20 @@ def test_perimeter_add_same_object_name_with_same_policy_id():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
value = list(objects["objects"].values())[0]
- assert req.status_code == 200
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -488,16 +588,21 @@ def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list()
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects( 'testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
value = list(objects["objects"].values())[0]
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
- perimeter_id=value['id'])
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value['id'], auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_update_object_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -505,26 +610,30 @@ def test_perimeter_update_object_name():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update"
}
- req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
objects = utilities.get_json(req.data)
value2 = list(objects["objects"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] == value2['description']
def test_perimeter_update_object_description():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -532,26 +641,30 @@ def test_perimeter_update_object_description():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
perimeter_id = value1['id']
data = {
'description': value1['description'] + "update"
}
- req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
objects = utilities.get_json(req.data)
value2 = list(objects["objects"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_object_description_and_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -559,7 +672,8 @@ def test_perimeter_update_object_description_and_name():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
perimeter_id = value1['id']
@@ -567,19 +681,21 @@ def test_perimeter_update_object_description_and_name():
'name': value1['name'] + "update",
'description': value1['description'] + "update"
}
- req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
objects = utilities.get_json(req.data)
value2 = list(objects["objects"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_object_wrong_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -587,7 +703,8 @@ def test_perimeter_update_object_wrong_id():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
perimeter_id = value1['id']
@@ -595,13 +712,17 @@ def test_perimeter_update_object_wrong_id():
'name': value1['name'] + "update",
'description': value1['description'] + "update"
}
- req = client.patch("/objects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id + "wrong"), body=data,
+ headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
def test_perimeter_update_object_name_with_existed_one():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
name = 'testuser' + uuid4().hex
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -609,7 +730,8 @@ def test_perimeter_update_object_name_with_existed_one():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data1)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data1,
+ auth_headers=auth_headers)
value1 = list(objects["objects"].values())[0]
name = 'testuser' + uuid4().hex
@@ -618,7 +740,8 @@ def test_perimeter_update_object_name_with_existed_one():
"name": name,
"description": "description of {}".format('testuser'),
}
- req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data2)
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data2,
+ auth_headers=auth_headers)
value2 = list(objects["objects"].values())[0]
perimeter_id2 = value2['id']
@@ -626,59 +749,113 @@ def test_perimeter_update_object_name_with_existed_one():
data3 = {
'name': value1['name']
}
- req = client.patch("/objects/{}".format(perimeter_id2), data=json.dumps(data3),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Object Existing'
+ with pytest.raises(exceptions.ObjectExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id2), body=data3,
+ headers=auth_headers)
+ assert "409: Object Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Object Existing'
def test_perimeter_add_object_without_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "<br/>",
"description": "description of {}".format(""),
}
- req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_object_blank_data():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ with pytest.raises(exceptions.ValidationKeyError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body={}, headers=auth_headers)
+ assert "Invalid Key :name not found" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == 'Invalid Key :name not found'
def test_perimeter_add_object_with_name_contain_spaces():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "test<a>user",
"description": "description of {}".format("test user"),
}
- req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data,
+ headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_object_with_name_space():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": " ",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body =data, headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_delete_object():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
object_id = builder.create_object(policy_id)
- req = client.delete("/policies/{}/objects/{}".format(policy_id, object_id))
- assert req.status_code == 200
+ req = hug.test.delete(perimeter, "/policies/{}/objects/{}".format(policy_id, object_id), headers=auth_headers)
+
+ assert req.status == hug.HTTP_200
def test_perimeter_delete_objects_without_perimeter_id():
- client = utilities.register_client()
- req = delete_objects_without_perimeter_id(client)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Object Unknown"
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ObjectUnknown) as exception_info:
+ req = delete_objects_without_perimeter_id(auth_headers=auth_headers)
+ assert "400: Object Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Object Unknown"
-def get_actions(client):
- req = client.get("/actions")
+
+def get_actions():
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(perimeter, "/actions", headers=auth_headers)
actions = utilities.get_json(req.data)
return req, actions
-def add_actions(client, name, policy_id=None, data=None, perimeter_id=None):
+def add_actions(name, policy_id=None, data=None, perimeter_id=None, auth_headers=None):
+ from moon_manager.api import perimeter
if not policy_id:
subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
subject_category_name="subject_category1" + uuid4().hex,
@@ -693,60 +870,72 @@ def add_actions(client, name, policy_id=None, data=None, perimeter_id=None):
"description": "description of {}".format(name),
}
if not perimeter_id:
- req = client.post("/policies/{}/actions/".format(policy_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/actions/".format(policy_id), body=data,
+ headers=auth_headers)
else:
- req = client.post("/policies/{}/actions/{}".format(policy_id, perimeter_id),
- data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.post(perimeter, "/policies/{}/actions/{}".format(policy_id, perimeter_id),
+ body=data, headers=auth_headers)
actions = utilities.get_json(req.data)
return req, actions
-def delete_actions_without_perimeter_id(client):
- req = client.delete("/actions/{}".format(""))
+def delete_actions_without_perimeter_id(auth_headers=None):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/actions/{}".format(""), headers=auth_headers)
return req
def test_perimeter_get_actions():
- client = utilities.register_client()
- req, actions = get_actions(client)
- assert req.status_code == 200
+
+ req, actions = get_actions()
+
+ assert req.status == hug.HTTP_200
assert isinstance(actions, dict)
assert "actions" in actions
def test_perimeter_add_actions():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value = list(actions["actions"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value['name']
def test_perimeter_add_action_with_wrong_policy_id():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser", policy_id="wrong")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, actions = add_actions("testuser", policy_id="wrong", auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
def test_perimeter_add_action_with_policy_id_none():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "testuser" + uuid4().hex,
"description": "description of {}".format("testuser"),
}
- req = client.post("/policies/{}/actions/".format(None), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Policy Unknown'
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions/".format(None), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
def test_perimeter_add_same_action_name_with_new_policy_id():
- client = utilities.register_client()
- req, action = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
value1 = list(action["actions"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -754,16 +943,18 @@ def test_perimeter_add_same_action_name_with_new_policy_id():
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ auth_headers=auth_headers)
value2 = list(action["actions"].values())[0]
- assert req.status_code == 200
+ assert req.status == hug.HTTP_200
assert value1['id'] == value2['id']
assert value1['name'] == value2['name']
def test_perimeter_add_same_action_perimeter_id_with_new_policy_id():
- client = utilities.register_client()
- req, action = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
value1 = list(action["actions"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -771,17 +962,19 @@ def test_perimeter_add_same_action_perimeter_id_with_new_policy_id():
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
- perimeter_id=value1['id'])
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
value2 = list(action["actions"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['id'] == value2['id']
assert value1['name'] == value2['name']
def test_perimeter_add_same_action_perimeter_id_with_different_name():
- client = utilities.register_client()
- req, action = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
value1 = list(action["actions"].values())[0]
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
@@ -789,240 +982,323 @@ def test_perimeter_add_same_action_perimeter_id_with_different_name():
"name": value1['name'] + 'different',
"description": "description of {}".format('testuser'),
}
- req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
- perimeter_id=value1['id'])
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_add_same_action_name_with_same_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
- req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers)
value1 = list(action["actions"].values())[0]
data = {
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
policies1 = policy_helper.add_policies()
policy_id1 = list(policies1.keys())[0]
- req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers)
value1 = list(action["actions"].values())[0]
data = {
"name": value1['name'],
"description": "description of {}".format('testuser'),
}
- req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
- perimeter_id=value1['id'])
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
def test_perimeter_add_actions_without_name():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "<a>",
"description": "description of {}".format(""),
}
- req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
def test_perimeter_add_actions_with_name_contain_spaces():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "test<a>user",
"description": "description of {}".format("test user"),
}
- req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
def test_add_subjects_without_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "testuser",
"description": "description of {}".format("test user"),
}
- req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
def test_add_objects_without_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "testuser",
"description": "description of {}".format("test user"),
}
- req = client.post("/policies/{}/objects".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
def test_add_action_without_policy_id():
- client = utilities.register_client()
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
data = {
"name": "testuser",
"description": "description of {}".format("test user"),
}
- req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
def test_perimeter_update_action_name():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value1 = list(actions["actions"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update"
}
- req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["actions"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] == value2['description']
def test_perimeter_update_actions_description():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value1 = list(actions["actions"].values())[0]
perimeter_id = value1['id']
data = {
'description': value1['description'] + "update"
}
- req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["actions"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_actions_description_and_name():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value1 = list(actions["actions"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update",
'description': value1['description'] + "update"
}
- req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
subjects = utilities.get_json(req.data)
value2 = list(subjects["actions"].values())[0]
- assert req.status_code == 200
+
+ assert req.status == hug.HTTP_200
assert value1['name'] + 'update' == value2['name']
assert value1['id'] == value2['id']
assert value1['description'] + 'update' == value2['description']
def test_perimeter_update_action_wrong_id():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value1 = list(actions["actions"].values())[0]
perimeter_id = value1['id']
data = {
'name': value1['name'] + "update",
'description': value1['description'] + "update"
}
- req = client.patch("/actions/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id + "wrong"), body=data,
+ headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
def test_perimeter_update_action_name_with_existed_one():
- client = utilities.register_client()
- req, actions = add_actions(client, "testuser")
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value1 = list(actions["actions"].values())[0]
- req, actions = add_actions(client, "testuser")
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
value2 = list(actions["actions"].values())[0]
perimeter_id2 = value2['id']
data = {
'name': value1['name'],
}
- req = client.patch("/actions/{}".format(perimeter_id2), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 409
- assert json.loads(req.data)["message"] == '409: Action Existing'
+ with pytest.raises(exceptions.ActionExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id2), body=data,
+ headers=auth_headers)
+ assert "409: Action Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Action Existing'
def test_perimeter_delete_actions():
- client = utilities.register_client()
-
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
action_id = builder.create_action(policy_id)
- req = client.delete("/policies/{}/actions/{}".format(policy_id, action_id))
- assert req.status_code == 200
+ req = hug.test.delete(perimeter, "/policies/{}/actions/{}".format(policy_id, action_id),
+ headers=auth_headers)
-def test_delete_subject_without_policy():
- client = utilities.register_client()
+ assert req.status == hug.HTTP_200
+
+def test_delete_subject_assigned_to_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ from moon_manager.db_driver import PolicyManager
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
+ subject_id = builder.create_subject(policy_id)
+ PolicyManager.delete_policy(moon_user_id="admin", policy_id=policy_id)
+ PolicyManager.delete_subject(moon_user_id="admin", policy_id=None ,perimeter_id=subject_id)
- action_id = builder.create_action(policy_id)
+ req = hug.test.get(perimeter, "subjects/{}".format(subject_id), headers=auth_headers)
+ assert req.data['subjects'] == {}
- req = client.delete("/subjects/{}".format(action_id))
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+def test_delete_subject_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
-def test_delete_objects_without_policy():
- client = utilities.register_client()
+ subject_id = builder.create_subject(policy_id)
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/subjects/{}".format(subject_id), headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+def test_delete_objects_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
- action_id = builder.create_action(policy_id)
+ object_id = builder.create_object(policy_id)
- req = client.delete("/objects/{}".format(action_id))
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/objects/{}".format(object_id), headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
-def test_delete_actions_without_policy():
- client = utilities.register_client()
+def test_delete_actions_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
action_id = builder.create_action(policy_id)
- req = client.delete("/actions/{}".format(action_id))
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Policy Unknown"
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/actions/{}".format(action_id), headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
def test_perimeter_delete_actions_without_perimeter_id():
- client = utilities.register_client()
- req = delete_actions_without_perimeter_id(client)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "400: Action Unknown"
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ActionUnknown) as exception_info:
+ req = delete_actions_without_perimeter_id(auth_headers=auth_headers)
+ assert "400: Action Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Action Unknown"