diff options
Diffstat (limited to 'python_moonutilities')
-rw-r--r-- | python_moonutilities/Changelog | 16 | ||||
-rw-r--r-- | python_moonutilities/Jenkinsfile | 10 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/__init__.py | 2 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/auth.py | 76 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/cache.py | 25 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/context.py | 91 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/exceptions.py | 96 | ||||
-rw-r--r-- | python_moonutilities/python_moonutilities/security_functions.py | 111 | ||||
-rw-r--r-- | python_moonutilities/tests/unit_python/test_validated_input.py | 191 |
9 files changed, 500 insertions, 118 deletions
diff --git a/python_moonutilities/Changelog b/python_moonutilities/Changelog index ffc03809..ae7f352f 100644 --- a/python_moonutilities/Changelog +++ b/python_moonutilities/Changelog @@ -82,3 +82,19 @@ CHANGES 1.4.6 ----- - Add WrapperConflict, PipelineConflict, SlaveNameUnknown exceptions + +1.4.7 +----- +- Delete the auth.py file to remove some code duplication + +1.4.8 +----- +- Add SubjectScopeExisting, ObjectScopeExisting, ActionScopeExisting exceptions + +1.4.9 +----- +- Add some exceptions when deletion of elements is impossible + +1.4.10 +----- +- Add CategoryNameInvalid and PerimeterNameInvalid exceptions diff --git a/python_moonutilities/Jenkinsfile b/python_moonutilities/Jenkinsfile new file mode 100644 index 00000000..95939e9b --- /dev/null +++ b/python_moonutilities/Jenkinsfile @@ -0,0 +1,10 @@ +pipeline { + agent { docker { image 'python:3.5.1' } } + stages { + stage('build') { + steps { + sh 'python --version' + } + } + } +}
\ No newline at end of file diff --git a/python_moonutilities/python_moonutilities/__init__.py b/python_moonutilities/python_moonutilities/__init__.py index 741ba4f6..6b30dedc 100644 --- a/python_moonutilities/python_moonutilities/__init__.py +++ b/python_moonutilities/python_moonutilities/__init__.py @@ -3,6 +3,6 @@ # license which can be found in the file 'LICENSE' in this package distribution # or at 'http://www.apache.org/licenses/LICENSE-2.0'. -__version__ = "1.4.6" +__version__ = "1.4.10" diff --git a/python_moonutilities/python_moonutilities/auth.py b/python_moonutilities/python_moonutilities/auth.py deleted file mode 100644 index 5f921d0b..00000000 --- a/python_moonutilities/python_moonutilities/auth.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors -# This software is distributed under the terms and conditions of the 'Apache-2.0' -# license which can be found in the file 'LICENSE' in this package distribution -# or at 'http://www.apache.org/licenses/LICENSE-2.0'. - -import os -import requests -import time -from functools import wraps -from flask import request -from oslo_log import log as logging -from python_moonutilities import exceptions, configuration - - -logger = logging.getLogger(__name__) -KEYSTONE_CONFIG = configuration.get_configuration("openstack/keystone")["openstack/keystone"] -TOKENS = {} - - -def check_token(token, url=None): - _verify = False - if KEYSTONE_CONFIG['certificate']: - _verify = KEYSTONE_CONFIG['certificate'] - try: - os.environ.pop("http_proxy") - os.environ.pop("https_proxy") - except KeyError: - pass - if not url: - url = KEYSTONE_CONFIG['url'] - headers = { - "Content-Type": "application/json", - 'X-Subject-Token': token, - 'X-Auth-Token': token, - } - if KEYSTONE_CONFIG['check_token'].lower() in ("false", "no", "n"): - # TODO (asteroide): must send the admin id - return "admin" if not token else token - if KEYSTONE_CONFIG['check_token'].lower() in ("yes", "y", "true"): - if token in TOKENS: - delta = time.mktime(TOKENS[token]["expires_at"]) - time.mktime(time.gmtime()) - if delta > 0: - return TOKENS[token]["user"] - raise exceptions.KeystoneError - else: - req = requests.get("{}/auth/tokens".format(url), headers=headers, verify=_verify) - if req.status_code in (200, 201): - # Note (asteroide): the time stamps is not in ISO 8601, so it is necessary to delete - # characters after the dot - token_time = req.json().get("token").get("expires_at").split(".") - TOKENS[token] = dict() - TOKENS[token]["expires_at"] = time.strptime(token_time[0], "%Y-%m-%dT%H:%M:%S") - TOKENS[token]["user"] = req.json().get("token").get("user").get("id") - return TOKENS[token]["user"] - logger.error("{} - {}".format(req.status_code, req.text)) - raise exceptions.KeystoneError - elif KEYSTONE_CONFIG['check_token'].lower() == "strict": - req = requests.head("{}/auth/tokens".format(url), headers=headers, verify=_verify) - if req.status_code in (200, 201): - return token - logger.error("{} - {}".format(req.status_code, req.text)) - raise exceptions.KeystoneError - raise exceptions.KeystoneError - - -def check_auth(function): - @wraps(function) - def wrapper(*args, **kwargs): - token = request.headers.get('X-Auth-Token') - token = check_token(token) - if not token: - raise exceptions.AuthException - user_id = kwargs.pop("user_id", token) - result = function(*args, **kwargs, user_id=user_id) - return result - return wrapper diff --git a/python_moonutilities/python_moonutilities/cache.py b/python_moonutilities/python_moonutilities/cache.py index 1ea59d3a..1bb9d09e 100644 --- a/python_moonutilities/python_moonutilities/cache.py +++ b/python_moonutilities/python_moonutilities/cache.py @@ -101,14 +101,14 @@ class Cache(object): raise exceptions.PolicyUnknown("Cannot find policy within policy_id {}".format(policy_id)) if policy_id in self.subjects: - for _subject_id, _subject_dict in self.__SUBJECTS[policy_id].items(): + for _subject_id, _subject_dict in self.subjects[policy_id].items(): if "name" in _subject_dict and _subject_dict["name"] == name: return _subject_id self.__update_subjects(policy_id) if policy_id in self.subjects: - for _subject_id, _subject_dict in self.__SUBJECTS[policy_id].items(): + for _subject_id, _subject_dict in self.subjects[policy_id].items(): if "name" in _subject_dict and _subject_dict["name"] == name: return _subject_id @@ -488,6 +488,20 @@ class Cache(object): logger.warning("Cannot find 'security_pipeline' " "key within pdp ") + def get_meta_rule_ids_from_pdp_value(self, pdp_value): + meta_rules = [] + if "security_pipeline" in pdp_value: + for policy_id in pdp_value["security_pipeline"]: + if policy_id not in self.policies or "model_id" not in self.policies[policy_id]: + raise exceptions.PolicyUnknown("Cannot find 'models' key") + model_id = self.policies[policy_id]["model_id"] + if model_id not in self.models or 'meta_rules' not in self.models[model_id]: + raise exceptions.ModelNotFound("Cannot find 'models' key") + for meta_rule in self.models[model_id]["meta_rules"]: + meta_rules.append(meta_rule) + return meta_rules + raise exceptions.PdpContentError + def get_pdp_from_keystone_project(self, keystone_project_id): for pdp_key, pdp_value in self.pdp.items(): if "keystone_project_id" in pdp_value and \ @@ -566,8 +580,8 @@ class Cache(object): :return: """ if all(k in container_data for k in ("keystone_project_id", "name", "container_id", "policy_id", - "meta_rule_id", "port")) \ - and all(k in container_data['port'] for k in ("PublicPort", "Type", "IP", "PrivatePort")): + "meta_rule_id", "port")) \ + and all(k in container_data['port'] for k in ("PublicPort", "Type", "IP", "PrivatePort")): self.__CONTAINERS[uuid4().hex] = { "keystone_project_id": container_data['keystone_project_id'], @@ -641,7 +655,7 @@ class Cache(object): container_ids = [] for pdp_id, pdp_value, in self.__PDP.items(): if pdp_value: - if all(k in pdp_value for k in ("keystone_project_id", "security_pipeline")) \ + if all(k in pdp_value for k in ("keystone_project_id", "security_pipeline")) \ and pdp_value["keystone_project_id"] == keystone_project_id: for policy_id in pdp_value["security_pipeline"]: if policy_id in self.policies and "model_id" in self.policies[policy_id]: @@ -677,4 +691,3 @@ class Cache(object): "and may not contains 'model_id' key".format(policy_id)) self.__CONTAINER_CHAINING[keystone_project_id] = container_ids - diff --git a/python_moonutilities/python_moonutilities/context.py b/python_moonutilities/python_moonutilities/context.py index 626b25dc..1d25cda2 100644 --- a/python_moonutilities/python_moonutilities/context.py +++ b/python_moonutilities/python_moonutilities/context.py @@ -14,39 +14,35 @@ logger = logging.getLogger("moon.utilities." + __name__) class Context: def __init__(self, init_context, cache): + if init_context is None: + raise Exception("Invalid context content object") + self.cache = cache self.__keystone_project_id = init_context.get("project_id") - self.__pdp_id = None - self.__pdp_value = None - for _pdp_key, _pdp_value in self.cache.pdp.items(): - if _pdp_value["keystone_project_id"] == self.__keystone_project_id: - self.__pdp_id = _pdp_key - self.__pdp_value = copy.deepcopy(_pdp_value) - break - if not self.__pdp_value: + self.__pdp_id = self.cache.get_pdp_from_keystone_project(self.__keystone_project_id) + + if not self.__pdp_id: raise exceptions.AuthzException( "Cannot create context for authz " "with Keystone project ID {}".format( self.__keystone_project_id - )) + )) + self.__pdp_value = copy.deepcopy(self.cache.pdp[self.__pdp_id]) + self.__subject = init_context.get("subject_name") self.__object = init_context.get("object_name") self.__action = init_context.get("action_name") - self.__current_request = None self.__request_id = init_context.get("req_id") self.__cookie = init_context.get("cookie") self.__manager_url = init_context.get("manager_url") self.__interface_name = init_context.get("interface_name") + self.__current_request = None + self.__index = -1 # self.__init_initial_request() - self.__headers = [] - policies = self.cache.policies - models = self.cache.models - for policy_id in self.__pdp_value["security_pipeline"]: - model_id = policies[policy_id]["model_id"] - for meta_rule in models[model_id]["meta_rules"]: - self.__headers.append(meta_rule) + self.__meta_rule_ids = self.cache.get_meta_rule_ids_from_pdp_value(self.__pdp_value) self.__meta_rules = self.cache.meta_rules + self.__pdp_set = {} # self.__init_pdp_set() @@ -63,20 +59,25 @@ class Context: @property def current_state(self): - return self.__pdp_set[self.__headers[self.__index]]['effect'] + self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index]) + return self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect'] @current_state.setter def current_state(self, state): if state not in ("grant", "deny", "passed"): state = "passed" - self.__pdp_set[self.__headers[self.__index]]['effect'] = state + self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index]) + self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect'] = state @current_state.deleter def current_state(self): - self.__pdp_set[self.__headers[self.__index]]['effect'] = "unset" + self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index]) + self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect'] = "unset" @property def current_policy_id(self): + if "security_pipeline" not in self.__pdp_value: + raise exceptions.AuthzException('Cannot find security_pipeline key within pdp.') return self.__pdp_value["security_pipeline"][self.__index] @current_policy_id.setter @@ -88,6 +89,8 @@ class Context: pass def __init_current_request(self): + if "security_pipeline" not in self.__pdp_value: + raise exceptions.PdpContentError self.__subject = self.cache.get_subject( self.__pdp_value["security_pipeline"][self.__index], self.__subject) @@ -100,11 +103,11 @@ class Context: self.__current_request = dict(self.initial_request) def __init_pdp_set(self): - for header in self.__headers: - self.__pdp_set[header] = dict() - self.__pdp_set[header]["meta_rules"] = self.__meta_rules[header] - self.__pdp_set[header]["target"] = self.__add_target(header) - self.__pdp_set[header]["effect"] = "unset" + for meta_rule_id in self.__meta_rule_ids: + self.__pdp_set[meta_rule_id] = dict() + self.__pdp_set[meta_rule_id]["meta_rules"] = self.__meta_rules[meta_rule_id] + self.__pdp_set[meta_rule_id]["target"] = self.__add_target(meta_rule_id) + self.__pdp_set[meta_rule_id]["effect"] = "unset" self.__pdp_set["effect"] = "deny" # def update_target(self, context): @@ -151,23 +154,37 @@ class Context: _subject = self.__current_request["subject"] _object = self.__current_request["object"] _action = self.__current_request["action"] + meta_rules = self.cache.meta_rules policy_id = self.cache.get_policy_from_meta_rules(meta_rule_id) + + if 'subject_categories' not in meta_rules[meta_rule_id]: + raise exceptions.MetaRuleContentError(" 'subject_categories' key not found ") + for sub_cat in meta_rules[meta_rule_id]['subject_categories']: if sub_cat not in result: result[sub_cat] = [] result[sub_cat].extend( self.cache.get_subject_assignments(policy_id, _subject, sub_cat)) + + if 'object_categories' not in meta_rules[meta_rule_id]: + raise exceptions.MetaRuleContentError(" 'object_categories' key not found ") + for obj_cat in meta_rules[meta_rule_id]['object_categories']: if obj_cat not in result: result[obj_cat] = [] result[obj_cat].extend( self.cache.get_object_assignments(policy_id, _object, obj_cat)) + + if 'action_categories' not in meta_rules[meta_rule_id]: + raise exceptions.MetaRuleContentError(" 'action_categories' key not found ") + for act_cat in meta_rules[meta_rule_id]['action_categories']: if act_cat not in result: result[act_cat] = [] result[act_cat].extend( self.cache.get_action_assignments(policy_id, _action, act_cat)) + return result def __repr__(self): @@ -181,7 +198,7 @@ pdp_set: {pdp_set} id=self.__pdp_id, current_request=self.__current_request, request_id=self.__request_id, - headers=self.__headers, + headers=self.__meta_rule_ids, pdp_set=self.__pdp_set, index=self.__index ) @@ -190,7 +207,7 @@ pdp_set: {pdp_set} return { "initial_request": copy.deepcopy(self.initial_request), "current_request": copy.deepcopy(self.__current_request), - "headers": copy.deepcopy(self.__headers), + "headers": copy.deepcopy(self.__meta_rule_ids), "index": copy.deepcopy(self.__index), "pdp_set": copy.deepcopy(self.__pdp_set), "request_id": copy.deepcopy(self.__request_id), @@ -265,11 +282,12 @@ pdp_set: {pdp_set} @property def current_request(self): if not self.__current_request: - self.__current_request = copy.deepcopy(self.initial_request) + self.__current_request = dict(self.initial_request) return self.__current_request @current_request.setter def current_request(self, value): + self.__current_request = copy.deepcopy(value) # Note (asteroide): if the current request is modified, # we must update the PDP Set. @@ -280,17 +298,22 @@ pdp_set: {pdp_set} self.__current_request = {} self.__pdp_set = {} + ''' + [Note ] Refactor name of headers to meta_rule_ids done , + may need to refactor getter and setter of headers + ''' + @property def headers(self): - return self.__headers + return self.__meta_rule_ids @headers.setter - def headers(self, headers): - self.__headers = headers + def headers(self, meta_rule_ids): + self.__meta_rule_ids = meta_rule_ids @headers.deleter def headers(self): - self.__headers = list() + self.__meta_rule_ids = list() @property def index(self): @@ -316,4 +339,6 @@ pdp_set: {pdp_set} def pdp_set(self): self.__pdp_set = {} - + def __validate_meta_rule_content(self, meta_rules): + if 'effect' not in meta_rules: + raise exceptions.PdpContentError diff --git a/python_moonutilities/python_moonutilities/exceptions.py b/python_moonutilities/python_moonutilities/exceptions.py index 1298f9e4..a43ac89f 100644 --- a/python_moonutilities/python_moonutilities/exceptions.py +++ b/python_moonutilities/python_moonutilities/exceptions.py @@ -197,6 +197,11 @@ class AdminRule(AdminException): code = 400 title = 'Rule Exception' +class CategoryNameInvalid(AdminMetaData): + description = _("The given category name is invalid.") + code = 409 + title = 'Category Name Invalid' + logger = "ERROR" class SubjectCategoryNameExisting(AdminMetaData): description = _("The given subject category name already exists.") @@ -261,6 +266,12 @@ class ActionCategoryUnknown(AdminMetaData): logger = "ERROR" +class PerimeterNameInvalid(AdminPerimeter): + description = _("The given name is not valid.") + code = 400 + title = 'Perimeter Name is Invalid' + logger = "ERROR" + class SubjectUnknown(AdminPerimeter): description = _("The given subject is unknown.") code = 400 @@ -282,6 +293,26 @@ class ActionUnknown(AdminPerimeter): logger = "ERROR" +class SubjectExisting(AdminPerimeter): + description = _("The given subject is existing.") + code = 409 + title = 'Subject Existing' + logger = "ERROR" + + +class ObjectExisting(AdminPerimeter): + description = _("The given object is existing.") + code = 409 + title = 'Object Existing' + logger = "ERROR" + + +class ActionExisting(AdminPerimeter): + description = _("The given action is existing.") + code = 409 + title = 'Action Existing' + logger = "ERROR" + class SubjectNameExisting(AdminPerimeter): description = _("The given subject name is existing.") code = 400 @@ -338,6 +369,27 @@ class ActionScopeUnknown(AdminScope): logger = "ERROR" +class SubjectScopeExisting(AdminScope): + description = _("The given subject scope is existing.") + code = 409 + title = 'Subject Scope Existing' + logger = "ERROR" + + +class ObjectScopeExisting(AdminScope): + description = _("The given object scope is existing.") + code = 409 + title = 'Object Scope Existing' + logger = "ERROR" + + +class ActionScopeExisting(AdminScope): + description = _("The given action scope is existing.") + code = 409 + title = 'Action Scope Existing' + logger = "ERROR" + + class SubjectScopeNameExisting(AdminScope): description = _("The given subject scope name is existing.") code = 400 @@ -444,7 +496,7 @@ class MetaRuleExisting(AdminMetaRule): class MetaRuleContentError(AdminMetaRule): - description = _("Invalid content of pdp.") + description = _("Invalid content of meta rule.") code = 400 title = 'Meta Rule Error' logger = "ERROR" @@ -610,3 +662,45 @@ class PolicyExisting(MoonError): code = 409 title = 'Policy Error' logger = "Error" + + +class DeleteData(MoonError): + description = _("Cannot delete data with assignment") + code = 400 + title = 'Data Error' + logger = "Error" + + +class DeleteCategoryWithData(MoonError): + description = _("Cannot delete category with data") + code = 400 + title = 'Category Error' + logger = "Error" + + +class DeleteCategoryWithMetaRule(MoonError): + description = _("Cannot delete category with meta rule") + code = 400 + title = 'Category Error' + logger = "Error" + + +class DeleteModelWithPolicy(MoonError): + description = _("Cannot delete model with policy") + code = 400 + title = 'Model Error' + logger = "Error" + + +class DeletePolicyWithPdp(MoonError): + description = _("Cannot delete policy with pdp") + code = 400 + title = 'Policy Error' + logger = "Error" + + +class DeleteMetaRuleWithModel(MoonError): + description = _("Cannot delete meta rule with model") + code = 400 + title = 'Meta rule Error' + logger = "Error" diff --git a/python_moonutilities/python_moonutilities/security_functions.py b/python_moonutilities/python_moonutilities/security_functions.py index 15cbc8be..5d5275ee 100644 --- a/python_moonutilities/python_moonutilities/security_functions.py +++ b/python_moonutilities/python_moonutilities/security_functions.py @@ -22,7 +22,6 @@ __targets = {} def filter_input(func_or_str): - def __filter(string): if string and type(string) is str: return "".join(re.findall("[\w\- +]*", string)) @@ -82,15 +81,124 @@ def filter_input(func_or_str): return None +""" +To do should check value of Dictionary but it's dependent on from where it's coming +""" + + +def validate_data(data): + def __validate_string(string): + if not string: + raise ValueError('Empty String') + ''' + is it valid to contains space inbetween + + ''' + + if " " in string: + raise ValueError('String contains space') + + def __validate_list_or_tuple(container): + if not container: + raise ValueError('Empty Container') + for i in container: + validate_data(i) + + def __validate_dict(dictionary): + if not dictionary: + raise ValueError('Empty Dictionary') + for key in dictionary: + validate_data(dictionary[key]) + + if isinstance(data, str): + __validate_string(data) + elif isinstance(data, list) or isinstance(data, tuple): + __validate_list_or_tuple(data) + elif isinstance(data, dict): + __validate_dict(data) + else: + raise ValueError('Value is Not String or Container or Dictionary') + + +def validate_input(type='get', args_state=[], kwargs_state=[], body_state=[]): + """ + this fucntion works only on List or tuple or dictionary of Strings ,and String direct + Check if input of function is Valid or not, Valid if not has spaces and values is not None or empty. + + :param type: type of request if function is used as decorator + :param args_state: list of Booleans for args, + values must be order as target values of arguments, + True if None is not Allowed and False if is allowed + :param kwargs_state: list of Booleans for kwargs as order of input kwargs, + values must be order as target values of arguments, + True if None is not Allowed and False if is allowed + :param body_state: list of Booleans for arguments in body of request if request is post, + values must be order as target values of arguments, + True if None is not Allowed and False if is allowed + :return: + """ + + def validate_input_decorator(func): + def wrapped(*args, **kwargs): + + temp_args = [] + """ + this loop made to filter args from object class, + when put this function as decorator in function control + then there is copy of this class add to front of args + """ + for arg in args: + if isinstance(arg, str) == True or \ + isinstance(arg, list) == True or \ + isinstance(arg, dict) == True: + temp_args.append(arg) + + while len(args_state) < len(temp_args): + args_state.append(True) + + for i in range(0, len(temp_args)): + if args_state[i]: + validate_data(temp_args[i]) + + while len(kwargs_state) < len(kwargs): + kwargs_state.append(True) + counter = 0 + for i in kwargs: + if kwargs_state[counter]: + validate_data({i: kwargs[i]}) + + counter = counter + 1 + + if type == "post" or type == "patch": + body = request.json + while len(body_state) < len(body): + body_state.append(True) + counter = 0 + for i in body: + if body_state[counter]: + validate_data({i: body[i]}) + + counter = counter + 1 + + return func(*args, **kwargs) + + return wrapped + + return validate_input_decorator + + def enforce(action_names, object_name, **extra): """Fake version of the enforce decorator""" + def wrapper_func(func): def wrapper_args(*args, **kwargs): # LOG.info("kwargs={}".format(kwargs)) # kwargs['user_id'] = kwargs.pop('user_id', "admin") # LOG.info("Calling enforce on {} with args={} kwargs={}".format(func.__name__, args, kwargs)) return func(*args, **kwargs) + return wrapper_args + return wrapper_func @@ -221,4 +329,5 @@ def check_auth(function): user_id = kwargs.pop("user_id", token) result = function(*args, **kwargs, user_id=user_id) return result + return wrapper diff --git a/python_moonutilities/tests/unit_python/test_validated_input.py b/python_moonutilities/tests/unit_python/test_validated_input.py new file mode 100644 index 00000000..c8e681e9 --- /dev/null +++ b/python_moonutilities/tests/unit_python/test_validated_input.py @@ -0,0 +1,191 @@ +import pytest + + +def test_valid_string(): + from python_moonutilities.security_functions import validate_data + validate_data("CorrectString") + +def test_unvalid_string(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data("Notcorrect String") + + assert str(exception_info.value) == 'String contains space' + +def test_empty_string(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data("") + + assert str(exception_info.value) == 'Empty String' + + +def test_none_value(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(None) + + assert str(exception_info.value) == 'Value is Not String or Container or Dictionary' + + +def test_int_value(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(1) + + assert str(exception_info.value) == 'Value is Not String or Container or Dictionary' + + +def test_float_value(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(1.23) + + assert str(exception_info.value) == 'Value is Not String or Container or Dictionary' + + +def test_correct_list(): + from python_moonutilities.security_functions import validate_data + validate_data(["skjdnfa","dao","daosdjpw"]) + + +def test_correct_list(): + from python_moonutilities.security_functions import validate_data + validate_data(["skjdnfa"]) + + +def test_correct_instead_list(): + from python_moonutilities.security_functions import validate_data + validate_data([["skjdnfa","daswi"],[["daskdlw"],["daklwo"]],["dawl","afioa"],["dawno"]]) + + +def test_empty_list(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data([]) + + assert str(exception_info.value) == 'Empty Container' + + +def test_empty_list_inside_other_list(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(["dajiwdj",[]]) + + assert str(exception_info.value) == 'Empty Container' + + +def test_incorrect_string_inside_list(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(["dajiwdj",["dakwe","daow awoepa"]]) + + assert str(exception_info.value) == 'String contains space' + + +def test_empty_string_inside_list(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(["dajiwdj", ["dakwe", ""]]) + + assert str(exception_info.value) == 'Empty String' + + +def test_correct_tuples(): + from python_moonutilities.security_functions import validate_data + validate_data(("dasdw","dawdwa")) + + +def test_empty_tuples(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(()) + + assert str(exception_info.value) == 'Empty Container' + +def test_correct_tuple_of_tuple(): + from python_moonutilities.security_functions import validate_data + validate_data(("gjosjefa",("diwajdi","oejfoea"),(("jwdi","fjia"),("nfioa","ifao")))) + + +def test_incorrect_tuple(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data(("djawo","dowa afw")) + + assert str(exception_info.value) == 'String contains space' + + +def test_correct_dictionary(): + from python_moonutilities.security_functions import validate_data + validate_data({"daiwdw":"dwioajd"}) + + +def test_incorrect_dictionary(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data({"daiwdw":"dwioa jd"}) + + assert str(exception_info.value) == 'String contains space' + +def test_empty_dictionary(): + from python_moonutilities.security_functions import validate_data + with pytest.raises(Exception) as exception_info: + validate_data({}) + + assert str(exception_info.value) == 'Empty Dictionary' + + +def test_correct_function_pass(): + from python_moonutilities.security_functions import validate_input + + @validate_input() + def temp_function(string,list,tuple): + if string!="teststring" : + raise ValueError("values which passed incorrect") + + temp_function("teststring",["teststring",["teststring"]],("teststring",("teststring"))) + +def test_incorrect_function_pass1(): + from python_moonutilities.security_functions import validate_input + + @validate_input() + def temp_function(string, list, tuple): + if string != "teststring": + raise ValueError("values which passed incorrect") + + with pytest.raises(Exception) as exception_info: + temp_function("teststring",list=["teststring", ["testst ring"]],tuple=("teststring", ("teststri ng"))) + + assert str(exception_info.value) == 'String contains space' + + +def test_incorrect_function_pass2(): + from python_moonutilities.security_functions import validate_input + + @validate_input() + def temp_function(string, list, dictionary): + if string != "teststring": + raise ValueError("values which passed incorrect") + + with pytest.raises(Exception) as exception_info: + temp_function("teststring", ["teststring", ["teststri ng"]], {"teststring": ("teststring")}) + + assert str(exception_info.value) == 'String contains space' + + +def test_incorrect_function_pass3(): + from python_moonutilities.security_functions import validate_input + + class x: + @validate_input() + def temp_function(string, list, dictionary): + if string != "teststring": + raise ValueError("values which passed incorrect") + + e=x; + + with pytest.raises(Exception) as exception_info: + e.temp_function("teststring", ["teststring", ["teststri ng"]], {"teststring": ("teststring")}) + + assert str(exception_info.value) == 'String contains space' |