diff options
author | asteroide <thomas.duval@orange.com> | 2017-09-13 11:19:47 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2017-09-13 11:19:47 +0200 |
commit | 1f754ef804ffedaf1c222f9f51313b6afcbbec6e (patch) | |
tree | 489c8dafa509d3066608d2c3fc087a07d587ab27 | |
parent | 3595c59908df7c43fad4301545d3b9c455dffcc7 (diff) |
Move test files from Interface dir to tests dir
Change-Id: Iaf697b15a8e51cd96a4a1b65d74483b7f3ac9cea
-rw-r--r-- | moonv4/tests/populate_default_values.py | 231 | ||||
-rw-r--r-- | moonv4/tests/scenario/delegation.py | 40 | ||||
-rw-r--r-- | moonv4/tests/scenario/mls.py | 54 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac.py | 44 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac_custom_100.py | 89 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac_custom_1000.py | 89 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac_custom_50.py | 89 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac_large.py | 233 | ||||
-rw-r--r-- | moonv4/tests/scenario/rbac_mls.py | 50 | ||||
-rw-r--r-- | moonv4/tests/scenario/session.py | 60 | ||||
-rw-r--r-- | moonv4/tests/scenario/session_large.py | 389 | ||||
-rw-r--r-- | moonv4/tests/test_models.py | 37 | ||||
-rw-r--r-- | moonv4/tests/test_pdp.py | 16 | ||||
-rw-r--r-- | moonv4/tests/test_policies.py | 157 | ||||
-rw-r--r-- | moonv4/tests/utils/__init__.py | 0 | ||||
-rw-r--r-- | moonv4/tests/utils/config.py | 22 | ||||
-rw-r--r-- | moonv4/tests/utils/models.py | 270 | ||||
-rw-r--r-- | moonv4/tests/utils/pdp.py | 157 | ||||
-rw-r--r-- | moonv4/tests/utils/policies.py | 631 |
19 files changed, 2658 insertions, 0 deletions
diff --git a/moonv4/tests/populate_default_values.py b/moonv4/tests/populate_default_values.py new file mode 100644 index 00000000..740ad8ed --- /dev/null +++ b/moonv4/tests/populate_default_values.py @@ -0,0 +1,231 @@ +import argparse +import logging +from importlib.machinery import SourceFileLoader +from utils.pdp import * +from utils.models import * +from utils.policies import * + +parser = argparse.ArgumentParser() +parser.add_argument('filename', help='scenario filename', nargs=1) +parser.add_argument("--verbose", "-v", action='store_true', help="verbose mode") +parser.add_argument("--debug", "-d", action='store_true', help="debug mode") +args = parser.parse_args() + +FORMAT = '%(asctime)-15s %(levelname)s %(message)s' +if args.debug: + logging.basicConfig( + format=FORMAT, + level=logging.DEBUG) +elif args.verbose: + logging.basicConfig( + format=FORMAT, + level=logging.INFO) +else: + logging.basicConfig( + format=FORMAT, + level=logging.WARNING) + +requests_log = logging.getLogger("requests.packages.urllib3") +requests_log.setLevel(logging.WARNING) +requests_log.propagate = True + +logger = logging.getLogger(__name__) + +if args.filename: + print("Loading: {}".format(args.filename[0])) + +m = SourceFileLoader("scenario", args.filename[0]) + +scenario = m.load_module() + + +def create_model(model_id=None): + if args.verbose: + logger.info("Creating model {}".format(scenario.model_name)) + if not model_id: + logger.info("Add model") + model_id = add_model(name=scenario.model_name) + logger.info("Add subject categories") + for cat in scenario.subject_categories: + scenario.subject_categories[cat] = add_subject_category(name=cat) + logger.info("Add object categories") + for cat in scenario.object_categories: + scenario.object_categories[cat] = add_object_category(name=cat) + logger.info("Add action categories") + for cat in scenario.action_categories: + scenario.action_categories[cat] = add_action_category(name=cat) + sub_cat = [] + ob_cat = [] + act_cat = [] + meta_rule_list = [] + for item_name, item_value in scenario.meta_rule.items(): + for item in item_value["value"]: + if item in scenario.subject_categories: + sub_cat.append(scenario.subject_categories[item]) + elif item in scenario.object_categories: + ob_cat.append(scenario.object_categories[item]) + elif item in scenario.action_categories: + act_cat.append(scenario.action_categories[item]) + meta_rules = check_meta_rule(meta_rule_id=None) + for _meta_rule_id, _meta_rule_value in meta_rules['meta_rules'].items(): + if _meta_rule_value['name'] == item_name: + meta_rule_id = _meta_rule_id + break + else: + logger.info("Add meta rule") + meta_rule_id = add_meta_rule(item_name, sub_cat, ob_cat, act_cat) + item_value["id"] = meta_rule_id + if meta_rule_id not in meta_rule_list: + meta_rule_list.append(meta_rule_id) + return model_id, meta_rule_list + + +def create_policy(model_id, meta_rule_list): + if args.verbose: + logger.info("Creating policy {}".format(scenario.policy_name)) + _policies = check_policy() + for _policy_id, _policy_value in _policies["policies"].items(): + if _policy_value['name'] == scenario.policy_name: + policy_id = _policy_id + break + else: + policy_id = add_policy(name=scenario.policy_name, genre=scenario.policy_genre) + + update_policy(policy_id, model_id) + + for meta_rule_id in meta_rule_list: + logger.debug("add_meta_rule_to_model {} {}".format(model_id, meta_rule_id)) + add_meta_rule_to_model(model_id, meta_rule_id) + + logger.info("Add subject data") + for subject_cat_name in scenario.subject_data: + for subject_data_name in scenario.subject_data[subject_cat_name]: + data_id = scenario.subject_data[subject_cat_name][subject_data_name] = add_subject_data( + policy_id=policy_id, + category_id=scenario.subject_categories[subject_cat_name], name=subject_data_name) + scenario.subject_data[subject_cat_name][subject_data_name] = data_id + logger.info("Add object data") + for object_cat_name in scenario.object_data: + for object_data_name in scenario.object_data[object_cat_name]: + data_id = scenario.object_data[object_cat_name][object_data_name] = add_object_data( + policy_id=policy_id, + category_id=scenario.object_categories[object_cat_name], name=object_data_name) + scenario.object_data[object_cat_name][object_data_name] = data_id + logger.info("Add action data") + for action_cat_name in scenario.action_data: + for action_data_name in scenario.action_data[action_cat_name]: + data_id = scenario.action_data[action_cat_name][action_data_name] = add_action_data( + policy_id=policy_id, + category_id=scenario.action_categories[action_cat_name], name=action_data_name) + scenario.action_data[action_cat_name][action_data_name] = data_id + + logger.info("Add subjects") + for name in scenario.subjects: + scenario.subjects[name] = add_subject(policy_id, name=name) + logger.info("Add objects") + for name in scenario.objects: + scenario.objects[name] = add_object(policy_id, name=name) + logger.info("Add actions") + for name in scenario.actions: + scenario.actions[name] = add_action(policy_id, name=name) + + logger.info("Add subject assignments") + for subject_name in scenario.subject_assignments: + if type(scenario.subject_assignments[subject_name]) in (list, tuple): + for items in scenario.subject_assignments[subject_name]: + for subject_category_name in items: + subject_id = scenario.subjects[subject_name] + subject_cat_id = scenario.subject_categories[subject_category_name] + for data in scenario.subject_assignments[subject_name]: + subject_data_id = scenario.subject_data[subject_category_name][data[subject_category_name]] + add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) + else: + for subject_category_name in scenario.subject_assignments[subject_name]: + subject_id = scenario.subjects[subject_name] + subject_cat_id = scenario.subject_categories[subject_category_name] + subject_data_id = scenario.subject_data[subject_category_name][scenario.subject_assignments[subject_name][subject_category_name]] + add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) + + logger.info("Add object assignments") + for object_name in scenario.object_assignments: + if type(scenario.object_assignments[object_name]) in (list, tuple): + for items in scenario.object_assignments[object_name]: + for object_category_name in items: + object_id = scenario.objects[object_name] + object_cat_id = scenario.object_categories[object_category_name] + for data in scenario.object_assignments[object_name]: + object_data_id = scenario.object_data[object_category_name][data[object_category_name]] + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + else: + for object_category_name in scenario.object_assignments[object_name]: + object_id = scenario.objects[object_name] + object_cat_id = scenario.object_categories[object_category_name] + object_data_id = scenario.object_data[object_category_name][scenario.object_assignments[object_name][object_category_name]] + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + + logger.info("Add action assignments") + for action_name in scenario.action_assignments: + if type(scenario.action_assignments[action_name]) in (list, tuple): + for items in scenario.action_assignments[action_name]: + for action_category_name in items: + action_id = scenario.actions[action_name] + action_cat_id = scenario.action_categories[action_category_name] + for data in scenario.action_assignments[action_name]: + action_data_id = scenario.action_data[action_category_name][data[action_category_name]] + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + else: + for action_category_name in scenario.action_assignments[action_name]: + action_id = scenario.actions[action_name] + action_cat_id = scenario.action_categories[action_category_name] + action_data_id = scenario.action_data[action_category_name][scenario.action_assignments[action_name][action_category_name]] + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + + logger.info("Add rules") + for meta_rule_name in scenario.rules: + meta_rule_value = scenario.meta_rule[meta_rule_name] + for rule in scenario.rules[meta_rule_name]: + data_list = [] + _meta_rule = list(meta_rule_value["value"]) + for data_name in rule["rule"]: + category_name = _meta_rule.pop(0) + if category_name in scenario.subject_categories: + data_list.append(scenario.subject_data[category_name][data_name]) + elif category_name in scenario.object_categories: + data_list.append(scenario.object_data[category_name][data_name]) + elif category_name in scenario.action_categories: + data_list.append(scenario.action_data[category_name][data_name]) + instructions = rule["instructions"] + add_rule(policy_id, meta_rule_value["id"], data_list, instructions) + return policy_id + + +def create_pdp(policy_id=None): + logger.info("Creating PDP {}".format(scenario.pdp_name)) + projects = get_keystone_projects() + admin_project_id = None + for _project in projects['projects']: + if _project['name'] == "admin": + admin_project_id = _project['id'] + assert admin_project_id + pdps = check_pdp()["pdps"] + for pdp_id, pdp_value in pdps.items(): + if scenario.pdp_name == pdp_value["name"]: + update_pdp(pdp_id, policy_id=policy_id) + logger.debug("Found existing PDP named {} (will add policy {})".format(scenario.pdp_name, policy_id)) + return pdp_id + _pdp_id = add_pdp(name=scenario.pdp_name, policy_id=policy_id) + map_to_keystone(pdp_id=_pdp_id, keystone_project_id=admin_project_id) + return _pdp_id + +if __name__ == "__main__": + _models = check_model() + for _model_id, _model_value in _models['models'].items(): + if _model_value['name'] == scenario.model_name: + model_id = _model_id + meta_rule_list = _model_value['meta_rules'] + create_model(model_id) + break + else: + model_id, meta_rule_list = create_model() + policy_id = create_policy(model_id, meta_rule_list) + pdp_id = create_pdp(policy_id) diff --git a/moonv4/tests/scenario/delegation.py b/moonv4/tests/scenario/delegation.py new file mode 100644 index 00000000..839e74ce --- /dev/null +++ b/moonv4/tests/scenario/delegation.py @@ -0,0 +1,40 @@ + +pdp_name = "pdp1" +policy_name = "Delegation policy example" +model_name = "Delegation" + +subjects = {"user0": "", } +objects = {"user1": "", } +actions = {"delegate": ""} + +subject_categories = {"subjectid": "", } +object_categories = {"delegated": "", } +action_categories = {"delegation-action": "", } + +subject_data = {"subjectid": {"user0": ""}} +object_data = {"delegated": {"user1": ""}} +action_data = {"delegation-action": {"delegate": ""}} + +subject_assignments = {"user0": {"subjectid": "user0"}} +object_assignments = {"user1": {"delegated": "user1"}} +action_assignments = {"delegate": {"delegation-action": "delegate"}} + +meta_rule = { + "session": {"id": "", "value": ("subjectid", "delegated", "delegation-action")}, +} + +rules = { + "session": ( + { + "rule": ("user0", "user1", "delegate"), + "instructions": ( + { + "update": {"request:subject": "user1"} # update the current user with "user1" + }, + {"chain": {"security_pipeline": "rbac"}} + ) + }, + ) +} + + diff --git a/moonv4/tests/scenario/mls.py b/moonv4/tests/scenario/mls.py new file mode 100644 index 00000000..3a3ded43 --- /dev/null +++ b/moonv4/tests/scenario/mls.py @@ -0,0 +1,54 @@ + +pdp_name = "pdp1" +policy_name = "MLS Policy example" +model_name = "MLS" + +subjects = {"user0": "", "user1": "", "user2": "", } +objects = {"vm0": "", "vm1": "", } +actions = {"start": "", "stop": ""} + +subject_categories = {"subject-security-level": "", } +object_categories = {"object-security-level": "", } +action_categories = {"action-type": "", } + +subject_data = { + "subject-security-level": {"low": "", "medium": "", "high": ""}, +} +object_data = { + "object-security-level": {"low": "", "medium": "", "high": ""}, +} +action_data = {"action-type": {"vm-action": "", "storage-action": "", }} + +subject_assignments = { + "user0": {"subject-security-level": "high"}, + "user1": {"subject-security-level": "medium"}, +} +object_assignments = { + "vm0": {"object-security-level": "medium"}, + "vm1": {"object-security-level": "low"}, +} +action_assignments = { + "start": {"action-type": "vm-action"}, + "stop": {"action-type": "vm-action"} +} + +meta_rule = { + "mls": {"id": "", "value": ("subject-security-level", "object-security-level", "action-type")}, +} + +rules = { + "mls": ( + { + "rules": ("high", "medium", "vm-action"), + "instructions": ({"decision": "grant"}) + }, + { + "rules": ("high", "low", "vm-action"), + "instructions": ({"decision": "grant"}) + }, + { + "rules": ("medium", "low", "vm-action"), + "instructions": ({"decision": "grant"}) + }, + ) +} diff --git a/moonv4/tests/scenario/rbac.py b/moonv4/tests/scenario/rbac.py new file mode 100644 index 00000000..89fd7de8 --- /dev/null +++ b/moonv4/tests/scenario/rbac.py @@ -0,0 +1,44 @@ + +pdp_name = "pdp1" +policy_name = "RBAC policy example" +model_name = "RBAC" +policy_genre = "authz" + +subjects = {"user0": "", "user1": "", } +objects = {"vm0": "", "vm1": "", } +actions = {"start": "", "stop": ""} + +subject_categories = {"role": "", } +object_categories = {"id": "", } +action_categories = {"action-type": "", } + +subject_data = {"role": {"admin": "", "employee": "", "*": ""}} +object_data = {"id": {"vm0": "", "vm1": "", "*": ""}} +action_data = {"action-type": {"vm-action": "", "*": ""}} + +subject_assignments = {"user0": ({"role": "employee"}, {"role": "*"}), "user1": ({"role": "employee"}, {"role": "*"}), } +object_assignments = {"vm0": ({"id": "vm0"}, {"id": "*"}), "vm1": ({"id": "vm1"}, {"id": "*"})} +action_assignments = {"start": ({"action-type": "vm-action"}, {"action-type": "*"}), "stop": ({"action-type": "vm-action"}, {"action-type": "*"})} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, +} + +rules = { + "rbac": ( + { + "rule": ("admin", "vm0", "vm-action"), + "instructions": ( + {"decision": "grant"}, # "grant" to immediately exit, "continue" to wait for the result of next policy + ) + }, + { + "rule": ("employee", "vm1", "vm-action"), + "instructions": ( + {"decision": "grant"}, + ) + }, + ) +} + + diff --git a/moonv4/tests/scenario/rbac_custom_100.py b/moonv4/tests/scenario/rbac_custom_100.py new file mode 100644 index 00000000..9ee55dbd --- /dev/null +++ b/moonv4/tests/scenario/rbac_custom_100.py @@ -0,0 +1,89 @@ +import random + +pdp_name = "pdp_100" +policy_name = "RBAC policy example 100 users" +model_name = "RBAC" +policy_genre = "authz" + +SUBJECT_NUMBER = 100 +OBJECT_NUMBER = 100 +ROLE_NUMBER = 50 + +subjects = {} +for _id in range(SUBJECT_NUMBER): + subjects["user{}".format(_id)] = "" +objects = {} +for _id in range(OBJECT_NUMBER): + objects["vm{}".format(_id)] = "" +actions = { + "start": "", + "stop": "", + "pause": "", + "unpause": "", + "destroy": "", +} + +subject_categories = {"role": "", } +object_categories = {"id": "", } +action_categories = {"action-type": "", } + +subject_data = {"role": {"admin": "", "*": ""}} +for _id in range(ROLE_NUMBER): + subject_data["role"]["role{}".format(_id)] = "" +object_data = {"id": {"*": ""}} +for _id in range(OBJECT_NUMBER): + object_data["id"]["vm{}".format(_id)] = "" +action_data = {"action-type": { + "vm-read": "", + "vm-write": "", + "*": "" +}} + +subject_assignments = {} +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + subject_assignments["user{}".format(_id)] = [{"role": _role}, {"role": "*"}] +object_assignments = {"vm0": ({"id": "vm0"}, {"id": "*"}), "vm1": ({"id": "vm1"}, {"id": "*"})} +for _id in range(OBJECT_NUMBER): + object_assignments["vm{}".format(_id)] = [{"id": "vm{}".format(_id)}, {"id": "*"}] +action_assignments = { + "start": ({"action-type": "vm-write"}, {"action-type": "*"}), + "stop": ({"action-type": "vm-write"}, {"action-type": "*"}), + "pause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "unpause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "destroy": ({"action-type": "vm-write"}, {"action-type": "*"}), +} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, +} + +rules = { + "rbac": [ + { + "rule": ("admin", "vm0", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("admin", "vm0", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + ] +} + +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + _vm = "vm{}".format(random.randrange(OBJECT_NUMBER)) + _action = random.choice(list(action_data['action-type'].keys())) + rules["rbac"].append( + { + "rule": (_role, _vm, _action), + "instructions": ( + {"decision": "grant"}, + ) + }, + ) diff --git a/moonv4/tests/scenario/rbac_custom_1000.py b/moonv4/tests/scenario/rbac_custom_1000.py new file mode 100644 index 00000000..d6850485 --- /dev/null +++ b/moonv4/tests/scenario/rbac_custom_1000.py @@ -0,0 +1,89 @@ +import random + +pdp_name = "pdp_1000" +policy_name = "RBAC policy example 1000 users" +model_name = "RBAC" +policy_genre = "authz" + +SUBJECT_NUMBER = 1000 +OBJECT_NUMBER = 500 +ROLE_NUMBER = 50 + +subjects = {} +for _id in range(SUBJECT_NUMBER): + subjects["user{}".format(_id)] = "" +objects = {} +for _id in range(OBJECT_NUMBER): + objects["vm{}".format(_id)] = "" +actions = { + "start": "", + "stop": "", + "pause": "", + "unpause": "", + "destroy": "", +} + +subject_categories = {"role": "", } +object_categories = {"id": "", } +action_categories = {"action-type": "", } + +subject_data = {"role": {"admin": "", "*": ""}} +for _id in range(ROLE_NUMBER): + subject_data["role"]["role{}".format(_id)] = "" +object_data = {"id": {"*": ""}} +for _id in range(OBJECT_NUMBER): + object_data["id"]["vm{}".format(_id)] = "" +action_data = {"action-type": { + "vm-read": "", + "vm-write": "", + "*": "" +}} + +subject_assignments = {} +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + subject_assignments["user{}".format(_id)] = [{"role": _role}, {"role": "*"}] +object_assignments = {"vm0": ({"id": "vm0"}, {"id": "*"}), "vm1": ({"id": "vm1"}, {"id": "*"})} +for _id in range(OBJECT_NUMBER): + object_assignments["vm{}".format(_id)] = [{"id": "vm{}".format(_id)}, {"id": "*"}] +action_assignments = { + "start": ({"action-type": "vm-write"}, {"action-type": "*"}), + "stop": ({"action-type": "vm-write"}, {"action-type": "*"}), + "pause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "unpause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "destroy": ({"action-type": "vm-write"}, {"action-type": "*"}), +} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, +} + +rules = { + "rbac": [ + { + "rule": ("admin", "vm0", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("admin", "vm0", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + ] +} + +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + _vm = "vm{}".format(random.randrange(OBJECT_NUMBER)) + _action = random.choice(list(action_data['action-type'].keys())) + rules["rbac"].append( + { + "rule": (_role, _vm, _action), + "instructions": ( + {"decision": "grant"}, + ) + }, + ) diff --git a/moonv4/tests/scenario/rbac_custom_50.py b/moonv4/tests/scenario/rbac_custom_50.py new file mode 100644 index 00000000..e1437cf4 --- /dev/null +++ b/moonv4/tests/scenario/rbac_custom_50.py @@ -0,0 +1,89 @@ +import random + +pdp_name = "pdp_50" +policy_name = "RBAC policy example 50 users" +model_name = "RBAC" +policy_genre = "authz" + +SUBJECT_NUMBER = 50 +OBJECT_NUMBER = 50 +ROLE_NUMBER = 10 + +subjects = {} +for _id in range(SUBJECT_NUMBER): + subjects["user{}".format(_id)] = "" +objects = {} +for _id in range(OBJECT_NUMBER): + objects["vm{}".format(_id)] = "" +actions = { + "start": "", + "stop": "", + "pause": "", + "unpause": "", + "destroy": "", +} + +subject_categories = {"role": "", } +object_categories = {"id": "", } +action_categories = {"action-type": "", } + +subject_data = {"role": {"admin": "", "*": ""}} +for _id in range(ROLE_NUMBER): + subject_data["role"]["role{}".format(_id)] = "" +object_data = {"id": {"*": ""}} +for _id in range(OBJECT_NUMBER): + object_data["id"]["vm{}".format(_id)] = "" +action_data = {"action-type": { + "vm-read": "", + "vm-write": "", + "*": "" +}} + +subject_assignments = {} +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + subject_assignments["user{}".format(_id)] = [{"role": _role}, {"role": "*"}] +object_assignments = {"vm0": ({"id": "vm0"}, {"id": "*"}), "vm1": ({"id": "vm1"}, {"id": "*"})} +for _id in range(OBJECT_NUMBER): + object_assignments["vm{}".format(_id)] = [{"id": "vm{}".format(_id)}, {"id": "*"}] +action_assignments = { + "start": ({"action-type": "vm-write"}, {"action-type": "*"}), + "stop": ({"action-type": "vm-write"}, {"action-type": "*"}), + "pause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "unpause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "destroy": ({"action-type": "vm-write"}, {"action-type": "*"}), +} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, +} + +rules = { + "rbac": [ + { + "rule": ("admin", "vm0", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("admin", "vm0", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + ] +} + +for _id in range(SUBJECT_NUMBER): + _role = "role{}".format(random.randrange(ROLE_NUMBER)) + _vm = "vm{}".format(random.randrange(OBJECT_NUMBER)) + _action = random.choice(list(action_data['action-type'].keys())) + rules["rbac"].append( + { + "rule": (_role, _vm, _action), + "instructions": ( + {"decision": "grant"}, + ) + }, + ) diff --git a/moonv4/tests/scenario/rbac_large.py b/moonv4/tests/scenario/rbac_large.py new file mode 100644 index 00000000..ef5dd9b2 --- /dev/null +++ b/moonv4/tests/scenario/rbac_large.py @@ -0,0 +1,233 @@ + +pdp_name = "pdp1" +policy_name = "RBAC policy example" +model_name = "RBAC" +policy_genre = "authz" + +subjects = { + "user0": "", + "user1": "", + "user2": "", + "user3": "", + "user4": "", + "user5": "", + "user6": "", + "user7": "", + "user8": "", + "user9": "", +} +objects = { + "vm0": "", + "vm1": "", + "vm2": "", + "vm3": "", + "vm4": "", + "vm5": "", + "vm6": "", + "vm7": "", + "vm8": "", + "vm9": "", +} +actions = { + "start": "", + "stop": "", + "pause": "", + "unpause": "", + "destroy": "", +} + +subject_categories = {"role": "", } +object_categories = {"id": "", } +action_categories = {"action-type": "", } + +subject_data = {"role": { + "admin": "", + "employee": "", + "dev1": "", + "dev2": "", + "*": "" +}} +object_data = {"id": { + "vm0": "", + "vm1": "", + "vm2": "", + "vm3": "", + "vm4": "", + "vm5": "", + "vm6": "", + "vm7": "", + "vm8": "", + "vm9": "", + "*": "" +}} +action_data = {"action-type": { + "vm-read": "", + "vm-write": "", + "*": "" +}} + +subject_assignments = { + "user0": ({"role": "employee"}, {"role": "*"}), + "user1": ({"role": "employee"}, {"role": "*"}), + "user2": ({"role": "dev1"}, {"role": "*"}), + "user3": ({"role": "dev1"}, {"role": "*"}), + "user4": ({"role": "dev1"}, {"role": "*"}), + "user5": ({"role": "dev1"}, {"role": "*"}), + "user6": ({"role": "dev2"}, {"role": "*"}), + "user7": ({"role": "dev2"}, {"role": "*"}), + "user8": ({"role": "dev2"}, {"role": "*"}), + "user9": ({"role": "dev2"}, {"role": "*"}), +} +object_assignments = { + "vm0": ({"id": "vm0"}, {"id": "*"}), + "vm1": ({"id": "vm1"}, {"id": "*"}), + "vm2": ({"id": "vm2"}, {"id": "*"}), + "vm3": ({"id": "vm3"}, {"id": "*"}), + "vm4": ({"id": "vm4"}, {"id": "*"}), + "vm5": ({"id": "vm5"}, {"id": "*"}), + "vm6": ({"id": "vm6"}, {"id": "*"}), + "vm7": ({"id": "vm7"}, {"id": "*"}), + "vm8": ({"id": "vm8"}, {"id": "*"}), + "vm9": ({"id": "vm9"}, {"id": "*"}), +} +action_assignments = { + "start": ({"action-type": "vm-write"}, {"action-type": "*"}), + "stop": ({"action-type": "vm-write"}, {"action-type": "*"}), + "pause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "unpause": ({"action-type": "vm-read"}, {"action-type": "*"}), + "destroy": ({"action-type": "vm-write"}, {"action-type": "*"}), +} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, +} + +rules = { + "rbac": ( + { + "rule": ("admin", "vm0", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("admin", "vm0", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + # Rules for grant all employee to do read actions to all VM except vm0 + { + "rule": ("employee", "vm1", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm2", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm3", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm4", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm5", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm6", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm7", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm8", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("employee", "vm9", "vm-read"), + "instructions": ( + {"decision": "grant"}, + ) + }, + # Rules for grant all dev1 to do read actions to some VM + { + "rule": ("dev1", "vm1", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev1", "vm2", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev1", "vm3", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev1", "vm4", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + # Rules for grant all dev2 to do read actions to some VM + { + "rule": ("dev2", "vm5", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev2", "vm6", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev2", "vm7", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev2", "vm8", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + { + "rule": ("dev2", "vm9", "vm-write"), + "instructions": ( + {"decision": "grant"}, + ) + }, + ) +} + + diff --git a/moonv4/tests/scenario/rbac_mls.py b/moonv4/tests/scenario/rbac_mls.py new file mode 100644 index 00000000..8a5362ea --- /dev/null +++ b/moonv4/tests/scenario/rbac_mls.py @@ -0,0 +1,50 @@ + +pdp_name = "pdp1" +policy_name = "Multi policy example" +model_name = "RBAC" + +subjects = {"user0": "", "user1": "", "user2": "", } +objects = {"vm0": "", "vm1": "", } +actions = {"start": "", "stop": ""} + +subject_categories = {"role": "", "subject-security-level": "", } +object_categories = {"id": "", "object-security-level": "", } +action_categories = {"action-type": "", } + +subject_data = { + "role": {"admin": "", "employee": ""}, + "subject-security-level": {"low": "", "medium": "", "high": ""}, +} +object_data = { + "id": {"vm1": "", "vm2": ""}, + "object-security-level": {"low": "", "medium": "", "high": ""}, +} +action_data = {"action-type": {"vm-action": "", "storage-action": "", }} + +subject_assignments = { + "user0": {"role": "admin", "subject-security-level": "high"}, + "user1": {"role": "employee", "subject-security-level": "medium"}, +} +object_assignments = { + "vm0": {"id": "vm1", "object-security-level": "medium"}, + "vm1": {"id": "vm2", "object-security-level": "low"}, +} +action_assignments = { + "start": {"action-type": "vm-action"}, + "stop": {"action-type": "vm-action"} +} + +meta_rule = { + "rbac": {"id": "", "value": ("role", "id", "action-type")}, + "mls": {"id": "", "value": ("subject-security-level", "object-security-level", "action-type")}, +} + +rules = { + "rbac": ( + ("admin", "vm1", "vm-action"), + ), + "mls": ( + ("high", "medium", "vm-action"), + ("medium", "low", "vm-action"), + ) +} diff --git a/moonv4/tests/scenario/session.py b/moonv4/tests/scenario/session.py new file mode 100644 index 00000000..97d7aec3 --- /dev/null +++ b/moonv4/tests/scenario/session.py @@ -0,0 +1,60 @@ + +pdp_name = "pdp1" +policy_name = "Session policy example" +model_name = "Session" +policy_genre = "session" + +subjects = {"user0": "", "user1": "", } +objects = {"admin": "", "employee": "", } +actions = {"activate": "", "deactivate": ""} + +subject_categories = {"subjectid": "", } +object_categories = {"role": "", } +action_categories = {"session-action": "", } + +subject_data = {"subjectid": {"user0": "", "user1": ""}} +object_data = {"role": {"admin": "", "employee": "", "*": ""}} +action_data = {"session-action": {"activate": "", "deactivate": "", "*": ""}} + +subject_assignments = {"user0": ({"subjectid": "user0"}, ), "user1": ({"subjectid": "user1"}, ), } +object_assignments = {"admin": ({"role": "admin"}, {"role": "*"}), + "employee": ({"role": "employee"}, {"role": "employee"}) + } +action_assignments = {"activate": ({"session-action": "activate"}, {"session-action": "*"}, ), + "deactivate": ({"session-action": "deactivate"}, {"session-action": "*"}, ) + } + +meta_rule = { + "session": {"id": "", "value": ("subjectid", "role", "session-action")}, +} + +rules = { + "session": ( + { + "rule": ("user0", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user1", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "delete", + "target": "rbac:role:employee" # delete the role employee from the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + ) +} + + diff --git a/moonv4/tests/scenario/session_large.py b/moonv4/tests/scenario/session_large.py new file mode 100644 index 00000000..5b4a64b6 --- /dev/null +++ b/moonv4/tests/scenario/session_large.py @@ -0,0 +1,389 @@ + +pdp_name = "pdp1" +policy_name = "Session policy example" +model_name = "Session" +policy_genre = "session" + +subjects = { + "user0": "", + "user1": "", + "user2": "", + "user3": "", + "user4": "", + "user5": "", + "user6": "", + "user7": "", + "user8": "", + "user9": "", +} +objects = {"admin": "", "employee": "", "dev1": "", "dev2": "", } +actions = {"activate": "", "deactivate": ""} + +subject_categories = {"subjectid": "", } +object_categories = {"role": "", } +action_categories = {"session-action": "", } + +subject_data = {"subjectid": { + "user0": "", + "user1": "", + "user2": "", + "user3": "", + "user4": "", + "user5": "", + "user6": "", + "user7": "", + "user8": "", + "user9": "", +}} +object_data = {"role": { + "admin": "", + "employee": "", + "dev1": "", + "dev2": "", + "*": "" +}} +action_data = {"session-action": {"activate": "", "deactivate": "", "*": ""}} + +subject_assignments = { + "user0": ({"subjectid": "user0"}, ), + "user1": ({"subjectid": "user1"}, ), + "user2": ({"subjectid": "user2"}, ), + "user3": ({"subjectid": "user3"}, ), + "user4": ({"subjectid": "user4"}, ), + "user5": ({"subjectid": "user5"}, ), + "user6": ({"subjectid": "user6"}, ), + "user7": ({"subjectid": "user7"}, ), + "user8": ({"subjectid": "user8"}, ), + "user9": ({"subjectid": "user9"}, ), +} +object_assignments = {"admin": ({"role": "admin"}, {"role": "*"}), + "employee": ({"role": "employee"}, {"role": "*"}), + "dev1": ({"role": "employee"}, {"role": "dev1"}, {"role": "*"}), + "dev2": ({"role": "employee"}, {"role": "dev2"}, {"role": "*"}), + } +action_assignments = {"activate": ({"session-action": "activate"}, {"session-action": "*"}, ), + "deactivate": ({"session-action": "deactivate"}, {"session-action": "*"}, ) + } + +meta_rule = { + "session": {"id": "", "value": ("subjectid", "role", "session-action")}, +} + +rules = { + "session": ( + { + "rule": ("user0", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user1", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "delete", + "target": "rbac:role:employee" # delete the role employee from the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user2", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user2", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user2", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user3", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user3", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user3", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user4", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user4", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user4", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user5", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user5", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user5", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user6", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user6", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user6", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user7", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user7", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user7", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user8", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user8", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user8", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user9", "employee", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user9", "dev1", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + { + "rule": ("user9", "dev2", "*"), + "instructions": ( + { + "update": { + "operation": "add", + "target": "rbac:role:admin" # add the role admin to the current user + } + }, + {"chain": {"name": "rbac"}} # chain with the meta_rule named rbac + ) + }, + ) +} + + diff --git a/moonv4/tests/test_models.py b/moonv4/tests/test_models.py new file mode 100644 index 00000000..0da40ce5 --- /dev/null +++ b/moonv4/tests/test_models.py @@ -0,0 +1,37 @@ +from utils.models import * + + +def test_models(): + check_model() + model_id = add_model() + check_model(model_id) + delete_model(model_id) + + +def test_meta_data_subject(): + category_id = add_subject_category() + check_subject_category(category_id) + # TODO (asteroide): must implement the deletion of linked data + # delete_subject_category(category_id) + + +def test_meta_data_object(): + category_id = add_object_category() + check_object_category(category_id) + # TODO (asteroide): must implement the deletion of linked data + # delete_object_category(category_id) + + +def test_meta_data_action(): + category_id = add_action_category() + check_action_category(category_id) + # TODO (asteroide): must implement the deletion of linked data + # delete_action_category(category_id) + + +def test_meta_rule(): + meta_rule_id, scat_id, ocat_id, acat_id = add_categories_and_meta_rule() + check_meta_rule(meta_rule_id, scat_id, ocat_id, acat_id) + delete_meta_rule(meta_rule_id) + + diff --git a/moonv4/tests/test_pdp.py b/moonv4/tests/test_pdp.py new file mode 100644 index 00000000..6cd5365b --- /dev/null +++ b/moonv4/tests/test_pdp.py @@ -0,0 +1,16 @@ +from utils.pdp import * + + +def test_pdp(): + projects = get_keystone_projects() + admin_project_id = None + for _project in projects['projects']: + if _project['name'] == "admin": + admin_project_id = _project['id'] + assert admin_project_id + check_pdp() + pdp_id = add_pdp() + check_pdp(pdp_id) + map_to_keystone(pdp_id=pdp_id, keystone_project_id=admin_project_id) + check_pdp(pdp_id=pdp_id, keystone_project_id=admin_project_id) + delete_pdp(pdp_id) diff --git a/moonv4/tests/test_policies.py b/moonv4/tests/test_policies.py new file mode 100644 index 00000000..8f26d72d --- /dev/null +++ b/moonv4/tests/test_policies.py @@ -0,0 +1,157 @@ +from utils.policies import * +from utils.models import * + + +def test_policies(): + check_policy() + policy_id = add_policy() + check_policy(policy_id) + delete_policy(policy_id) + + +def test_subjects(): + policy_id = add_policy() + subject_id = add_subject() + + update_subject(subject_id=subject_id, policy_id=policy_id) + + check_subject(subject_id=subject_id, policy_id=policy_id) + + delete_subject(subject_id, policy_id=policy_id) + delete_subject(subject_id) + + +def test_objects(): + policy_id = add_policy() + object_id = add_object() + + update_object(object_id=object_id, policy_id=policy_id) + check_object(object_id=object_id, policy_id=policy_id) + + delete_object(object_id=object_id, policy_id=policy_id) + delete_object(object_id=object_id) + + +def test_actions(): + policy_id = add_policy() + action_id = add_action() + + update_action(action_id=action_id, policy_id=policy_id) + check_action(action_id=action_id, policy_id=policy_id) + + delete_action(action_id=action_id, policy_id=policy_id) + delete_action(action_id=action_id) + + +def test_subject_data(): + policy_id = add_policy() + + model_id = add_model() + + update_policy(policy_id, model_id) + + meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() + add_meta_rule_to_model(model_id, meta_rule_id) + + subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) + check_subject_data(policy_id=policy_id, data_id=subject_data_id, category_id=subject_cat_id) + delete_subject_data(policy_id=policy_id, data_id=subject_data_id, category_id=subject_cat_id) + + +def test_object_data(): + policy_id = add_policy() + + model_id = add_model() + + update_policy(policy_id, model_id) + + meta_rule_id, object_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() + add_meta_rule_to_model(model_id, meta_rule_id) + + object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) + check_object_data(policy_id=policy_id, data_id=object_data_id, category_id=object_cat_id) + delete_object_data(policy_id=policy_id, data_id=object_data_id, category_id=object_cat_id) + + +def test_action_data(): + policy_id = add_policy() + + model_id = add_model() + + update_policy(policy_id, model_id) + + meta_rule_id, action_cat_id, action_cat_id, action_cat_id = add_categories_and_meta_rule() + add_meta_rule_to_model(model_id, meta_rule_id) + + action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) + check_action_data(policy_id=policy_id, data_id=action_data_id, category_id=action_cat_id) + delete_action_data(policy_id=policy_id, data_id=action_data_id, category_id=action_cat_id) + + +def test_assignments(): + policy_id = add_policy() + + model_id = add_model() + + update_policy(policy_id, model_id) + + meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() + add_meta_rule_to_model(model_id, meta_rule_id) + + subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) + subject_data_id_bis = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) + object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) + object_data_id_bis = add_object_data(policy_id=policy_id, category_id=object_cat_id) + action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) + action_data_id_bis = add_action_data(policy_id=policy_id, category_id=action_cat_id) + + subject_id = add_subject(policy_id) + object_id = add_object(policy_id) + action_id = add_action(policy_id) + + add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) + add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis) + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis) + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis) + + check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) + check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis) + check_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + check_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis) + check_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + check_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis) + + delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id) + delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id) + delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id) + + +def test_rule(): + policy_id = add_policy() + + model_id = add_model() + + update_policy(policy_id, model_id) + + meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() + add_meta_rule_to_model(model_id, meta_rule_id) + + subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) + object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) + action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) + + subject_id = add_subject(policy_id) + object_id = add_object(policy_id) + action_id = add_action(policy_id) + + add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + + rule_id = add_rule(policy_id, meta_rule_id, [subject_data_id, object_data_id, action_data_id]) + check_rule(policy_id, meta_rule_id, rule_id, [subject_data_id, object_data_id, action_data_id]) + + delete_rule(policy_id, rule_id) + diff --git a/moonv4/tests/utils/__init__.py b/moonv4/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/moonv4/tests/utils/__init__.py diff --git a/moonv4/tests/utils/config.py b/moonv4/tests/utils/config.py new file mode 100644 index 00000000..30c8ea4f --- /dev/null +++ b/moonv4/tests/utils/config.py @@ -0,0 +1,22 @@ +import yaml + + +def get_config_data(filename="moon.conf"): + data_config = None + for _file in ( + filename, + "conf/moon.conf", + "../moon.conf", + "../conf/moon.conf", + "/etc/moon/moon.conf", + ): + try: + data_config = yaml.safe_load(open(_file)) + except FileNotFoundError: + data_config = None + continue + else: + break + if not data_config: + raise Exception("Configuration file not found...") + return data_config diff --git a/moonv4/tests/utils/models.py b/moonv4/tests/utils/models.py new file mode 100644 index 00000000..3cf31354 --- /dev/null +++ b/moonv4/tests/utils/models.py @@ -0,0 +1,270 @@ +import requests +import copy +import utils.config + +config = utils.config.get_config_data() + +URL = "http://{}:{}".format( + config['components']['manager']['hostname'], + config['components']['manager']['port']) +URL = URL + "{}" +HEADERS = {"content-type": "application/json"} + +model_template = { + "name": "test_model", + "description": "test", + "meta_rules": [] +} + +category_template = { + "name": "name of the category", + "description": "description of the category" +} + +meta_rule_template = { + "name": "test_meta_rule", + "subject_categories": [], + "object_categories": [], + "action_categories": [] +} + + +def check_model(model_id=None, check_model_name=True): + req = requests.get(URL.format("/models")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "models" in result + if model_id: + assert result["models"] + assert model_id in result['models'] + assert "name" in result['models'][model_id] + if check_model_name: + assert model_template["name"] == result['models'][model_id]["name"] + return result + + +def add_model(name=None): + if name: + model_template['name'] = name + req = requests.post(URL.format("/models"), json=model_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + model_id = list(result['models'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['models'][model_id] + assert model_template["name"] == result['models'][model_id]["name"] + return model_id + + +def delete_model(model_id): + req = requests.delete(URL.format("/models/{}".format(model_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + + +def add_subject_category(name="subject_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/subject_categories"), json=category_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "subject_categories" in result + category_id = list(result['subject_categories'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['subject_categories'][category_id] + assert category_template["name"] == result['subject_categories'][category_id]["name"] + return category_id + + +def check_subject_category(category_id): + req = requests.get(URL.format("/subject_categories")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "subject_categories" in result + if "result" in result: + assert result["result"] + assert category_id in result['subject_categories'] + assert "name" in result['subject_categories'][category_id] + assert category_template["name"] == result['subject_categories'][category_id]["name"] + + +def delete_subject_category(category_id): + req = requests.delete(URL.format("/subject_categories/{}".format(category_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + if "result" in result: + assert result["result"] + + +def add_object_category(name="object_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/object_categories"), json=category_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "object_categories" in result + category_id = list(result['object_categories'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['object_categories'][category_id] + assert category_template["name"] == result['object_categories'][category_id]["name"] + return category_id + + +def check_object_category(category_id): + req = requests.get(URL.format("/object_categories")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "object_categories" in result + if "result" in result: + assert result["result"] + assert category_id in result['object_categories'] + assert "name" in result['object_categories'][category_id] + assert category_template["name"] == result['object_categories'][category_id]["name"] + + +def delete_object_category(category_id): + req = requests.delete(URL.format("/object_categories/{}".format(category_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + if "result" in result: + assert result["result"] + + +def add_action_category(name="action_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/action_categories"), json=category_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "action_categories" in result + category_id = list(result['action_categories'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['action_categories'][category_id] + assert category_template["name"] == result['action_categories'][category_id]["name"] + return category_id + + +def check_action_category(category_id): + req = requests.get(URL.format("/action_categories")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "action_categories" in result + if "result" in result: + assert result["result"] + assert category_id in result['action_categories'] + assert "name" in result['action_categories'][category_id] + assert category_template["name"] == result['action_categories'][category_id]["name"] + + +def delete_action_category(category_id): + req = requests.delete(URL.format("/action_categories/{}".format(category_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + if "result" in result: + assert result["result"] + + +def add_categories_and_meta_rule(name="test_meta_rule"): + scat_id = add_subject_category() + ocat_id = add_object_category() + acat_id = add_action_category() + _meta_rule_template = copy.deepcopy(meta_rule_template) + _meta_rule_template["name"] = name + _meta_rule_template["subject_categories"].append(scat_id) + _meta_rule_template["object_categories"].append(ocat_id) + _meta_rule_template["action_categories"].append(acat_id) + req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "meta_rules" in result + meta_rule_id = list(result['meta_rules'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['meta_rules'][meta_rule_id] + assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"] + return meta_rule_id, scat_id, ocat_id, acat_id + + +def add_meta_rule(name="test_meta_rule", scat=[], ocat=[], acat=[]): + _meta_rule_template = copy.deepcopy(meta_rule_template) + _meta_rule_template["name"] = name + _meta_rule_template["subject_categories"] = [] + _meta_rule_template["subject_categories"].extend(scat) + _meta_rule_template["object_categories"] = [] + _meta_rule_template["object_categories"].extend(ocat) + _meta_rule_template["action_categories"] = [] + _meta_rule_template["action_categories"].extend(acat) + req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "meta_rules" in result + meta_rule_id = list(result['meta_rules'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['meta_rules'][meta_rule_id] + assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"] + return meta_rule_id + + +def check_meta_rule(meta_rule_id, scat_id=None, ocat_id=None, acat_id=None): + req = requests.get(URL.format("/meta_rules")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "meta_rules" in result + if "result" in result: + assert result["result"] + if not meta_rule_id: + return result + assert meta_rule_id in result['meta_rules'] + assert "name" in result['meta_rules'][meta_rule_id] + if scat_id: + assert scat_id in result['meta_rules'][meta_rule_id]["subject_categories"] + if ocat_id: + assert ocat_id in result['meta_rules'][meta_rule_id]["object_categories"] + if acat_id: + assert acat_id in result['meta_rules'][meta_rule_id]["action_categories"] + + +def delete_meta_rule(meta_rule_id): + req = requests.delete(URL.format("/meta_rules/{}".format(meta_rule_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + if "result" in result: + assert result["result"] + + +def add_meta_rule_to_model(model_id, meta_rule_id): + model = check_model(model_id, check_model_name=False)['models'] + meta_rule_list = model[model_id]["meta_rules"] + if meta_rule_id not in meta_rule_list: + meta_rule_list.append(meta_rule_id) + req = requests.patch(URL.format("/models/{}".format(model_id)), + json={"meta_rules": meta_rule_list}, + headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + model_id = list(result['models'].keys())[0] + if "result" in result: + assert result["result"] + assert "meta_rules" in result['models'][model_id] + assert meta_rule_list == result['models'][model_id]["meta_rules"] diff --git a/moonv4/tests/utils/pdp.py b/moonv4/tests/utils/pdp.py new file mode 100644 index 00000000..4f513aa6 --- /dev/null +++ b/moonv4/tests/utils/pdp.py @@ -0,0 +1,157 @@ +import requests +import utils.config + +config = utils.config.get_config_data() + +URL = "http://{}:{}".format( + config['components']['manager']['hostname'], + config['components']['manager']['port']) +HEADERS = {"content-type": "application/json"} +KEYSTONE_USER = config['openstack']['keystone']['user'] +KEYSTONE_PASSWORD = config['openstack']['keystone']['password'] +KEYSTONE_PROJECT = config['openstack']['keystone']['project'] +KEYSTONE_SERVER = config['openstack']['keystone']['url'] + +pdp_template = { + "name": "test_pdp", + "security_pipeline": [], + "keystone_project_id": "", + "description": "test", +} + + +def get_keystone_projects(): + + HEADERS = { + "Content-Type": "application/json" + } + + data_auth = { + "auth": { + "identity": { + "methods": [ + "password" + ], + "password": { + "user": { + "name": KEYSTONE_USER, + "domain": { + "name": "Default" + }, + "password": KEYSTONE_PASSWORD + } + } + } + } + } + + req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS) + assert req.status_code in (200, 201) + TOKEN = req.headers['X-Subject-Token'] + HEADERS['X-Auth-Token'] = TOKEN + req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS) + if req.status_code not in (200, 201): + data_auth["auth"]["scope"] = { + "project": { + "name": KEYSTONE_PROJECT, + "domain": { + "id": "default" + } + } + } + req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS) + assert req.status_code in (200, 201) + TOKEN = req.headers['X-Subject-Token'] + HEADERS['X-Auth-Token'] = TOKEN + req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS) + assert req.status_code in (200, 201) + return req.json() + + +def check_pdp(pdp_id=None, keystone_project_id=None, moon_url=None): + _URL = URL + if moon_url: + _URL = moon_url + req = requests.get(_URL + "/pdp") + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "pdps" in result + if pdp_id: + assert result["pdps"] + assert pdp_id in result['pdps'] + assert "name" in result['pdps'][pdp_id] + assert pdp_template["name"] == result['pdps'][pdp_id]["name"] + if keystone_project_id: + assert result["pdps"] + assert pdp_id in result['pdps'] + assert "keystone_project_id" in result['pdps'][pdp_id] + assert keystone_project_id == result['pdps'][pdp_id]["keystone_project_id"] + return result + + +def add_pdp(name="test_pdp", policy_id=None): + pdp_template['name'] = name + if policy_id: + pdp_template['security_pipeline'].append(policy_id) + req = requests.post(URL + "/pdp", json=pdp_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + pdp_id = list(result['pdps'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['pdps'][pdp_id] + assert pdp_template["name"] == result['pdps'][pdp_id]["name"] + return pdp_id + + +def update_pdp(pdp_id, policy_id=None): + req = requests.get(URL + "/pdp/{}".format(pdp_id)) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "pdps" in result + assert pdp_id in result['pdps'] + pipeline = result['pdps'][pdp_id]["security_pipeline"] + if policy_id not in pipeline: + pipeline.append(policy_id) + req = requests.patch(URL + "/pdp/{}".format(pdp_id), + json={"security_pipeline": pipeline}) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "pdps" in result + assert pdp_id in result['pdps'] + + req = requests.get(URL + "/pdp/{}".format(pdp_id)) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "pdps" in result + assert pdp_id in result['pdps'] + assert policy_id in pipeline + + +def map_to_keystone(pdp_id, keystone_project_id): + req = requests.patch(URL + "/pdp/{}".format(pdp_id), json={"keystone_project_id": keystone_project_id}, + headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + if "result" in result: + assert result["result"] + assert pdp_id in result['pdps'] + assert "name" in result['pdps'][pdp_id] + assert pdp_template["name"] == result['pdps'][pdp_id]["name"] + return pdp_id + + +def delete_pdp(pdp_id): + req = requests.delete(URL + "/pdp/{}".format(pdp_id)) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + diff --git a/moonv4/tests/utils/policies.py b/moonv4/tests/utils/policies.py new file mode 100644 index 00000000..2180317e --- /dev/null +++ b/moonv4/tests/utils/policies.py @@ -0,0 +1,631 @@ +import requests +import utils.config + +config = utils.config.get_config_data() + +URL = "http://{}:{}".format(config['components']['manager']['hostname'], config['components']['manager']['port']) +URL = URL + "{}" +HEADERS = {"content-type": "application/json"} +FILE = open("/tmp/test.log", "w") + +policy_template = { + "name": "test_policy", + "model_id": "", + "genre": "authz", + "description": "test", +} + +subject_template = { + "name": "test_subject", + "description": "test", + "email": "mail", + "password": "my_pass", +} + +object_template = { + "name": "test_subject", + "description": "test" +} + +action_template = { + "name": "test_subject", + "description": "test" +} + +subject_data_template = { + "name": "subject_data1", + "description": "description of the data subject" +} + +object_data_template = { + "name": "object_data1", + "description": "description of the data subject" +} + +action_data_template = { + "name": "action_data1", + "description": "description of the data subject" +} + +subject_assignment_template = { + "id": "", + "category_id": "", + "scope_id": "" +} + + +def check_policy(policy_id=None): + print("URL={}".format(URL.format("/policies"))) + req = requests.get(URL.format("/policies")) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "policies" in result + if policy_id: + assert result["policies"] + assert policy_id in result['policies'] + assert "name" in result['policies'][policy_id] + assert policy_template["name"] == result['policies'][policy_id]["name"] + return result + + +def add_policy(name="test_policy", genre="authz"): + policy_template["name"] = name + policy_template["genre"] = genre + req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + policy_id = list(result['policies'].keys())[0] + if "result" in result: + assert result["result"] + assert "name" in result['policies'][policy_id] + assert policy_template["name"] == result['policies'][policy_id]["name"] + return policy_id + + +def update_policy(policy_id, model_id): + req = requests.patch(URL.format("/policies/{}".format(policy_id)), + json={"model_id": model_id}, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + policy_id = list(result['policies'].keys())[0] + if "result" in result: + assert result["result"] + assert "model_id" in result['policies'][policy_id] + assert model_id == result['policies'][policy_id]["model_id"] + + +def delete_policy(policy_id): + req = requests.delete(URL.format("/policies/{}".format(policy_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + + +def add_subject(policy_id=None, name="test_subject"): + subject_template['name'] = name + if policy_id: + req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)), + json=subject_template, headers=HEADERS) + else: + req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "subjects" in result + subject_id = list(result['subjects'].keys())[0] + return subject_id + + +def update_subject(subject_id, policy_id=None, description=None): + if policy_id and not description: + req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), + json={}) + elif policy_id and description: + req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), + json={"description": description}) + else: + req = requests.patch(URL.format("/subjects/{}".format(subject_id)), + json={"description": description}) + assert req.status_code == 200 + result = req.json() + assert "subjects" in result + assert "name" in result["subjects"][subject_id] + assert subject_template["name"] == result["subjects"][subject_id]["name"] + assert "policy_list" in result["subjects"][subject_id] + if policy_id: + assert policy_id in result["subjects"][subject_id]["policy_list"] + if description: + assert description in result["subjects"][subject_id]["description"] + + +def check_subject(subject_id=None, policy_id=None): + if policy_id: + req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) + else: + req = requests.get(URL.format("/subjects")) + assert req.status_code == 200 + result = req.json() + assert "subjects" in result + assert "name" in result["subjects"][subject_id] + assert subject_template["name"] == result["subjects"][subject_id]["name"] + if policy_id: + assert "policy_list" in result["subjects"][subject_id] + assert policy_id in result["subjects"][subject_id]["policy_list"] + + +def delete_subject(subject_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id))) + else: + req = requests.delete(URL.format("/subjects/{}".format(subject_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + + if policy_id: + req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) + else: + req = requests.get(URL.format("/subjects")) + assert req.status_code == 200 + result = req.json() + assert "subjects" in result + if subject_id in result["subjects"]: + assert "name" in result["subjects"][subject_id] + assert subject_template["name"] == result["subjects"][subject_id]["name"] + if policy_id: + assert "policy_list" in result["subjects"][subject_id] + assert policy_id not in result["subjects"][subject_id]["policy_list"] + + +def add_object(policy_id=None, name="test_object"): + object_template['name'] = name + if policy_id: + req = requests.post(URL.format("/policies/{}/objects".format(policy_id)), + json=object_template, headers=HEADERS) + else: + req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "objects" in result + object_id = list(result['objects'].keys())[0] + return object_id + + +def update_object(object_id, policy_id): + req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), json={}) + assert req.status_code == 200 + result = req.json() + assert "objects" in result + assert "name" in result["objects"][object_id] + assert object_template["name"] == result["objects"][object_id]["name"] + assert "policy_list" in result["objects"][object_id] + assert policy_id in result["objects"][object_id]["policy_list"] + + +def check_object(object_id=None, policy_id=None): + if policy_id: + req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) + else: + req = requests.get(URL.format("/objects")) + assert req.status_code == 200 + result = req.json() + assert "objects" in result + assert "name" in result["objects"][object_id] + assert object_template["name"] == result["objects"][object_id]["name"] + if policy_id: + assert "policy_list" in result["objects"][object_id] + assert policy_id in result["objects"][object_id]["policy_list"] + + +def delete_object(object_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id))) + else: + req = requests.delete(URL.format("/objects/{}".format(object_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + + if policy_id: + req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) + else: + req = requests.get(URL.format("/objects")) + assert req.status_code == 200 + result = req.json() + assert "objects" in result + if object_id in result["objects"]: + assert "name" in result["objects"][object_id] + assert object_template["name"] == result["objects"][object_id]["name"] + if policy_id: + assert "policy_list" in result["objects"][object_id] + assert policy_id not in result["objects"][object_id]["policy_list"] + + +def add_action(policy_id=None, name="test_action"): + action_template['name'] = name + if policy_id: + req = requests.post(URL.format("/policies/{}/actions".format(policy_id)), + json=action_template, headers=HEADERS) + else: + req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "actions" in result + action_id = list(result['actions'].keys())[0] + return action_id + + +def update_action(action_id, policy_id): + req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), json={}) + assert req.status_code == 200 + result = req.json() + assert "actions" in result + assert "name" in result["actions"][action_id] + assert action_template["name"] == result["actions"][action_id]["name"] + assert "policy_list" in result["actions"][action_id] + assert policy_id in result["actions"][action_id]["policy_list"] + + +def check_action(action_id=None, policy_id=None): + if policy_id: + req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) + else: + req = requests.get(URL.format("/actions")) + assert req.status_code == 200 + result = req.json() + assert "actions" in result + assert "name" in result["actions"][action_id] + assert action_template["name"] == result["actions"][action_id]["name"] + if policy_id: + assert "policy_list" in result["actions"][action_id] + assert policy_id in result["actions"][action_id]["policy_list"] + + +def delete_action(action_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id))) + else: + req = requests.delete(URL.format("/actions/{}".format(action_id))) + assert req.status_code == 200 + result = req.json() + assert type(result) is dict + assert "result" in result + assert result["result"] + + if policy_id: + req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) + else: + req = requests.get(URL.format("/actions")) + assert req.status_code == 200 + result = req.json() + assert "actions" in result + if action_id in result["actions"]: + assert "name" in result["actions"][action_id] + assert action_template["name"] == result["actions"][action_id]["name"] + if policy_id: + assert "policy_list" in result["actions"][action_id] + assert policy_id not in result["actions"][action_id]["policy_list"] + + +def add_subject_data(policy_id, category_id, name="subject_data1"): + subject_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)), + json=subject_data_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "subject_data" in result + subject_id = list(result['subject_data']['data'].keys())[0] + return subject_id + + +def check_subject_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "subject_data" in result + for _data in result['subject_data']: + assert data_id in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def delete_subject_data(policy_id, category_id, data_id): + req = requests.delete(URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + assert req.status_code == 200 + req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "subject_data" in result + for _data in result['subject_data']: + assert data_id not in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def add_object_data(policy_id, category_id, name="object_data1"): + object_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)), + json=object_data_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "object_data" in result + object_id = list(result['object_data']['data'].keys())[0] + return object_id + + +def check_object_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "object_data" in result + for _data in result['object_data']: + assert data_id in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def delete_object_data(policy_id, category_id, data_id): + req = requests.delete(URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + assert req.status_code == 200 + req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "object_data" in result + for _data in result['object_data']: + assert data_id not in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def add_action_data(policy_id, category_id, name="action_data1"): + action_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)), + json=action_data_template, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "action_data" in result + action_id = list(result['action_data']['data'].keys())[0] + return action_id + + +def check_action_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "action_data" in result + for _data in result['action_data']: + assert data_id in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def delete_action_data(policy_id, category_id, data_id): + req = requests.delete(URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + assert req.status_code == 200 + req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) + assert req.status_code == 200 + result = req.json() + assert "action_data" in result + for _data in result['action_data']: + assert data_id not in list(_data['data'].keys()) + assert category_id == _data["category_id"] + + +def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)), + json={ + "id": subject_id, + "category_id": subject_cat_id, + "data_id": subject_data_id + }, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "subject_assignments" in result + assert result["subject_assignments"] + + +def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + assert req.status_code == 200 + result = req.json() + assert "subject_assignments" in result + assert result["subject_assignments"] + for key in result["subject_assignments"]: + assert "subject_id" in result["subject_assignments"][key] + assert "category_id" in result["subject_assignments"][key] + assert "assignments" in result["subject_assignments"][key] + if result["subject_assignments"][key]['subject_id'] == subject_id and \ + result["subject_assignments"][key]["category_id"] == subject_cat_id: + assert subject_data_id in result["subject_assignments"][key]["assignments"] + + +def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id): + req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + assert req.status_code == 200 + result = req.json() + assert "object_assignments" in result + assert result["object_assignments"] + for key in result["object_assignments"]: + assert "object_id" in result["object_assignments"][key] + assert "category_id" in result["object_assignments"][key] + assert "assignments" in result["object_assignments"][key] + if result["object_assignments"][key]['object_id'] == object_id and \ + result["object_assignments"][key]["category_id"] == object_cat_id: + assert object_data_id in result["object_assignments"][key]["assignments"] + + +def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id): + req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + assert req.status_code == 200 + result = req.json() + assert "action_assignments" in result + assert result["action_assignments"] + for key in result["action_assignments"]: + assert "action_id" in result["action_assignments"][key] + assert "category_id" in result["action_assignments"][key] + assert "assignments" in result["action_assignments"][key] + if result["action_assignments"][key]['action_id'] == action_id and \ + result["action_assignments"][key]["category_id"] == action_cat_id: + assert action_data_id in result["action_assignments"][key]["assignments"] + + +def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id): + req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)), + json={ + "id": object_id, + "category_id": object_cat_id, + "data_id": object_data_id + }, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "object_assignments" in result + assert result["object_assignments"] + + +def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id): + req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)), + json={ + "id": action_id, + "category_id": action_cat_id, + "data_id": action_data_id + }, headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "action_assignments" in result + assert result["action_assignments"] + + +def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + assert req.status_code == 200 + result = req.json() + assert "result" in result + assert result["result"] + + req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + assert req.status_code == 200 + result = req.json() + assert "subject_assignments" in result + assert result["subject_assignments"] + for key in result["subject_assignments"]: + assert "subject_id" in result["subject_assignments"][key] + assert "category_id" in result["subject_assignments"][key] + assert "assignments" in result["subject_assignments"][key] + if result["subject_assignments"][key]['subject_id'] == subject_id and \ + result["subject_assignments"][key]["category_id"] == subject_cat_id: + assert subject_data_id not in result["subject_assignments"][key]["assignments"] + + +def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id): + req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + assert req.status_code == 200 + result = req.json() + assert "result" in result + assert result["result"] + + req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + assert req.status_code == 200 + result = req.json() + assert "object_assignments" in result + assert result["object_assignments"] + for key in result["object_assignments"]: + assert "object_id" in result["object_assignments"][key] + assert "category_id" in result["object_assignments"][key] + assert "assignments" in result["object_assignments"][key] + if result["object_assignments"][key]['object_id'] == object_id and \ + result["object_assignments"][key]["category_id"] == object_cat_id: + assert object_data_id not in result["object_assignments"][key]["assignments"] + + +def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id): + req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + assert req.status_code == 200 + result = req.json() + assert "result" in result + assert result["result"] + + req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + assert req.status_code == 200 + result = req.json() + assert "action_assignments" in result + assert result["action_assignments"] + for key in result["action_assignments"]: + assert "action_id" in result["action_assignments"][key] + assert "category_id" in result["action_assignments"][key] + assert "assignments" in result["action_assignments"][key] + if result["action_assignments"][key]['action_id'] == action_id and \ + result["action_assignments"][key]["category_id"] == action_cat_id: + assert action_data_id not in result["action_assignments"][key]["assignments"] + + +def add_rule(policy_id, meta_rule_id, rule, instructions={"chain": [{"security_pipeline": "rbac"}]}): + req = requests.post(URL.format("/policies/{}/rules".format(policy_id)), + json={ + "meta_rule_id": meta_rule_id, + "rule": rule, + "instructions": instructions, + "enabled": True + }, + headers=HEADERS) + assert req.status_code == 200 + result = req.json() + assert "rules" in result + try: + rule_id = list(result["rules"].keys())[0] + except Exception as e: + return False + assert "policy_id" in result["rules"][rule_id] + assert policy_id == result["rules"][rule_id]["policy_id"] + assert "meta_rule_id" in result["rules"][rule_id] + assert meta_rule_id == result["rules"][rule_id]["meta_rule_id"] + assert rule == result["rules"][rule_id]["rule"] + return rule_id + + +def check_rule(policy_id, meta_rule_id, rule_id, rule): + req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) + assert req.status_code == 200 + result = req.json() + assert "rules" in result + assert "policy_id" in result["rules"] + assert policy_id == result["rules"]["policy_id"] + for item in result["rules"]["rules"]: + assert "meta_rule_id" in item + if meta_rule_id == item["meta_rule_id"]: + if rule_id == item["id"]: + assert rule == item["rule"] + + +def delete_rule(policy_id, rule_id): + req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id))) + assert req.status_code == 200 + result = req.json() + assert "result" in result + assert result["result"] + + req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) + assert req.status_code == 200 + result = req.json() + assert "rules" in result + assert "policy_id" in result["rules"] + assert policy_id == result["rules"]["policy_id"] + found_rule = False + for item in result["rules"]["rules"]: + if rule_id == item["id"]: + found_rule = True + assert not found_rule |