diff options
Diffstat (limited to 'moonv4/tests')
-rw-r--r-- | moonv4/tests/get_keystone_projects.py | 7 | ||||
-rw-r--r-- | moonv4/tests/populate_default_values.py | 195 | ||||
-rw-r--r-- | moonv4/tests/send_authz.py | 243 | ||||
-rw-r--r-- | moonv4/tests/test_models.py | 37 | ||||
-rw-r--r-- | moonv4/tests/test_pdp.py | 16 | ||||
-rw-r--r-- | moonv4/tests/test_policies.py | 157 | ||||
-rw-r--r-- | moonv4/tests/utils/__init__.py | 0 | ||||
-rw-r--r-- | moonv4/tests/utils/config.py | 44 | ||||
-rw-r--r-- | moonv4/tests/utils/models.py | 275 | ||||
-rw-r--r-- | moonv4/tests/utils/parse.py | 83 | ||||
-rw-r--r-- | moonv4/tests/utils/pdp.py | 175 | ||||
-rw-r--r-- | moonv4/tests/utils/policies.py | 642 |
12 files changed, 24 insertions, 1850 deletions
diff --git a/moonv4/tests/get_keystone_projects.py b/moonv4/tests/get_keystone_projects.py index 7b37b0e7..9b5d87cd 100644 --- a/moonv4/tests/get_keystone_projects.py +++ b/moonv4/tests/get_keystone_projects.py @@ -1,8 +1,5 @@ -from utils import pdp -from utils import parse -from utils import models -from utils import policies -from utils import pdp +from python_moonclient import parse, models, policies, pdp + if __name__ == "__main__": args = parse.parse() diff --git a/moonv4/tests/populate_default_values.py b/moonv4/tests/populate_default_values.py index 28795526..d5a5769b 100644 --- a/moonv4/tests/populate_default_values.py +++ b/moonv4/tests/populate_default_values.py @@ -1,194 +1,11 @@ import logging from importlib.machinery import SourceFileLoader -from utils import parse -from utils import models -from utils import policies -from utils import pdp +from python_moonclient import parse, models, policies, pdp -logger = None - - -def create_model(model_id=None): - if args.verbose: - logger.info("Creating model {}".format(scenario.model_name)) - if not model_id: - logger.info("Add model") - model_id = models.add_model(name=scenario.model_name) - logger.info("Add subject categories") - for cat in scenario.subject_categories: - scenario.subject_categories[cat] = models.add_subject_category(name=cat) - logger.info("Add object categories") - for cat in scenario.object_categories: - scenario.object_categories[cat] = models.add_object_category(name=cat) - logger.info("Add action categories") - for cat in scenario.action_categories: - scenario.action_categories[cat] = models.add_action_category(name=cat) - sub_cat = [] - ob_cat = [] - act_cat = [] - meta_rule_list = [] - for item_name, item_value in scenario.meta_rule.items(): - for item in item_value["value"]: - if item in scenario.subject_categories: - sub_cat.append(scenario.subject_categories[item]) - elif item in scenario.object_categories: - ob_cat.append(scenario.object_categories[item]) - elif item in scenario.action_categories: - act_cat.append(scenario.action_categories[item]) - meta_rules = models.check_meta_rule(meta_rule_id=None) - for _meta_rule_id, _meta_rule_value in meta_rules['meta_rules'].items(): - if _meta_rule_value['name'] == item_name: - meta_rule_id = _meta_rule_id - break - else: - logger.info("Add meta rule") - meta_rule_id = models.add_meta_rule(item_name, sub_cat, ob_cat, act_cat) - item_value["id"] = meta_rule_id - if meta_rule_id not in meta_rule_list: - meta_rule_list.append(meta_rule_id) - return model_id, meta_rule_list - - -def create_policy(model_id, meta_rule_list): - if args.verbose: - logger.info("Creating policy {}".format(scenario.policy_name)) - _policies = policies.check_policy() - for _policy_id, _policy_value in _policies["policies"].items(): - if _policy_value['name'] == scenario.policy_name: - policy_id = _policy_id - break - else: - policy_id = policies.add_policy(name=scenario.policy_name, genre=scenario.policy_genre) - - policies.update_policy(policy_id, model_id) - - for meta_rule_id in meta_rule_list: - logger.debug("add_meta_rule_to_model {} {}".format(model_id, meta_rule_id)) - models.add_meta_rule_to_model(model_id, meta_rule_id) - - logger.info("Add subject data") - for subject_cat_name in scenario.subject_data: - for subject_data_name in scenario.subject_data[subject_cat_name]: - data_id = scenario.subject_data[subject_cat_name][subject_data_name] = policies.add_subject_data( - policy_id=policy_id, - category_id=scenario.subject_categories[subject_cat_name], name=subject_data_name) - scenario.subject_data[subject_cat_name][subject_data_name] = data_id - logger.info("Add object data") - for object_cat_name in scenario.object_data: - for object_data_name in scenario.object_data[object_cat_name]: - data_id = scenario.object_data[object_cat_name][object_data_name] = policies.add_object_data( - policy_id=policy_id, - category_id=scenario.object_categories[object_cat_name], name=object_data_name) - scenario.object_data[object_cat_name][object_data_name] = data_id - logger.info("Add action data") - for action_cat_name in scenario.action_data: - for action_data_name in scenario.action_data[action_cat_name]: - data_id = scenario.action_data[action_cat_name][action_data_name] = policies.add_action_data( - policy_id=policy_id, - category_id=scenario.action_categories[action_cat_name], name=action_data_name) - scenario.action_data[action_cat_name][action_data_name] = data_id - - logger.info("Add subjects") - for name in scenario.subjects: - scenario.subjects[name] = policies.add_subject(policy_id, name=name) - logger.info("Add objects") - for name in scenario.objects: - scenario.objects[name] = policies.add_object(policy_id, name=name) - logger.info("Add actions") - for name in scenario.actions: - scenario.actions[name] = policies.add_action(policy_id, name=name) - - logger.info("Add subject assignments") - for subject_name in scenario.subject_assignments: - if type(scenario.subject_assignments[subject_name]) in (list, tuple): - for items in scenario.subject_assignments[subject_name]: - for subject_category_name in items: - subject_id = scenario.subjects[subject_name] - subject_cat_id = scenario.subject_categories[subject_category_name] - for data in scenario.subject_assignments[subject_name]: - subject_data_id = scenario.subject_data[subject_category_name][data[subject_category_name]] - policies.add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) - else: - for subject_category_name in scenario.subject_assignments[subject_name]: - subject_id = scenario.subjects[subject_name] - subject_cat_id = scenario.subject_categories[subject_category_name] - subject_data_id = scenario.subject_data[subject_category_name][scenario.subject_assignments[subject_name][subject_category_name]] - policies.add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) - - logger.info("Add object assignments") - for object_name in scenario.object_assignments: - if type(scenario.object_assignments[object_name]) in (list, tuple): - for items in scenario.object_assignments[object_name]: - for object_category_name in items: - object_id = scenario.objects[object_name] - object_cat_id = scenario.object_categories[object_category_name] - for data in scenario.object_assignments[object_name]: - object_data_id = scenario.object_data[object_category_name][data[object_category_name]] - policies.add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) - else: - for object_category_name in scenario.object_assignments[object_name]: - object_id = scenario.objects[object_name] - object_cat_id = scenario.object_categories[object_category_name] - object_data_id = scenario.object_data[object_category_name][scenario.object_assignments[object_name][object_category_name]] - policies.add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) - - logger.info("Add action assignments") - for action_name in scenario.action_assignments: - if type(scenario.action_assignments[action_name]) in (list, tuple): - for items in scenario.action_assignments[action_name]: - for action_category_name in items: - action_id = scenario.actions[action_name] - action_cat_id = scenario.action_categories[action_category_name] - for data in scenario.action_assignments[action_name]: - action_data_id = scenario.action_data[action_category_name][data[action_category_name]] - policies.add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) - else: - for action_category_name in scenario.action_assignments[action_name]: - action_id = scenario.actions[action_name] - action_cat_id = scenario.action_categories[action_category_name] - action_data_id = scenario.action_data[action_category_name][scenario.action_assignments[action_name][action_category_name]] - policies.add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) - - logger.info("Add rules") - for meta_rule_name in scenario.rules: - meta_rule_value = scenario.meta_rule[meta_rule_name] - for rule in scenario.rules[meta_rule_name]: - data_list = [] - _meta_rule = list(meta_rule_value["value"]) - for data_name in rule["rule"]: - category_name = _meta_rule.pop(0) - if category_name in scenario.subject_categories: - data_list.append(scenario.subject_data[category_name][data_name]) - elif category_name in scenario.object_categories: - data_list.append(scenario.object_data[category_name][data_name]) - elif category_name in scenario.action_categories: - data_list.append(scenario.action_data[category_name][data_name]) - instructions = rule["instructions"] - policies.add_rule(policy_id, meta_rule_value["id"], data_list, instructions) - return policy_id - - -def create_pdp(policy_id=None, project_id=None): - logger.info("Creating PDP {}".format(scenario.pdp_name)) - projects = pdp.get_keystone_projects() - if not project_id: - for _project in projects['projects']: - if _project['name'] == "admin": - project_id = _project['id'] - assert project_id - pdps = pdp.check_pdp()["pdps"] - for pdp_id, pdp_value in pdps.items(): - if scenario.pdp_name == pdp_value["name"]: - pdp.update_pdp(pdp_id, policy_id=policy_id) - logger.debug("Found existing PDP named {} (will add policy {})".format(scenario.pdp_name, policy_id)) - return pdp_id - _pdp_id = pdp.add_pdp(name=scenario.pdp_name, policy_id=policy_id) - pdp.map_to_keystone(pdp_id=_pdp_id, keystone_project_id=project_id) - return _pdp_id +logger = logging.getLogger("moonforming") if __name__ == "__main__": - logger = logging.getLogger("moonforming") requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.WARNING) requests_log.propagate = True @@ -212,9 +29,9 @@ if __name__ == "__main__": if _model_value['name'] == scenario.model_name: model_id = _model_id meta_rule_list = _model_value['meta_rules'] - create_model(model_id) + models.create_model(scenario, model_id) break else: - model_id, meta_rule_list = create_model() - policy_id = create_policy(model_id, meta_rule_list) - pdp_id = create_pdp(policy_id=policy_id, project_id=project_id) + model_id, meta_rule_list = models.create_model(scenario) + policy_id = policies.create_policy(scenario, model_id, meta_rule_list) + pdp_id = pdp.create_pdp(scenario, policy_id=policy_id, project_id=project_id) diff --git a/moonv4/tests/send_authz.py b/moonv4/tests/send_authz.py index 5766a0ec..b4ed1d2f 100644 --- a/moonv4/tests/send_authz.py +++ b/moonv4/tests/send_authz.py @@ -1,233 +1,26 @@ -import sys -import copy -import logging -import threading from importlib.machinery import SourceFileLoader -import requests -import time -import json -import random -from uuid import uuid4 -from utils.pdp import check_pdp -from utils.parse import parse -import utils.config +from python_moonclient import config, parse, models, policies, pdp, authz -logger = None -HOST_MANAGER = None -PORT_MANAGER = None -HOST_AUTHZ = None -PORT_AUTHZ = None -HOST_KEYSTONE = None -PORT_KEYSTONE = None - -lock = threading.Lock() -logger = logging.getLogger(__name__) - - -def get_scenario(args): - m = SourceFileLoader("scenario", args.filename[0]) - return m.load_module() - - -def get_keystone_id(pdp_name): - global HOST_MANAGER, PORT_MANAGER - keystone_project_id = None - for pdp_key, pdp_value in check_pdp(moon_url="http://{}:{}".format(HOST_MANAGER, PORT_MANAGER))["pdps"].items(): - if pdp_name: - if pdp_name != pdp_value["name"]: - continue - if pdp_value['security_pipeline'] and pdp_value["keystone_project_id"]: - logger.debug("Found pdp with keystone_project_id={}".format(pdp_value["keystone_project_id"])) - keystone_project_id = pdp_value["keystone_project_id"] - - if not keystone_project_id: - logger.error("Cannot find PDP with keystone project ID") - sys.exit(1) - return keystone_project_id - - -def _construct_payload(creds, current_rule, enforcer, target): - # Convert instances of object() in target temporarily to - # empty dict to avoid circular reference detection - # errors in jsonutils.dumps(). - temp_target = copy.deepcopy(target) - for key in target.keys(): - element = target.get(key) - if type(element) is object: - temp_target[key] = {} - _data = _json = None - if enforcer: - _data = {'rule': json.dumps(current_rule), - 'target': json.dumps(temp_target), - 'credentials': json.dumps(creds)} - else: - _json = {'rule': current_rule, - 'target': temp_target, - 'credentials': creds} - return _data, _json - - -def _send(url, data=None, stress_test=False): - current_request = dict() - current_request['url'] = url - try: - if stress_test: - current_request['start'] = time.time() - # with lock: - res = requests.get(url) - current_request['end'] = time.time() - current_request['delta'] = current_request["end"] - current_request["start"] - else: - with lock: - current_request['start'] = time.time() - if data: - data, _ = _construct_payload(data['credentials'], data['rule'], True, data['target']) - res = requests.post(url, json=data, - headers={'content-type': "application/x-www-form-urlencode"} - ) - else: - res = requests.get(url) - current_request['end'] = time.time() - current_request['delta'] = current_request["end"] - current_request["start"] - except requests.exceptions.ConnectionError: - logger.warning("Unable to connect to server") - return {} - if not stress_test: - try: - j = res.json() - if res.status_code == 200: - logger.warning("\033[1m{}\033[m \033[32mGrant\033[m".format(url)) - elif res.status_code == 401: - logger.warning("\033[1m{}\033[m \033[31mDeny\033[m".format(url)) - else: - logger.error("\033[1m{}\033[m {} {}".format(url, res.status_code, res.text)) - except Exception as e: - if res.text == "True": - logger.warning("\033[1m{}\033[m \033[32mGrant\033[m".format(url)) - elif res.text == "False": - logger.warning("\033[1m{}\033[m \033[31mDeny\033[m".format(url)) - else: - logger.error("\033[1m{}\033[m {} {}".format(url, res.status_code, res.text)) - logger.exception(e) - logger.error(res.text) - else: - if j.get("result"): - # logger.warning("{} \033[32m{}\033[m".format(url, j.get("result"))) - logger.debug("{}".format(j.get("error", ""))) - current_request['result'] = "Grant" - else: - # logger.warning("{} \033[31m{}\033[m".format(url, "Deny")) - logger.debug("{}".format(j)) - current_request['result'] = "Deny" - return current_request - - -class AsyncGet(threading.Thread): - - def __init__(self, url, semaphore=None, **kwargs): - threading.Thread.__init__(self) - self.url = url - self.kwargs = kwargs - self.sema = semaphore - self.result = dict() - self.uuid = uuid4().hex - self.index = kwargs.get("index", 0) - - def run(self): - self.result = _send(self.url, - data=self.kwargs.get("data"), - stress_test=self.kwargs.get("stress_test", False)) - self.result['index'] = self.index - - -def send_requests(scenario, keystone_project_id, request_second=1, limit=500, - dry_run=None, stress_test=False, destination="wrapper"): - global HOST_AUTHZ, PORT_AUTHZ - backgrounds = [] - time_data = list() - start_timing = time.time() - request_cpt = 0 - SUBJECTS = tuple(scenario.subjects.keys()) - OBJECTS = tuple(scenario.objects.keys()) - ACTIONS = tuple(scenario.actions.keys()) - while request_cpt < limit: - rule = (random.choice(SUBJECTS), random.choice(OBJECTS), random.choice(ACTIONS)) - if destination.lower() == "wrapper": - url = "http://{}:{}/authz".format(HOST_AUTHZ, PORT_AUTHZ) - data = { - 'target': { - "user_id": random.choice(SUBJECTS), - "target": { - "name": random.choice(OBJECTS) - }, - "project_id": keystone_project_id - }, - 'credentials': None, - 'rule': random.choice(ACTIONS) - } - else: - url = "http://{}:{}/authz/{}/{}".format(HOST_AUTHZ, PORT_AUTHZ, keystone_project_id, "/".join(rule)) - data = None - if dry_run: - logger.info(url) - continue - request_cpt += 1 - if stress_test: - time_data.append(copy.deepcopy(_send(url, stress_test=stress_test))) - else: - background = AsyncGet(url, stress_test=stress_test, data=data, - index=request_cpt) - backgrounds.append(background) - background.start() - if request_second > 0: - if request_cpt % request_second == 0: - if time.time()-start_timing < 1: - while True: - if time.time()-start_timing > 1: - break - start_timing = time.time() - if not stress_test: - for background in backgrounds: - background.join() - if background.result: - time_data.append(copy.deepcopy(background.result)) - return time_data - - -def save_data(filename, time_data): - json.dump(time_data, open(filename, "w")) - - -def get_delta(time_data): - time_delta = list() - time_delta_sum1 = 0 - for item in time_data: - time_delta.append(item['delta']) - time_delta_sum1 += item['delta'] - time_delta_average1 = time_delta_sum1 / len(time_data) - return time_delta, time_delta_average1 - - -def main(): - global HOST_MANAGER, PORT_MANAGER, HOST_AUTHZ, PORT_AUTHZ - - args = parse() +if __name__ == "__main__": + args = parse.parse() consul_host = args.consul_host consul_port = args.consul_port - conf_data = utils.config.get_config_data(consul_host, consul_port) - HOST_MANAGER = conf_data['manager_host'] - PORT_MANAGER = conf_data['manager_port'] - HOST_AUTHZ = args.authz_host - PORT_AUTHZ = args.authz_port - # HOST_KEYSTONE = conf_data['keystone_host'] - # PORT_KEYSTONE = conf_data['manager_host'] + models.init(consul_host, consul_port) + policies.init(consul_host, consul_port) + pdp.init(consul_host, consul_port) + + if args.filename: + print("Loading: {}".format(args.filename[0])) + m = SourceFileLoader("scenario", args.filename[0]) + scenario = m.load_module() - scenario = get_scenario(args) - keystone_project_id = get_keystone_id(args.pdp) - time_data = send_requests( + keystone_project_id = pdp.get_keystone_id(args.pdp) + time_data = authz.send_requests( scenario, + args.authz_host, + args.authz_port, keystone_project_id, request_second=args.request_second, limit=args.limit, @@ -236,8 +29,4 @@ def main(): destination=args.destination ) if not args.dry_run: - save_data(args.write, time_data) - - -if __name__ == "__main__": - main() + authz.save_data(args.write, time_data) diff --git a/moonv4/tests/test_models.py b/moonv4/tests/test_models.py deleted file mode 100644 index 0da40ce5..00000000 --- a/moonv4/tests/test_models.py +++ /dev/null @@ -1,37 +0,0 @@ -from utils.models import * - - -def test_models(): - check_model() - model_id = add_model() - check_model(model_id) - delete_model(model_id) - - -def test_meta_data_subject(): - category_id = add_subject_category() - check_subject_category(category_id) - # TODO (asteroide): must implement the deletion of linked data - # delete_subject_category(category_id) - - -def test_meta_data_object(): - category_id = add_object_category() - check_object_category(category_id) - # TODO (asteroide): must implement the deletion of linked data - # delete_object_category(category_id) - - -def test_meta_data_action(): - category_id = add_action_category() - check_action_category(category_id) - # TODO (asteroide): must implement the deletion of linked data - # delete_action_category(category_id) - - -def test_meta_rule(): - meta_rule_id, scat_id, ocat_id, acat_id = add_categories_and_meta_rule() - check_meta_rule(meta_rule_id, scat_id, ocat_id, acat_id) - delete_meta_rule(meta_rule_id) - - diff --git a/moonv4/tests/test_pdp.py b/moonv4/tests/test_pdp.py deleted file mode 100644 index 6cd5365b..00000000 --- a/moonv4/tests/test_pdp.py +++ /dev/null @@ -1,16 +0,0 @@ -from utils.pdp import * - - -def test_pdp(): - projects = get_keystone_projects() - admin_project_id = None - for _project in projects['projects']: - if _project['name'] == "admin": - admin_project_id = _project['id'] - assert admin_project_id - check_pdp() - pdp_id = add_pdp() - check_pdp(pdp_id) - map_to_keystone(pdp_id=pdp_id, keystone_project_id=admin_project_id) - check_pdp(pdp_id=pdp_id, keystone_project_id=admin_project_id) - delete_pdp(pdp_id) diff --git a/moonv4/tests/test_policies.py b/moonv4/tests/test_policies.py deleted file mode 100644 index 8f26d72d..00000000 --- a/moonv4/tests/test_policies.py +++ /dev/null @@ -1,157 +0,0 @@ -from utils.policies import * -from utils.models import * - - -def test_policies(): - check_policy() - policy_id = add_policy() - check_policy(policy_id) - delete_policy(policy_id) - - -def test_subjects(): - policy_id = add_policy() - subject_id = add_subject() - - update_subject(subject_id=subject_id, policy_id=policy_id) - - check_subject(subject_id=subject_id, policy_id=policy_id) - - delete_subject(subject_id, policy_id=policy_id) - delete_subject(subject_id) - - -def test_objects(): - policy_id = add_policy() - object_id = add_object() - - update_object(object_id=object_id, policy_id=policy_id) - check_object(object_id=object_id, policy_id=policy_id) - - delete_object(object_id=object_id, policy_id=policy_id) - delete_object(object_id=object_id) - - -def test_actions(): - policy_id = add_policy() - action_id = add_action() - - update_action(action_id=action_id, policy_id=policy_id) - check_action(action_id=action_id, policy_id=policy_id) - - delete_action(action_id=action_id, policy_id=policy_id) - delete_action(action_id=action_id) - - -def test_subject_data(): - policy_id = add_policy() - - model_id = add_model() - - update_policy(policy_id, model_id) - - meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() - add_meta_rule_to_model(model_id, meta_rule_id) - - subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) - check_subject_data(policy_id=policy_id, data_id=subject_data_id, category_id=subject_cat_id) - delete_subject_data(policy_id=policy_id, data_id=subject_data_id, category_id=subject_cat_id) - - -def test_object_data(): - policy_id = add_policy() - - model_id = add_model() - - update_policy(policy_id, model_id) - - meta_rule_id, object_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() - add_meta_rule_to_model(model_id, meta_rule_id) - - object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) - check_object_data(policy_id=policy_id, data_id=object_data_id, category_id=object_cat_id) - delete_object_data(policy_id=policy_id, data_id=object_data_id, category_id=object_cat_id) - - -def test_action_data(): - policy_id = add_policy() - - model_id = add_model() - - update_policy(policy_id, model_id) - - meta_rule_id, action_cat_id, action_cat_id, action_cat_id = add_categories_and_meta_rule() - add_meta_rule_to_model(model_id, meta_rule_id) - - action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) - check_action_data(policy_id=policy_id, data_id=action_data_id, category_id=action_cat_id) - delete_action_data(policy_id=policy_id, data_id=action_data_id, category_id=action_cat_id) - - -def test_assignments(): - policy_id = add_policy() - - model_id = add_model() - - update_policy(policy_id, model_id) - - meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() - add_meta_rule_to_model(model_id, meta_rule_id) - - subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) - subject_data_id_bis = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) - object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) - object_data_id_bis = add_object_data(policy_id=policy_id, category_id=object_cat_id) - action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) - action_data_id_bis = add_action_data(policy_id=policy_id, category_id=action_cat_id) - - subject_id = add_subject(policy_id) - object_id = add_object(policy_id) - action_id = add_action(policy_id) - - add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) - add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis) - add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) - add_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis) - add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) - add_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis) - - check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) - check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis) - check_object_assignments(policy_id, object_id, object_cat_id, object_data_id) - check_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis) - check_action_assignments(policy_id, action_id, action_cat_id, action_data_id) - check_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis) - - delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id) - delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id) - delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id) - - -def test_rule(): - policy_id = add_policy() - - model_id = add_model() - - update_policy(policy_id, model_id) - - meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule() - add_meta_rule_to_model(model_id, meta_rule_id) - - subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id) - object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id) - action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id) - - subject_id = add_subject(policy_id) - object_id = add_object(policy_id) - action_id = add_action(policy_id) - - add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id) - add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) - add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) - - rule_id = add_rule(policy_id, meta_rule_id, [subject_data_id, object_data_id, action_data_id]) - check_rule(policy_id, meta_rule_id, rule_id, [subject_data_id, object_data_id, action_data_id]) - - delete_rule(policy_id, rule_id) - diff --git a/moonv4/tests/utils/__init__.py b/moonv4/tests/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/moonv4/tests/utils/__init__.py +++ /dev/null diff --git a/moonv4/tests/utils/config.py b/moonv4/tests/utils/config.py deleted file mode 100644 index d6317820..00000000 --- a/moonv4/tests/utils/config.py +++ /dev/null @@ -1,44 +0,0 @@ -import base64 -import json -import requests - - -def get_configuration(consul_host, consul_port, key): - url = "http://{}:{}/v1/kv/{}".format(consul_host, consul_port, key) - req = requests.get(url) - if req.status_code != 200: - raise Exception("xxx") - data = req.json() - if len(data) == 1: - data = data[0] - return {data["Key"]: json.loads(base64.b64decode(data["Value"]).decode("utf-8"))} - else: - return [ - {item["Key"]: json.loads(base64.b64decode(item["Value"]).decode("utf-8"))} - for item in data - ] - - -def get_config_data(consul_host, consul_port): - conf_data = dict() - conf_data['manager_host'] = get_configuration(consul_host, consul_port, - 'components/manager')['components/manager']['external']['hostname'] - conf_data['manager_port'] = get_configuration(consul_host, consul_port, - 'components/manager')['components/manager']['external']['port'] - # conf_data['authz_host'] = get_configuration(consul_host, consul_port, - # 'components/interface')['components/interface']['external']['hostname'] - # conf_data['authz_port'] = get_configuration(consul_host, consul_port, - # 'components/interface')['components/interface']['external']['port'] - conf_data['keystone_host'] = get_configuration(consul_host, consul_port, - 'openstack/keystone')['openstack/keystone']['external']['url'] - # conf_data['keystone_port'] = '5000' - conf_data['keystone_user'] = get_configuration(consul_host, consul_port, - 'openstack/keystone')['openstack/keystone']['user'] - conf_data['keystone_password'] = get_configuration(consul_host, consul_port, - 'openstack/keystone')['openstack/keystone']['password'] - conf_data['keystone_project'] = get_configuration(consul_host, consul_port, - 'openstack/keystone')['openstack/keystone']['project'] - return conf_data - -# get_conf_data('88.88.88.2', '30005') -# get_conf_data('127.0.0.1', 8082) diff --git a/moonv4/tests/utils/models.py b/moonv4/tests/utils/models.py deleted file mode 100644 index 61fa6179..00000000 --- a/moonv4/tests/utils/models.py +++ /dev/null @@ -1,275 +0,0 @@ -import requests -import copy -import utils.config - -URL = None -HEADERS = None - -model_template = { - "name": "test_model", - "description": "test", - "meta_rules": [] -} - -category_template = { - "name": "name of the category", - "description": "description of the category" -} - -meta_rule_template = { - "name": "test_meta_rule", - "subject_categories": [], - "object_categories": [], - "action_categories": [] -} - - -def init(consul_host, consul_port): - conf_data = utils.config.get_config_data(consul_host, consul_port) - global URL, HEADERS - URL = "http://{}:{}".format( - conf_data['manager_host'], - conf_data['manager_port']) - URL = URL + "{}" - HEADERS = {"content-type": "application/json"} - - -def check_model(model_id=None, check_model_name=True): - req = requests.get(URL.format("/models")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "models" in result - if model_id: - assert result["models"] - assert model_id in result['models'] - assert "name" in result['models'][model_id] - if check_model_name: - assert model_template["name"] == result['models'][model_id]["name"] - return result - - -def add_model(name=None): - if name: - model_template['name'] = name - req = requests.post(URL.format("/models"), json=model_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - model_id = list(result['models'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['models'][model_id] - assert model_template["name"] == result['models'][model_id]["name"] - return model_id - - -def delete_model(model_id): - req = requests.delete(URL.format("/models/{}".format(model_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - -def add_subject_category(name="subject_cat_1"): - category_template["name"] = name - req = requests.post(URL.format("/subject_categories"), json=category_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "subject_categories" in result - category_id = list(result['subject_categories'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['subject_categories'][category_id] - assert category_template["name"] == result['subject_categories'][category_id]["name"] - return category_id - - -def check_subject_category(category_id): - req = requests.get(URL.format("/subject_categories")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "subject_categories" in result - if "result" in result: - assert result["result"] - assert category_id in result['subject_categories'] - assert "name" in result['subject_categories'][category_id] - assert category_template["name"] == result['subject_categories'][category_id]["name"] - - -def delete_subject_category(category_id): - req = requests.delete(URL.format("/subject_categories/{}".format(category_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - if "result" in result: - assert result["result"] - - -def add_object_category(name="object_cat_1"): - category_template["name"] = name - req = requests.post(URL.format("/object_categories"), json=category_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "object_categories" in result - category_id = list(result['object_categories'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['object_categories'][category_id] - assert category_template["name"] == result['object_categories'][category_id]["name"] - return category_id - - -def check_object_category(category_id): - req = requests.get(URL.format("/object_categories")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "object_categories" in result - if "result" in result: - assert result["result"] - assert category_id in result['object_categories'] - assert "name" in result['object_categories'][category_id] - assert category_template["name"] == result['object_categories'][category_id]["name"] - - -def delete_object_category(category_id): - req = requests.delete(URL.format("/object_categories/{}".format(category_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - if "result" in result: - assert result["result"] - - -def add_action_category(name="action_cat_1"): - category_template["name"] = name - req = requests.post(URL.format("/action_categories"), json=category_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "action_categories" in result - category_id = list(result['action_categories'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['action_categories'][category_id] - assert category_template["name"] == result['action_categories'][category_id]["name"] - return category_id - - -def check_action_category(category_id): - req = requests.get(URL.format("/action_categories")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "action_categories" in result - if "result" in result: - assert result["result"] - assert category_id in result['action_categories'] - assert "name" in result['action_categories'][category_id] - assert category_template["name"] == result['action_categories'][category_id]["name"] - - -def delete_action_category(category_id): - req = requests.delete(URL.format("/action_categories/{}".format(category_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - if "result" in result: - assert result["result"] - - -def add_categories_and_meta_rule(name="test_meta_rule"): - scat_id = add_subject_category() - ocat_id = add_object_category() - acat_id = add_action_category() - _meta_rule_template = copy.deepcopy(meta_rule_template) - _meta_rule_template["name"] = name - _meta_rule_template["subject_categories"].append(scat_id) - _meta_rule_template["object_categories"].append(ocat_id) - _meta_rule_template["action_categories"].append(acat_id) - req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "meta_rules" in result - meta_rule_id = list(result['meta_rules'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['meta_rules'][meta_rule_id] - assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"] - return meta_rule_id, scat_id, ocat_id, acat_id - - -def add_meta_rule(name="test_meta_rule", scat=[], ocat=[], acat=[]): - _meta_rule_template = copy.deepcopy(meta_rule_template) - _meta_rule_template["name"] = name - _meta_rule_template["subject_categories"] = [] - _meta_rule_template["subject_categories"].extend(scat) - _meta_rule_template["object_categories"] = [] - _meta_rule_template["object_categories"].extend(ocat) - _meta_rule_template["action_categories"] = [] - _meta_rule_template["action_categories"].extend(acat) - req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "meta_rules" in result - meta_rule_id = list(result['meta_rules'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['meta_rules'][meta_rule_id] - assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"] - return meta_rule_id - - -def check_meta_rule(meta_rule_id, scat_id=None, ocat_id=None, acat_id=None): - req = requests.get(URL.format("/meta_rules")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "meta_rules" in result - if "result" in result: - assert result["result"] - if not meta_rule_id: - return result - assert meta_rule_id in result['meta_rules'] - assert "name" in result['meta_rules'][meta_rule_id] - if scat_id: - assert scat_id in result['meta_rules'][meta_rule_id]["subject_categories"] - if ocat_id: - assert ocat_id in result['meta_rules'][meta_rule_id]["object_categories"] - if acat_id: - assert acat_id in result['meta_rules'][meta_rule_id]["action_categories"] - - -def delete_meta_rule(meta_rule_id): - req = requests.delete(URL.format("/meta_rules/{}".format(meta_rule_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - if "result" in result: - assert result["result"] - - -def add_meta_rule_to_model(model_id, meta_rule_id): - model = check_model(model_id, check_model_name=False)['models'] - meta_rule_list = model[model_id]["meta_rules"] - if meta_rule_id not in meta_rule_list: - meta_rule_list.append(meta_rule_id) - req = requests.patch(URL.format("/models/{}".format(model_id)), - json={"meta_rules": meta_rule_list}, - headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - model_id = list(result['models'].keys())[0] - if "result" in result: - assert result["result"] - assert "meta_rules" in result['models'][model_id] - assert meta_rule_list == result['models'][model_id]["meta_rules"] diff --git a/moonv4/tests/utils/parse.py b/moonv4/tests/utils/parse.py deleted file mode 100644 index 34a4a996..00000000 --- a/moonv4/tests/utils/parse.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -import argparse - - -logger = None - - -def parse(): - global logger - logger = logging.getLogger(__name__) - requests_log = logging.getLogger("requests.packages.urllib3") - requests_log.setLevel(logging.WARNING) - requests_log.propagate = True - - parser = argparse.ArgumentParser() - parser.add_argument('filename', help='scenario filename', nargs=1) - parser.add_argument("--verbose", "-v", action='store_true', - help="verbose mode") - parser.add_argument("--debug", "-d", action='store_true', - help="debug mode") - parser.add_argument("--dry-run", "-n", action='store_true', - help="Dry run", dest="dry_run") - parser.add_argument("--destination", - help="Set the type of output needed " - "(default: wrapper, other possible type: " - "interface).", - default="wrapper") - parser.add_argument("--consul-host", - help="Set the name of the consul server" - "(default: 127.0.0.1).", - default="127.0.0.1") - parser.add_argument("--consul-port", - help="Set the port of the consult server" - "(default: 8082).", - default="8082") - parser.add_argument("--authz-host", - help="Set the name of the authz server to test" - "(default: 127.0.0.1).", - default="127.0.0.1") - parser.add_argument("--authz-port", - help="Set the port of the authz server to test" - "(default: 31002).", - default="31002") - parser.add_argument("--keystone-pid", "--keystone-project-id", - help="Set the Keystone project ID" - "(default: None).", - default=None) - parser.add_argument("--stress-test", "-s", action='store_true', - dest='stress_test', - help="Execute stressing tests (warning delta measures " - "will be false, implies -t)") - parser.add_argument("--write", "-w", help="Write test data to a JSON file", - default="/tmp/data.json") - parser.add_argument("--pdp", help="Test on pdp PDP") - parser.add_argument("--request-per-second", - help="Number of requests per seconds", - type=int, dest="request_second", default=-1) - parser.add_argument("--limit", help="Limit request to LIMIT", type=int, - default=500) - - args = parser.parse_args() - - FORMAT = '%(asctime)-15s %(levelname)s %(message)s' - if args.debug: - logging.basicConfig( - format=FORMAT, - level=logging.DEBUG) - elif args.verbose: - logging.basicConfig( - format=FORMAT, - level=logging.INFO) - else: - logging.basicConfig( - format=FORMAT, - level=logging.WARNING) - - if args.stress_test: - args.testonly = True - - if args.filename: - logger.info("Loading: {}".format(args.filename[0])) - - return args diff --git a/moonv4/tests/utils/pdp.py b/moonv4/tests/utils/pdp.py deleted file mode 100644 index 50998507..00000000 --- a/moonv4/tests/utils/pdp.py +++ /dev/null @@ -1,175 +0,0 @@ -import logging -import requests -import utils.config - -logger = logging.getLogger("moonforming.utils.policies") -URL = None -HEADER = None -KEYSTONE_USER = None -KEYSTONE_PASSWORD = None -KEYSTONE_PROJECT = None -KEYSTONE_SERVER = None - -# config = utils.config.get_config_data() - - -pdp_template = { - "name": "test_pdp", - "security_pipeline": [], - "keystone_project_id": None, - "description": "test", -} - - -def init(consul_host, consul_port): - conf_data = utils.config.get_config_data(consul_host, consul_port) - global URL, HEADER, KEYSTONE_USER, KEYSTONE_PASSWORD, KEYSTONE_PROJECT, KEYSTONE_SERVER - URL = "http://{}:{}".format( - conf_data['manager_host'], - conf_data['manager_port']) - # URL = URL + "{}" - HEADER = {"content-type": "application/json"} - KEYSTONE_USER = conf_data['keystone_user'] - KEYSTONE_PASSWORD = conf_data['keystone_password'] - KEYSTONE_PROJECT = conf_data['keystone_project'] - KEYSTONE_SERVER = conf_data['keystone_host'] - - -def get_keystone_projects(): - global HEADERS - HEADERS = { - "Content-Type": "application/json" - } - - data_auth = { - "auth": { - "identity": { - "methods": [ - "password" - ], - "password": { - "user": { - "name": KEYSTONE_USER, - "domain": { - "name": "Default" - }, - "password": KEYSTONE_PASSWORD - } - } - } - } - } - - req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS) - logger.debug("{}/auth/tokens".format(KEYSTONE_SERVER)) - logger.debug(req.text) - assert req.status_code in (200, 201) - TOKEN = req.headers['X-Subject-Token'] - HEADERS['X-Auth-Token'] = TOKEN - req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS) - if req.status_code not in (200, 201): - data_auth["auth"]["scope"] = { - "project": { - "name": KEYSTONE_PROJECT, - "domain": { - "id": "default" - } - } - } - req = requests.post("{}/auth/tokens".format(KEYSTONE_SERVER), json=data_auth, headers=HEADERS) - assert req.status_code in (200, 201) - TOKEN = req.headers['X-Subject-Token'] - HEADERS['X-Auth-Token'] = TOKEN - req = requests.get("{}/projects".format(KEYSTONE_SERVER), headers=HEADERS) - assert req.status_code in (200, 201) - return req.json() - - -def check_pdp(pdp_id=None, keystone_project_id=None, moon_url=None): - _URL = URL - if moon_url: - _URL = moon_url - req = requests.get(_URL + "/pdp") - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "pdps" in result - if pdp_id: - assert result["pdps"] - assert pdp_id in result['pdps'] - assert "name" in result['pdps'][pdp_id] - assert pdp_template["name"] == result['pdps'][pdp_id]["name"] - if keystone_project_id: - assert result["pdps"] - assert pdp_id in result['pdps'] - assert "keystone_project_id" in result['pdps'][pdp_id] - assert keystone_project_id == result['pdps'][pdp_id]["keystone_project_id"] - return result - - -def add_pdp(name="test_pdp", policy_id=None): - pdp_template['name'] = name - if policy_id: - pdp_template['security_pipeline'].append(policy_id) - req = requests.post(URL + "/pdp", json=pdp_template, headers=HEADERS) - logger.debug(req.status_code) - logger.debug(req) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - pdp_id = list(result['pdps'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['pdps'][pdp_id] - assert pdp_template["name"] == result['pdps'][pdp_id]["name"] - return pdp_id - - -def update_pdp(pdp_id, policy_id=None): - req = requests.get(URL + "/pdp/{}".format(pdp_id)) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "pdps" in result - assert pdp_id in result['pdps'] - pipeline = result['pdps'][pdp_id]["security_pipeline"] - if policy_id not in pipeline: - pipeline.append(policy_id) - req = requests.patch(URL + "/pdp/{}".format(pdp_id), - json={"security_pipeline": pipeline}) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "pdps" in result - assert pdp_id in result['pdps'] - - req = requests.get(URL + "/pdp/{}".format(pdp_id)) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "pdps" in result - assert pdp_id in result['pdps'] - assert policy_id in pipeline - - -def map_to_keystone(pdp_id, keystone_project_id): - req = requests.patch(URL + "/pdp/{}".format(pdp_id), json={"keystone_project_id": keystone_project_id}, - headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - if "result" in result: - assert result["result"] - assert pdp_id in result['pdps'] - assert "name" in result['pdps'][pdp_id] - assert pdp_template["name"] == result['pdps'][pdp_id]["name"] - return pdp_id - - -def delete_pdp(pdp_id): - req = requests.delete(URL + "/pdp/{}".format(pdp_id)) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] diff --git a/moonv4/tests/utils/policies.py b/moonv4/tests/utils/policies.py deleted file mode 100644 index fd4d238f..00000000 --- a/moonv4/tests/utils/policies.py +++ /dev/null @@ -1,642 +0,0 @@ -import logging -import requests -import utils.config - -URL = None -HEADERS = None -FILE = open("/tmp/test.log", "w") -logger = logging.getLogger("utils.policies") - -policy_template = { - "name": "test_policy", - "model_id": "", - "genre": "authz", - "description": "test", -} - -subject_template = { - "name": "test_subject", - "description": "test", - "email": "mail", - "password": "my_pass", -} - -object_template = { - "name": "test_subject", - "description": "test" -} - -action_template = { - "name": "test_subject", - "description": "test" -} - -subject_data_template = { - "name": "subject_data1", - "description": "description of the data subject" -} - -object_data_template = { - "name": "object_data1", - "description": "description of the data subject" -} - -action_data_template = { - "name": "action_data1", - "description": "description of the data subject" -} - -subject_assignment_template = { - "id": "", - "category_id": "", - "scope_id": "" -} - - -def init(consul_host, consul_port): - conf_data = utils.config.get_config_data(consul_host, consul_port) - global URL, HEADERS - URL = "http://{}:{}".format( - conf_data['manager_host'], - conf_data['manager_port']) - URL = URL + "{}" - HEADERS = {"content-type": "application/json"} - - -def check_policy(policy_id=None): - req = requests.get(URL.format("/policies")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "policies" in result - if policy_id: - assert result["policies"] - assert policy_id in result['policies'] - assert "name" in result['policies'][policy_id] - assert policy_template["name"] == result['policies'][policy_id]["name"] - return result - - -def add_policy(name="test_policy", genre="authz"): - policy_template["name"] = name - policy_template["genre"] = genre - req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - policy_id = list(result['policies'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['policies'][policy_id] - assert policy_template["name"] == result['policies'][policy_id]["name"] - return policy_id - - -def update_policy(policy_id, model_id): - req = requests.patch(URL.format("/policies/{}".format(policy_id)), - json={"model_id": model_id}, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - policy_id = list(result['policies'].keys())[0] - if "result" in result: - assert result["result"] - assert "model_id" in result['policies'][policy_id] - assert model_id == result['policies'][policy_id]["model_id"] - - -def delete_policy(policy_id): - req = requests.delete(URL.format("/policies/{}".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - -def add_subject(policy_id=None, name="test_subject"): - subject_template['name'] = name - if policy_id: - logger.debug(URL.format("/policies/{}/subjects".format(policy_id))) - req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)), - json=subject_template, headers=HEADERS) - else: - logger.debug(URL.format("/subjects")) - req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS) - logger.debug(req.text) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - subject_id = list(result['subjects'].keys())[0] - return subject_id - - -def update_subject(subject_id, policy_id=None, description=None): - if policy_id and not description: - req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), - json={}) - elif policy_id and description: - req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), - json={"description": description}) - else: - req = requests.patch(URL.format("/subjects/{}".format(subject_id)), - json={"description": description}) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - assert "policy_list" in result["subjects"][subject_id] - if policy_id: - assert policy_id in result["subjects"][subject_id]["policy_list"] - if description: - assert description in result["subjects"][subject_id]["description"] - - -def check_subject(subject_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) - else: - req = requests.get(URL.format("/subjects")) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - if policy_id: - assert "policy_list" in result["subjects"][subject_id] - assert policy_id in result["subjects"][subject_id]["policy_list"] - - -def delete_subject(subject_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id))) - else: - req = requests.delete(URL.format("/subjects/{}".format(subject_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) - else: - req = requests.get(URL.format("/subjects")) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - if subject_id in result["subjects"]: - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - if policy_id: - assert "policy_list" in result["subjects"][subject_id] - assert policy_id not in result["subjects"][subject_id]["policy_list"] - - -def add_object(policy_id=None, name="test_object"): - object_template['name'] = name - if policy_id: - req = requests.post(URL.format("/policies/{}/objects".format(policy_id)), - json=object_template, headers=HEADERS) - else: - req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - object_id = list(result['objects'].keys())[0] - return object_id - - -def update_object(object_id, policy_id): - req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), json={}) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - assert "policy_list" in result["objects"][object_id] - assert policy_id in result["objects"][object_id]["policy_list"] - - -def check_object(object_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) - else: - req = requests.get(URL.format("/objects")) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - if policy_id: - assert "policy_list" in result["objects"][object_id] - assert policy_id in result["objects"][object_id]["policy_list"] - - -def delete_object(object_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id))) - else: - req = requests.delete(URL.format("/objects/{}".format(object_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) - else: - req = requests.get(URL.format("/objects")) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - if object_id in result["objects"]: - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - if policy_id: - assert "policy_list" in result["objects"][object_id] - assert policy_id not in result["objects"][object_id]["policy_list"] - - -def add_action(policy_id=None, name="test_action"): - action_template['name'] = name - if policy_id: - req = requests.post(URL.format("/policies/{}/actions".format(policy_id)), - json=action_template, headers=HEADERS) - else: - req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - action_id = list(result['actions'].keys())[0] - return action_id - - -def update_action(action_id, policy_id): - req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), json={}) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - assert "policy_list" in result["actions"][action_id] - assert policy_id in result["actions"][action_id]["policy_list"] - - -def check_action(action_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) - else: - req = requests.get(URL.format("/actions")) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - if policy_id: - assert "policy_list" in result["actions"][action_id] - assert policy_id in result["actions"][action_id]["policy_list"] - - -def delete_action(action_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id))) - else: - req = requests.delete(URL.format("/actions/{}".format(action_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) - else: - req = requests.get(URL.format("/actions")) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - if action_id in result["actions"]: - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - if policy_id: - assert "policy_list" in result["actions"][action_id] - assert policy_id not in result["actions"][action_id]["policy_list"] - - -def add_subject_data(policy_id, category_id, name="subject_data1"): - subject_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)), - json=subject_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - subject_id = list(result['subject_data']['data'].keys())[0] - return subject_id - - -def check_subject_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - for _data in result['subject_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_subject_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - for _data in result['subject_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_object_data(policy_id, category_id, name="object_data1"): - object_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)), - json=object_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - object_id = list(result['object_data']['data'].keys())[0] - return object_id - - -def check_object_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - for _data in result['object_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_object_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - for _data in result['object_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_action_data(policy_id, category_id, name="action_data1"): - action_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)), - json=action_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - action_id = list(result['action_data']['data'].keys())[0] - return action_id - - -def check_action_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - for _data in result['action_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_action_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - for _data in result['action_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)), - json={ - "id": subject_id, - "category_id": subject_cat_id, - "data_id": subject_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - - -def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - for key in result["subject_assignments"]: - assert "subject_id" in result["subject_assignments"][key] - assert "category_id" in result["subject_assignments"][key] - assert "assignments" in result["subject_assignments"][key] - if result["subject_assignments"][key]['subject_id'] == subject_id and \ - result["subject_assignments"][key]["category_id"] == subject_cat_id: - assert subject_data_id in result["subject_assignments"][key]["assignments"] - - -def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id): - req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - for key in result["object_assignments"]: - assert "object_id" in result["object_assignments"][key] - assert "category_id" in result["object_assignments"][key] - assert "assignments" in result["object_assignments"][key] - if result["object_assignments"][key]['object_id'] == object_id and \ - result["object_assignments"][key]["category_id"] == object_cat_id: - assert object_data_id in result["object_assignments"][key]["assignments"] - - -def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id): - req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - for key in result["action_assignments"]: - assert "action_id" in result["action_assignments"][key] - assert "category_id" in result["action_assignments"][key] - assert "assignments" in result["action_assignments"][key] - if result["action_assignments"][key]['action_id'] == action_id and \ - result["action_assignments"][key]["category_id"] == action_cat_id: - assert action_data_id in result["action_assignments"][key]["assignments"] - - -def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id): - req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)), - json={ - "id": object_id, - "category_id": object_cat_id, - "data_id": object_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - - -def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id): - req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)), - json={ - "id": action_id, - "category_id": action_cat_id, - "data_id": action_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - - -def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - for key in result["subject_assignments"]: - assert "subject_id" in result["subject_assignments"][key] - assert "category_id" in result["subject_assignments"][key] - assert "assignments" in result["subject_assignments"][key] - if result["subject_assignments"][key]['subject_id'] == subject_id and \ - result["subject_assignments"][key]["category_id"] == subject_cat_id: - assert subject_data_id not in result["subject_assignments"][key]["assignments"] - - -def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id): - req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - for key in result["object_assignments"]: - assert "object_id" in result["object_assignments"][key] - assert "category_id" in result["object_assignments"][key] - assert "assignments" in result["object_assignments"][key] - if result["object_assignments"][key]['object_id'] == object_id and \ - result["object_assignments"][key]["category_id"] == object_cat_id: - assert object_data_id not in result["object_assignments"][key]["assignments"] - - -def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id): - req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - for key in result["action_assignments"]: - assert "action_id" in result["action_assignments"][key] - assert "category_id" in result["action_assignments"][key] - assert "assignments" in result["action_assignments"][key] - if result["action_assignments"][key]['action_id'] == action_id and \ - result["action_assignments"][key]["category_id"] == action_cat_id: - assert action_data_id not in result["action_assignments"][key]["assignments"] - - -def add_rule(policy_id, meta_rule_id, rule, instructions={"chain": [{"security_pipeline": "rbac"}]}): - req = requests.post(URL.format("/policies/{}/rules".format(policy_id)), - json={ - "meta_rule_id": meta_rule_id, - "rule": rule, - "instructions": instructions, - "enabled": True - }, - headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - try: - rule_id = list(result["rules"].keys())[0] - except Exception as e: - return False - assert "policy_id" in result["rules"][rule_id] - assert policy_id == result["rules"][rule_id]["policy_id"] - assert "meta_rule_id" in result["rules"][rule_id] - assert meta_rule_id == result["rules"][rule_id]["meta_rule_id"] - assert rule == result["rules"][rule_id]["rule"] - return rule_id - - -def check_rule(policy_id, meta_rule_id, rule_id, rule): - req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - assert "policy_id" in result["rules"] - assert policy_id == result["rules"]["policy_id"] - for item in result["rules"]["rules"]: - assert "meta_rule_id" in item - if meta_rule_id == item["meta_rule_id"]: - if rule_id == item["id"]: - assert rule == item["rule"] - - -def delete_rule(policy_id, rule_id): - req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - assert "policy_id" in result["rules"] - assert policy_id == result["rules"]["policy_id"] - found_rule = False - for item in result["rules"]["rules"]: - if rule_id == item["id"]: - found_rule = True - assert not found_rule |