aboutsummaryrefslogtreecommitdiffstats
path: root/templates/moonforming/utils
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2017-12-24 13:09:46 +0100
committerWuKong <rebirthmonkey@gmail.com>2017-12-24 13:09:46 +0100
commitb2904eb52f85938a18f55520ce2b4cf4dcb0269f (patch)
tree055bcf58dc3d767fc61a870a4f859e0f740ac3db /templates/moonforming/utils
parent1b21e61269223d9a9b97112e75606b1f612a0a24 (diff)
moon doc cleanup
Change-Id: Iccc0858e305d724fd8c3341e27f9fa90ffeb916f Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'templates/moonforming/utils')
-rw-r--r--templates/moonforming/utils/__init__.py0
-rw-r--r--templates/moonforming/utils/config.py22
-rw-r--r--templates/moonforming/utils/models.py270
-rw-r--r--templates/moonforming/utils/pdp.py163
-rw-r--r--templates/moonforming/utils/policies.py635
5 files changed, 0 insertions, 1090 deletions
diff --git a/templates/moonforming/utils/__init__.py b/templates/moonforming/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/templates/moonforming/utils/__init__.py
+++ /dev/null
diff --git a/templates/moonforming/utils/config.py b/templates/moonforming/utils/config.py
deleted file mode 100644
index 30c8ea4f..00000000
--- a/templates/moonforming/utils/config.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import yaml
-
-
-def get_config_data(filename="moon.conf"):
- data_config = None
- for _file in (
- filename,
- "conf/moon.conf",
- "../moon.conf",
- "../conf/moon.conf",
- "/etc/moon/moon.conf",
- ):
- try:
- data_config = yaml.safe_load(open(_file))
- except FileNotFoundError:
- data_config = None
- continue
- else:
- break
- if not data_config:
- raise Exception("Configuration file not found...")
- return data_config
diff --git a/templates/moonforming/utils/models.py b/templates/moonforming/utils/models.py
deleted file mode 100644
index 3cf31354..00000000
--- a/templates/moonforming/utils/models.py
+++ /dev/null
@@ -1,270 +0,0 @@
-import requests
-import copy
-import utils.config
-
-config = utils.config.get_config_data()
-
-URL = "http://{}:{}".format(
- config['components']['manager']['hostname'],
- config['components']['manager']['port'])
-URL = URL + "{}"
-HEADERS = {"content-type": "application/json"}
-
-model_template = {
- "name": "test_model",
- "description": "test",
- "meta_rules": []
-}
-
-category_template = {
- "name": "name of the category",
- "description": "description of the category"
-}
-
-meta_rule_template = {
- "name": "test_meta_rule",
- "subject_categories": [],
- "object_categories": [],
- "action_categories": []
-}
-
-
-def check_model(model_id=None, check_model_name=True):
- req = requests.get(URL.format("/models"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "models" in result
- if model_id:
- assert result["models"]
- assert model_id in result['models']
- assert "name" in result['models'][model_id]
- if check_model_name:
- assert model_template["name"] == result['models'][model_id]["name"]
- return result
-
-
-def add_model(name=None):
- if name:
- model_template['name'] = name
- req = requests.post(URL.format("/models"), json=model_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- model_id = list(result['models'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['models'][model_id]
- assert model_template["name"] == result['models'][model_id]["name"]
- return model_id
-
-
-def delete_model(model_id):
- req = requests.delete(URL.format("/models/{}".format(model_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
-
-def add_subject_category(name="subject_cat_1"):
- category_template["name"] = name
- req = requests.post(URL.format("/subject_categories"), json=category_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "subject_categories" in result
- category_id = list(result['subject_categories'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['subject_categories'][category_id]
- assert category_template["name"] == result['subject_categories'][category_id]["name"]
- return category_id
-
-
-def check_subject_category(category_id):
- req = requests.get(URL.format("/subject_categories"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "subject_categories" in result
- if "result" in result:
- assert result["result"]
- assert category_id in result['subject_categories']
- assert "name" in result['subject_categories'][category_id]
- assert category_template["name"] == result['subject_categories'][category_id]["name"]
-
-
-def delete_subject_category(category_id):
- req = requests.delete(URL.format("/subject_categories/{}".format(category_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- if "result" in result:
- assert result["result"]
-
-
-def add_object_category(name="object_cat_1"):
- category_template["name"] = name
- req = requests.post(URL.format("/object_categories"), json=category_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "object_categories" in result
- category_id = list(result['object_categories'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['object_categories'][category_id]
- assert category_template["name"] == result['object_categories'][category_id]["name"]
- return category_id
-
-
-def check_object_category(category_id):
- req = requests.get(URL.format("/object_categories"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "object_categories" in result
- if "result" in result:
- assert result["result"]
- assert category_id in result['object_categories']
- assert "name" in result['object_categories'][category_id]
- assert category_template["name"] == result['object_categories'][category_id]["name"]
-
-
-def delete_object_category(category_id):
- req = requests.delete(URL.format("/object_categories/{}".format(category_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- if "result" in result:
- assert result["result"]
-
-
-def add_action_category(name="action_cat_1"):
- category_template["name"] = name
- req = requests.post(URL.format("/action_categories"), json=category_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "action_categories" in result
- category_id = list(result['action_categories'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['action_categories'][category_id]
- assert category_template["name"] == result['action_categories'][category_id]["name"]
- return category_id
-
-
-def check_action_category(category_id):
- req = requests.get(URL.format("/action_categories"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "action_categories" in result
- if "result" in result:
- assert result["result"]
- assert category_id in result['action_categories']
- assert "name" in result['action_categories'][category_id]
- assert category_template["name"] == result['action_categories'][category_id]["name"]
-
-
-def delete_action_category(category_id):
- req = requests.delete(URL.format("/action_categories/{}".format(category_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- if "result" in result:
- assert result["result"]
-
-
-def add_categories_and_meta_rule(name="test_meta_rule"):
- scat_id = add_subject_category()
- ocat_id = add_object_category()
- acat_id = add_action_category()
- _meta_rule_template = copy.deepcopy(meta_rule_template)
- _meta_rule_template["name"] = name
- _meta_rule_template["subject_categories"].append(scat_id)
- _meta_rule_template["object_categories"].append(ocat_id)
- _meta_rule_template["action_categories"].append(acat_id)
- req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "meta_rules" in result
- meta_rule_id = list(result['meta_rules'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['meta_rules'][meta_rule_id]
- assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"]
- return meta_rule_id, scat_id, ocat_id, acat_id
-
-
-def add_meta_rule(name="test_meta_rule", scat=[], ocat=[], acat=[]):
- _meta_rule_template = copy.deepcopy(meta_rule_template)
- _meta_rule_template["name"] = name
- _meta_rule_template["subject_categories"] = []
- _meta_rule_template["subject_categories"].extend(scat)
- _meta_rule_template["object_categories"] = []
- _meta_rule_template["object_categories"].extend(ocat)
- _meta_rule_template["action_categories"] = []
- _meta_rule_template["action_categories"].extend(acat)
- req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "meta_rules" in result
- meta_rule_id = list(result['meta_rules'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['meta_rules'][meta_rule_id]
- assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"]
- return meta_rule_id
-
-
-def check_meta_rule(meta_rule_id, scat_id=None, ocat_id=None, acat_id=None):
- req = requests.get(URL.format("/meta_rules"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "meta_rules" in result
- if "result" in result:
- assert result["result"]
- if not meta_rule_id:
- return result
- assert meta_rule_id in result['meta_rules']
- assert "name" in result['meta_rules'][meta_rule_id]
- if scat_id:
- assert scat_id in result['meta_rules'][meta_rule_id]["subject_categories"]
- if ocat_id:
- assert ocat_id in result['meta_rules'][meta_rule_id]["object_categories"]
- if acat_id:
- assert acat_id in result['meta_rules'][meta_rule_id]["action_categories"]
-
-
-def delete_meta_rule(meta_rule_id):
- req = requests.delete(URL.format("/meta_rules/{}".format(meta_rule_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- if "result" in result:
- assert result["result"]
-
-
-def add_meta_rule_to_model(model_id, meta_rule_id):
- model = check_model(model_id, check_model_name=False)['models']
- meta_rule_list = model[model_id]["meta_rules"]
- if meta_rule_id not in meta_rule_list:
- meta_rule_list.append(meta_rule_id)
- req = requests.patch(URL.format("/models/{}".format(model_id)),
- json={"meta_rules": meta_rule_list},
- headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- model_id = list(result['models'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "meta_rules" in result['models'][model_id]
- assert meta_rule_list == result['models'][model_id]["meta_rules"]
diff --git a/templates/moonforming/utils/pdp.py b/templates/moonforming/utils/pdp.py
deleted file mode 100644
index f3c6df37..00000000
--- a/templates/moonforming/utils/pdp.py
+++ /dev/null
@@ -1,163 +0,0 @@
-import logging
-import requests
-import utils.config
-
-config = utils.config.get_config_data()
-logger = logging.getLogger("moonforming.utils.policies")
-
-URL = "http://{}:{}".format(
- config['components']['manager']['hostname'],
- config['components']['manager']['port'])
-HEADERS = {"content-type": "application/json"}
-KEYSTONE_USER = config['openstack']['keystone']['user']
-KEYSTONE_PASSWORD = config['openstack']['keystone']['password']
-KEYSTONE_PROJECT = config['openstack']['keystone']['project']
-KEYSTONE_SERVER = config['openstack']['keystone']['url']
-
-pdp_template = {
- "name": "test_pdp",
- "security_pipeline": [],
- "keystone_project_id": None,
- "description": "test",
-}
-
-
-def get_keystone_projects():
-
- HEADERS = {
- "Content-Type": "application/json"
- }
-
- data_auth = {
- "auth": {
- "identity": {
- "methods": [
- "password"
- ],
- "password": {
- "user": {
- "name": KEYSTONE_USER,
- "domain": {
- "name": "Default"
- },
- "password": KEYSTONE_PASSWORD
- }
- }
- }
- }
- }
-
- req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS)
- logger.debug("{}/auth/tokens".format(KEYSTONE_SERVER))
- logger.debug(req.text)
- assert req.status_code in (200, 201)
- TOKEN = req.headers['X-Subject-Token']
- HEADERS['X-Auth-Token'] = TOKEN
- req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS)
- if req.status_code not in (200, 201):
- data_auth["auth"]["scope"] = {
- "project": {
- "name": KEYSTONE_PROJECT,
- "domain": {
- "id": "default"
- }
- }
- }
- req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS)
- assert req.status_code in (200, 201)
- TOKEN = req.headers['X-Subject-Token']
- HEADERS['X-Auth-Token'] = TOKEN
- req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS)
- assert req.status_code in (200, 201)
- return req.json()
-
-
-def check_pdp(pdp_id=None, keystone_project_id=None, moon_url=None):
- _URL = URL
- if moon_url:
- _URL = moon_url
- req = requests.get(_URL + "/pdp")
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "pdps" in result
- if pdp_id:
- assert result["pdps"]
- assert pdp_id in result['pdps']
- assert "name" in result['pdps'][pdp_id]
- assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
- if keystone_project_id:
- assert result["pdps"]
- assert pdp_id in result['pdps']
- assert "keystone_project_id" in result['pdps'][pdp_id]
- assert keystone_project_id == result['pdps'][pdp_id]["keystone_project_id"]
- return result
-
-
-def add_pdp(name="test_pdp", policy_id=None):
- pdp_template['name'] = name
- if policy_id:
- pdp_template['security_pipeline'].append(policy_id)
- req = requests.post(URL + "/pdp", json=pdp_template, headers=HEADERS)
- logger.debug(req.status_code)
- logger.debug(req)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- pdp_id = list(result['pdps'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['pdps'][pdp_id]
- assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
- return pdp_id
-
-
-def update_pdp(pdp_id, policy_id=None):
- req = requests.get(URL + "/pdp/{}".format(pdp_id))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "pdps" in result
- assert pdp_id in result['pdps']
- pipeline = result['pdps'][pdp_id]["security_pipeline"]
- if policy_id not in pipeline:
- pipeline.append(policy_id)
- req = requests.patch(URL + "/pdp/{}".format(pdp_id),
- json={"security_pipeline": pipeline})
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "pdps" in result
- assert pdp_id in result['pdps']
-
- req = requests.get(URL + "/pdp/{}".format(pdp_id))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "pdps" in result
- assert pdp_id in result['pdps']
- assert policy_id in pipeline
-
-
-def map_to_keystone(pdp_id, keystone_project_id):
- req = requests.patch(URL + "/pdp/{}".format(pdp_id), json={"keystone_project_id": keystone_project_id},
- headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- if "result" in result:
- assert result["result"]
- assert pdp_id in result['pdps']
- assert "name" in result['pdps'][pdp_id]
- assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
- return pdp_id
-
-
-def delete_pdp(pdp_id):
- req = requests.delete(URL + "/pdp/{}".format(pdp_id))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
diff --git a/templates/moonforming/utils/policies.py b/templates/moonforming/utils/policies.py
deleted file mode 100644
index bd08291a..00000000
--- a/templates/moonforming/utils/policies.py
+++ /dev/null
@@ -1,635 +0,0 @@
-import logging
-import requests
-import utils.config
-
-config = utils.config.get_config_data()
-logger = logging.getLogger("moonforming.utils.policies")
-
-URL = "http://{}:{}".format(config['components']['manager']['hostname'], config['components']['manager']['port'])
-URL = URL + "{}"
-HEADERS = {"content-type": "application/json"}
-FILE = open("/tmp/test.log", "w")
-
-policy_template = {
- "name": "test_policy",
- "model_id": "",
- "genre": "authz",
- "description": "test",
-}
-
-subject_template = {
- "name": "test_subject",
- "description": "test",
- "email": "mail",
- "password": "my_pass",
-}
-
-object_template = {
- "name": "test_subject",
- "description": "test"
-}
-
-action_template = {
- "name": "test_subject",
- "description": "test"
-}
-
-subject_data_template = {
- "name": "subject_data1",
- "description": "description of the data subject"
-}
-
-object_data_template = {
- "name": "object_data1",
- "description": "description of the data subject"
-}
-
-action_data_template = {
- "name": "action_data1",
- "description": "description of the data subject"
-}
-
-subject_assignment_template = {
- "id": "",
- "category_id": "",
- "scope_id": ""
-}
-
-
-def check_policy(policy_id=None):
- req = requests.get(URL.format("/policies"))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "policies" in result
- if policy_id:
- assert result["policies"]
- assert policy_id in result['policies']
- assert "name" in result['policies'][policy_id]
- assert policy_template["name"] == result['policies'][policy_id]["name"]
- return result
-
-
-def add_policy(name="test_policy", genre="authz"):
- policy_template["name"] = name
- policy_template["genre"] = genre
- req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- policy_id = list(result['policies'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "name" in result['policies'][policy_id]
- assert policy_template["name"] == result['policies'][policy_id]["name"]
- return policy_id
-
-
-def update_policy(policy_id, model_id):
- req = requests.patch(URL.format("/policies/{}".format(policy_id)),
- json={"model_id": model_id}, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- policy_id = list(result['policies'].keys())[0]
- if "result" in result:
- assert result["result"]
- assert "model_id" in result['policies'][policy_id]
- assert model_id == result['policies'][policy_id]["model_id"]
-
-
-def delete_policy(policy_id):
- req = requests.delete(URL.format("/policies/{}".format(policy_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
-
-def add_subject(policy_id=None, name="test_subject"):
- subject_template['name'] = name
- if policy_id:
- logger.debug(URL.format("/policies/{}/subjects".format(policy_id)))
- req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)),
- json=subject_template, headers=HEADERS)
- else:
- logger.debug(URL.format("/subjects"))
- req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS)
- logger.debug(req.text)
- assert req.status_code == 200
- result = req.json()
- assert "subjects" in result
- subject_id = list(result['subjects'].keys())[0]
- return subject_id
-
-
-def update_subject(subject_id, policy_id=None, description=None):
- if policy_id and not description:
- req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
- json={})
- elif policy_id and description:
- req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
- json={"description": description})
- else:
- req = requests.patch(URL.format("/subjects/{}".format(subject_id)),
- json={"description": description})
- assert req.status_code == 200
- result = req.json()
- assert "subjects" in result
- assert "name" in result["subjects"][subject_id]
- assert subject_template["name"] == result["subjects"][subject_id]["name"]
- assert "policy_list" in result["subjects"][subject_id]
- if policy_id:
- assert policy_id in result["subjects"][subject_id]["policy_list"]
- if description:
- assert description in result["subjects"][subject_id]["description"]
-
-
-def check_subject(subject_id=None, policy_id=None):
- if policy_id:
- req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
- else:
- req = requests.get(URL.format("/subjects"))
- assert req.status_code == 200
- result = req.json()
- assert "subjects" in result
- assert "name" in result["subjects"][subject_id]
- assert subject_template["name"] == result["subjects"][subject_id]["name"]
- if policy_id:
- assert "policy_list" in result["subjects"][subject_id]
- assert policy_id in result["subjects"][subject_id]["policy_list"]
-
-
-def delete_subject(subject_id, policy_id=None):
- if policy_id:
- req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)))
- else:
- req = requests.delete(URL.format("/subjects/{}".format(subject_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
- if policy_id:
- req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
- else:
- req = requests.get(URL.format("/subjects"))
- assert req.status_code == 200
- result = req.json()
- assert "subjects" in result
- if subject_id in result["subjects"]:
- assert "name" in result["subjects"][subject_id]
- assert subject_template["name"] == result["subjects"][subject_id]["name"]
- if policy_id:
- assert "policy_list" in result["subjects"][subject_id]
- assert policy_id not in result["subjects"][subject_id]["policy_list"]
-
-
-def add_object(policy_id=None, name="test_object"):
- object_template['name'] = name
- if policy_id:
- req = requests.post(URL.format("/policies/{}/objects".format(policy_id)),
- json=object_template, headers=HEADERS)
- else:
- req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "objects" in result
- object_id = list(result['objects'].keys())[0]
- return object_id
-
-
-def update_object(object_id, policy_id):
- req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), json={})
- assert req.status_code == 200
- result = req.json()
- assert "objects" in result
- assert "name" in result["objects"][object_id]
- assert object_template["name"] == result["objects"][object_id]["name"]
- assert "policy_list" in result["objects"][object_id]
- assert policy_id in result["objects"][object_id]["policy_list"]
-
-
-def check_object(object_id=None, policy_id=None):
- if policy_id:
- req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
- else:
- req = requests.get(URL.format("/objects"))
- assert req.status_code == 200
- result = req.json()
- assert "objects" in result
- assert "name" in result["objects"][object_id]
- assert object_template["name"] == result["objects"][object_id]["name"]
- if policy_id:
- assert "policy_list" in result["objects"][object_id]
- assert policy_id in result["objects"][object_id]["policy_list"]
-
-
-def delete_object(object_id, policy_id=None):
- if policy_id:
- req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)))
- else:
- req = requests.delete(URL.format("/objects/{}".format(object_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
- if policy_id:
- req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
- else:
- req = requests.get(URL.format("/objects"))
- assert req.status_code == 200
- result = req.json()
- assert "objects" in result
- if object_id in result["objects"]:
- assert "name" in result["objects"][object_id]
- assert object_template["name"] == result["objects"][object_id]["name"]
- if policy_id:
- assert "policy_list" in result["objects"][object_id]
- assert policy_id not in result["objects"][object_id]["policy_list"]
-
-
-def add_action(policy_id=None, name="test_action"):
- action_template['name'] = name
- if policy_id:
- req = requests.post(URL.format("/policies/{}/actions".format(policy_id)),
- json=action_template, headers=HEADERS)
- else:
- req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "actions" in result
- action_id = list(result['actions'].keys())[0]
- return action_id
-
-
-def update_action(action_id, policy_id):
- req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), json={})
- assert req.status_code == 200
- result = req.json()
- assert "actions" in result
- assert "name" in result["actions"][action_id]
- assert action_template["name"] == result["actions"][action_id]["name"]
- assert "policy_list" in result["actions"][action_id]
- assert policy_id in result["actions"][action_id]["policy_list"]
-
-
-def check_action(action_id=None, policy_id=None):
- if policy_id:
- req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
- else:
- req = requests.get(URL.format("/actions"))
- assert req.status_code == 200
- result = req.json()
- assert "actions" in result
- assert "name" in result["actions"][action_id]
- assert action_template["name"] == result["actions"][action_id]["name"]
- if policy_id:
- assert "policy_list" in result["actions"][action_id]
- assert policy_id in result["actions"][action_id]["policy_list"]
-
-
-def delete_action(action_id, policy_id=None):
- if policy_id:
- req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)))
- else:
- req = requests.delete(URL.format("/actions/{}".format(action_id)))
- assert req.status_code == 200
- result = req.json()
- assert type(result) is dict
- assert "result" in result
- assert result["result"]
-
- if policy_id:
- req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
- else:
- req = requests.get(URL.format("/actions"))
- assert req.status_code == 200
- result = req.json()
- assert "actions" in result
- if action_id in result["actions"]:
- assert "name" in result["actions"][action_id]
- assert action_template["name"] == result["actions"][action_id]["name"]
- if policy_id:
- assert "policy_list" in result["actions"][action_id]
- assert policy_id not in result["actions"][action_id]["policy_list"]
-
-
-def add_subject_data(policy_id, category_id, name="subject_data1"):
- subject_data_template['name'] = name
- req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)),
- json=subject_data_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "subject_data" in result
- subject_id = list(result['subject_data']['data'].keys())[0]
- return subject_id
-
-
-def check_subject_data(policy_id, data_id, category_id):
- req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "subject_data" in result
- for _data in result['subject_data']:
- assert data_id in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def delete_subject_data(policy_id, category_id, data_id):
- req = requests.delete(URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)),
- headers=HEADERS)
- assert req.status_code == 200
- req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "subject_data" in result
- for _data in result['subject_data']:
- assert data_id not in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def add_object_data(policy_id, category_id, name="object_data1"):
- object_data_template['name'] = name
- req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)),
- json=object_data_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "object_data" in result
- object_id = list(result['object_data']['data'].keys())[0]
- return object_id
-
-
-def check_object_data(policy_id, data_id, category_id):
- req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "object_data" in result
- for _data in result['object_data']:
- assert data_id in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def delete_object_data(policy_id, category_id, data_id):
- req = requests.delete(URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)),
- headers=HEADERS)
- assert req.status_code == 200
- req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "object_data" in result
- for _data in result['object_data']:
- assert data_id not in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def add_action_data(policy_id, category_id, name="action_data1"):
- action_data_template['name'] = name
- req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)),
- json=action_data_template, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "action_data" in result
- action_id = list(result['action_data']['data'].keys())[0]
- return action_id
-
-
-def check_action_data(policy_id, data_id, category_id):
- req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "action_data" in result
- for _data in result['action_data']:
- assert data_id in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def delete_action_data(policy_id, category_id, data_id):
- req = requests.delete(URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)),
- headers=HEADERS)
- assert req.status_code == 200
- req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)))
- assert req.status_code == 200
- result = req.json()
- assert "action_data" in result
- for _data in result['action_data']:
- assert data_id not in list(_data['data'].keys())
- assert category_id == _data["category_id"]
-
-
-def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
- req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)),
- json={
- "id": subject_id,
- "category_id": subject_cat_id,
- "data_id": subject_data_id
- }, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "subject_assignments" in result
- assert result["subject_assignments"]
-
-
-def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
- req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
- policy_id, subject_id, subject_cat_id, subject_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "subject_assignments" in result
- assert result["subject_assignments"]
- for key in result["subject_assignments"]:
- assert "subject_id" in result["subject_assignments"][key]
- assert "category_id" in result["subject_assignments"][key]
- assert "assignments" in result["subject_assignments"][key]
- if result["subject_assignments"][key]['subject_id'] == subject_id and \
- result["subject_assignments"][key]["category_id"] == subject_cat_id:
- assert subject_data_id in result["subject_assignments"][key]["assignments"]
-
-
-def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
- req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
- policy_id, object_id, object_cat_id, object_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "object_assignments" in result
- assert result["object_assignments"]
- for key in result["object_assignments"]:
- assert "object_id" in result["object_assignments"][key]
- assert "category_id" in result["object_assignments"][key]
- assert "assignments" in result["object_assignments"][key]
- if result["object_assignments"][key]['object_id'] == object_id and \
- result["object_assignments"][key]["category_id"] == object_cat_id:
- assert object_data_id in result["object_assignments"][key]["assignments"]
-
-
-def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
- req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
- policy_id, action_id, action_cat_id, action_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "action_assignments" in result
- assert result["action_assignments"]
- for key in result["action_assignments"]:
- assert "action_id" in result["action_assignments"][key]
- assert "category_id" in result["action_assignments"][key]
- assert "assignments" in result["action_assignments"][key]
- if result["action_assignments"][key]['action_id'] == action_id and \
- result["action_assignments"][key]["category_id"] == action_cat_id:
- assert action_data_id in result["action_assignments"][key]["assignments"]
-
-
-def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
- req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)),
- json={
- "id": object_id,
- "category_id": object_cat_id,
- "data_id": object_data_id
- }, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "object_assignments" in result
- assert result["object_assignments"]
-
-
-def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
- req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)),
- json={
- "id": action_id,
- "category_id": action_cat_id,
- "data_id": action_data_id
- }, headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "action_assignments" in result
- assert result["action_assignments"]
-
-
-def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id):
- req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
- policy_id, subject_id, subject_cat_id, subject_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "result" in result
- assert result["result"]
-
- req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
- policy_id, subject_id, subject_cat_id, subject_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "subject_assignments" in result
- assert result["subject_assignments"]
- for key in result["subject_assignments"]:
- assert "subject_id" in result["subject_assignments"][key]
- assert "category_id" in result["subject_assignments"][key]
- assert "assignments" in result["subject_assignments"][key]
- if result["subject_assignments"][key]['subject_id'] == subject_id and \
- result["subject_assignments"][key]["category_id"] == subject_cat_id:
- assert subject_data_id not in result["subject_assignments"][key]["assignments"]
-
-
-def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id):
- req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
- policy_id, object_id, object_cat_id, object_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "result" in result
- assert result["result"]
-
- req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
- policy_id, object_id, object_cat_id, object_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "object_assignments" in result
- assert result["object_assignments"]
- for key in result["object_assignments"]:
- assert "object_id" in result["object_assignments"][key]
- assert "category_id" in result["object_assignments"][key]
- assert "assignments" in result["object_assignments"][key]
- if result["object_assignments"][key]['object_id'] == object_id and \
- result["object_assignments"][key]["category_id"] == object_cat_id:
- assert object_data_id not in result["object_assignments"][key]["assignments"]
-
-
-def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id):
- req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
- policy_id, action_id, action_cat_id, action_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "result" in result
- assert result["result"]
-
- req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
- policy_id, action_id, action_cat_id, action_data_id)))
- assert req.status_code == 200
- result = req.json()
- assert "action_assignments" in result
- assert result["action_assignments"]
- for key in result["action_assignments"]:
- assert "action_id" in result["action_assignments"][key]
- assert "category_id" in result["action_assignments"][key]
- assert "assignments" in result["action_assignments"][key]
- if result["action_assignments"][key]['action_id'] == action_id and \
- result["action_assignments"][key]["category_id"] == action_cat_id:
- assert action_data_id not in result["action_assignments"][key]["assignments"]
-
-
-def add_rule(policy_id, meta_rule_id, rule, instructions={"chain": [{"security_pipeline": "rbac"}]}):
- req = requests.post(URL.format("/policies/{}/rules".format(policy_id)),
- json={
- "meta_rule_id": meta_rule_id,
- "rule": rule,
- "instructions": instructions,
- "enabled": True
- },
- headers=HEADERS)
- assert req.status_code == 200
- result = req.json()
- assert "rules" in result
- try:
- rule_id = list(result["rules"].keys())[0]
- except Exception as e:
- return False
- assert "policy_id" in result["rules"][rule_id]
- assert policy_id == result["rules"][rule_id]["policy_id"]
- assert "meta_rule_id" in result["rules"][rule_id]
- assert meta_rule_id == result["rules"][rule_id]["meta_rule_id"]
- assert rule == result["rules"][rule_id]["rule"]
- return rule_id
-
-
-def check_rule(policy_id, meta_rule_id, rule_id, rule):
- req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
- assert req.status_code == 200
- result = req.json()
- assert "rules" in result
- assert "policy_id" in result["rules"]
- assert policy_id == result["rules"]["policy_id"]
- for item in result["rules"]["rules"]:
- assert "meta_rule_id" in item
- if meta_rule_id == item["meta_rule_id"]:
- if rule_id == item["id"]:
- assert rule == item["rule"]
-
-
-def delete_rule(policy_id, rule_id):
- req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id)))
- assert req.status_code == 200
- result = req.json()
- assert "result" in result
- assert result["result"]
-
- req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
- assert req.status_code == 200
- result = req.json()
- assert "rules" in result
- assert "policy_id" in result["rules"]
- assert policy_id == result["rules"]["policy_id"]
- found_rule = False
- for item in result["rules"]["rules"]:
- if rule_id == item["id"]:
- found_rule = True
- assert not found_rule