aboutsummaryrefslogtreecommitdiffstats
path: root/python_moonutilities
diff options
context:
space:
mode:
Diffstat (limited to 'python_moonutilities')
-rw-r--r--python_moonutilities/Changelog16
-rw-r--r--python_moonutilities/Jenkinsfile10
-rw-r--r--python_moonutilities/python_moonutilities/__init__.py2
-rw-r--r--python_moonutilities/python_moonutilities/auth.py76
-rw-r--r--python_moonutilities/python_moonutilities/cache.py25
-rw-r--r--python_moonutilities/python_moonutilities/context.py91
-rw-r--r--python_moonutilities/python_moonutilities/exceptions.py96
-rw-r--r--python_moonutilities/python_moonutilities/security_functions.py111
-rw-r--r--python_moonutilities/tests/unit_python/test_validated_input.py191
9 files changed, 500 insertions, 118 deletions
diff --git a/python_moonutilities/Changelog b/python_moonutilities/Changelog
index ffc03809..ae7f352f 100644
--- a/python_moonutilities/Changelog
+++ b/python_moonutilities/Changelog
@@ -82,3 +82,19 @@ CHANGES
1.4.6
-----
- Add WrapperConflict, PipelineConflict, SlaveNameUnknown exceptions
+
+1.4.7
+-----
+- Delete the auth.py file to remove some code duplication
+
+1.4.8
+-----
+- Add SubjectScopeExisting, ObjectScopeExisting, ActionScopeExisting exceptions
+
+1.4.9
+-----
+- Add some exceptions when deletion of elements is impossible
+
+1.4.10
+-----
+- Add CategoryNameInvalid and PerimeterNameInvalid exceptions
diff --git a/python_moonutilities/Jenkinsfile b/python_moonutilities/Jenkinsfile
new file mode 100644
index 00000000..95939e9b
--- /dev/null
+++ b/python_moonutilities/Jenkinsfile
@@ -0,0 +1,10 @@
+pipeline {
+ agent { docker { image 'python:3.5.1' } }
+ stages {
+ stage('build') {
+ steps {
+ sh 'python --version'
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/python_moonutilities/python_moonutilities/__init__.py b/python_moonutilities/python_moonutilities/__init__.py
index 741ba4f6..6b30dedc 100644
--- a/python_moonutilities/python_moonutilities/__init__.py
+++ b/python_moonutilities/python_moonutilities/__init__.py
@@ -3,6 +3,6 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-__version__ = "1.4.6"
+__version__ = "1.4.10"
diff --git a/python_moonutilities/python_moonutilities/auth.py b/python_moonutilities/python_moonutilities/auth.py
deleted file mode 100644
index 5f921d0b..00000000
--- a/python_moonutilities/python_moonutilities/auth.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
-# This software is distributed under the terms and conditions of the 'Apache-2.0'
-# license which can be found in the file 'LICENSE' in this package distribution
-# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-import os
-import requests
-import time
-from functools import wraps
-from flask import request
-from oslo_log import log as logging
-from python_moonutilities import exceptions, configuration
-
-
-logger = logging.getLogger(__name__)
-KEYSTONE_CONFIG = configuration.get_configuration("openstack/keystone")["openstack/keystone"]
-TOKENS = {}
-
-
-def check_token(token, url=None):
- _verify = False
- if KEYSTONE_CONFIG['certificate']:
- _verify = KEYSTONE_CONFIG['certificate']
- try:
- os.environ.pop("http_proxy")
- os.environ.pop("https_proxy")
- except KeyError:
- pass
- if not url:
- url = KEYSTONE_CONFIG['url']
- headers = {
- "Content-Type": "application/json",
- 'X-Subject-Token': token,
- 'X-Auth-Token': token,
- }
- if KEYSTONE_CONFIG['check_token'].lower() in ("false", "no", "n"):
- # TODO (asteroide): must send the admin id
- return "admin" if not token else token
- if KEYSTONE_CONFIG['check_token'].lower() in ("yes", "y", "true"):
- if token in TOKENS:
- delta = time.mktime(TOKENS[token]["expires_at"]) - time.mktime(time.gmtime())
- if delta > 0:
- return TOKENS[token]["user"]
- raise exceptions.KeystoneError
- else:
- req = requests.get("{}/auth/tokens".format(url), headers=headers, verify=_verify)
- if req.status_code in (200, 201):
- # Note (asteroide): the time stamps is not in ISO 8601, so it is necessary to delete
- # characters after the dot
- token_time = req.json().get("token").get("expires_at").split(".")
- TOKENS[token] = dict()
- TOKENS[token]["expires_at"] = time.strptime(token_time[0], "%Y-%m-%dT%H:%M:%S")
- TOKENS[token]["user"] = req.json().get("token").get("user").get("id")
- return TOKENS[token]["user"]
- logger.error("{} - {}".format(req.status_code, req.text))
- raise exceptions.KeystoneError
- elif KEYSTONE_CONFIG['check_token'].lower() == "strict":
- req = requests.head("{}/auth/tokens".format(url), headers=headers, verify=_verify)
- if req.status_code in (200, 201):
- return token
- logger.error("{} - {}".format(req.status_code, req.text))
- raise exceptions.KeystoneError
- raise exceptions.KeystoneError
-
-
-def check_auth(function):
- @wraps(function)
- def wrapper(*args, **kwargs):
- token = request.headers.get('X-Auth-Token')
- token = check_token(token)
- if not token:
- raise exceptions.AuthException
- user_id = kwargs.pop("user_id", token)
- result = function(*args, **kwargs, user_id=user_id)
- return result
- return wrapper
diff --git a/python_moonutilities/python_moonutilities/cache.py b/python_moonutilities/python_moonutilities/cache.py
index 1ea59d3a..1bb9d09e 100644
--- a/python_moonutilities/python_moonutilities/cache.py
+++ b/python_moonutilities/python_moonutilities/cache.py
@@ -101,14 +101,14 @@ class Cache(object):
raise exceptions.PolicyUnknown("Cannot find policy within policy_id {}".format(policy_id))
if policy_id in self.subjects:
- for _subject_id, _subject_dict in self.__SUBJECTS[policy_id].items():
+ for _subject_id, _subject_dict in self.subjects[policy_id].items():
if "name" in _subject_dict and _subject_dict["name"] == name:
return _subject_id
self.__update_subjects(policy_id)
if policy_id in self.subjects:
- for _subject_id, _subject_dict in self.__SUBJECTS[policy_id].items():
+ for _subject_id, _subject_dict in self.subjects[policy_id].items():
if "name" in _subject_dict and _subject_dict["name"] == name:
return _subject_id
@@ -488,6 +488,20 @@ class Cache(object):
logger.warning("Cannot find 'security_pipeline' "
"key within pdp ")
+ def get_meta_rule_ids_from_pdp_value(self, pdp_value):
+ meta_rules = []
+ if "security_pipeline" in pdp_value:
+ for policy_id in pdp_value["security_pipeline"]:
+ if policy_id not in self.policies or "model_id" not in self.policies[policy_id]:
+ raise exceptions.PolicyUnknown("Cannot find 'models' key")
+ model_id = self.policies[policy_id]["model_id"]
+ if model_id not in self.models or 'meta_rules' not in self.models[model_id]:
+ raise exceptions.ModelNotFound("Cannot find 'models' key")
+ for meta_rule in self.models[model_id]["meta_rules"]:
+ meta_rules.append(meta_rule)
+ return meta_rules
+ raise exceptions.PdpContentError
+
def get_pdp_from_keystone_project(self, keystone_project_id):
for pdp_key, pdp_value in self.pdp.items():
if "keystone_project_id" in pdp_value and \
@@ -566,8 +580,8 @@ class Cache(object):
:return:
"""
if all(k in container_data for k in ("keystone_project_id", "name", "container_id", "policy_id",
- "meta_rule_id", "port")) \
- and all(k in container_data['port'] for k in ("PublicPort", "Type", "IP", "PrivatePort")):
+ "meta_rule_id", "port")) \
+ and all(k in container_data['port'] for k in ("PublicPort", "Type", "IP", "PrivatePort")):
self.__CONTAINERS[uuid4().hex] = {
"keystone_project_id": container_data['keystone_project_id'],
@@ -641,7 +655,7 @@ class Cache(object):
container_ids = []
for pdp_id, pdp_value, in self.__PDP.items():
if pdp_value:
- if all(k in pdp_value for k in ("keystone_project_id", "security_pipeline")) \
+ if all(k in pdp_value for k in ("keystone_project_id", "security_pipeline")) \
and pdp_value["keystone_project_id"] == keystone_project_id:
for policy_id in pdp_value["security_pipeline"]:
if policy_id in self.policies and "model_id" in self.policies[policy_id]:
@@ -677,4 +691,3 @@ class Cache(object):
"and may not contains 'model_id' key".format(policy_id))
self.__CONTAINER_CHAINING[keystone_project_id] = container_ids
-
diff --git a/python_moonutilities/python_moonutilities/context.py b/python_moonutilities/python_moonutilities/context.py
index 626b25dc..1d25cda2 100644
--- a/python_moonutilities/python_moonutilities/context.py
+++ b/python_moonutilities/python_moonutilities/context.py
@@ -14,39 +14,35 @@ logger = logging.getLogger("moon.utilities." + __name__)
class Context:
def __init__(self, init_context, cache):
+ if init_context is None:
+ raise Exception("Invalid context content object")
+
self.cache = cache
self.__keystone_project_id = init_context.get("project_id")
- self.__pdp_id = None
- self.__pdp_value = None
- for _pdp_key, _pdp_value in self.cache.pdp.items():
- if _pdp_value["keystone_project_id"] == self.__keystone_project_id:
- self.__pdp_id = _pdp_key
- self.__pdp_value = copy.deepcopy(_pdp_value)
- break
- if not self.__pdp_value:
+ self.__pdp_id = self.cache.get_pdp_from_keystone_project(self.__keystone_project_id)
+
+ if not self.__pdp_id:
raise exceptions.AuthzException(
"Cannot create context for authz "
"with Keystone project ID {}".format(
self.__keystone_project_id
- ))
+ ))
+ self.__pdp_value = copy.deepcopy(self.cache.pdp[self.__pdp_id])
+
self.__subject = init_context.get("subject_name")
self.__object = init_context.get("object_name")
self.__action = init_context.get("action_name")
- self.__current_request = None
self.__request_id = init_context.get("req_id")
self.__cookie = init_context.get("cookie")
self.__manager_url = init_context.get("manager_url")
self.__interface_name = init_context.get("interface_name")
+ self.__current_request = None
+
self.__index = -1
# self.__init_initial_request()
- self.__headers = []
- policies = self.cache.policies
- models = self.cache.models
- for policy_id in self.__pdp_value["security_pipeline"]:
- model_id = policies[policy_id]["model_id"]
- for meta_rule in models[model_id]["meta_rules"]:
- self.__headers.append(meta_rule)
+ self.__meta_rule_ids = self.cache.get_meta_rule_ids_from_pdp_value(self.__pdp_value)
self.__meta_rules = self.cache.meta_rules
+
self.__pdp_set = {}
# self.__init_pdp_set()
@@ -63,20 +59,25 @@ class Context:
@property
def current_state(self):
- return self.__pdp_set[self.__headers[self.__index]]['effect']
+ self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index])
+ return self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect']
@current_state.setter
def current_state(self, state):
if state not in ("grant", "deny", "passed"):
state = "passed"
- self.__pdp_set[self.__headers[self.__index]]['effect'] = state
+ self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index])
+ self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect'] = state
@current_state.deleter
def current_state(self):
- self.__pdp_set[self.__headers[self.__index]]['effect'] = "unset"
+ self.__validate_meta_rule_content(self.__meta_rule_ids[self.__index])
+ self.__pdp_set[self.__meta_rule_ids[self.__index]]['effect'] = "unset"
@property
def current_policy_id(self):
+ if "security_pipeline" not in self.__pdp_value:
+ raise exceptions.AuthzException('Cannot find security_pipeline key within pdp.')
return self.__pdp_value["security_pipeline"][self.__index]
@current_policy_id.setter
@@ -88,6 +89,8 @@ class Context:
pass
def __init_current_request(self):
+ if "security_pipeline" not in self.__pdp_value:
+ raise exceptions.PdpContentError
self.__subject = self.cache.get_subject(
self.__pdp_value["security_pipeline"][self.__index],
self.__subject)
@@ -100,11 +103,11 @@ class Context:
self.__current_request = dict(self.initial_request)
def __init_pdp_set(self):
- for header in self.__headers:
- self.__pdp_set[header] = dict()
- self.__pdp_set[header]["meta_rules"] = self.__meta_rules[header]
- self.__pdp_set[header]["target"] = self.__add_target(header)
- self.__pdp_set[header]["effect"] = "unset"
+ for meta_rule_id in self.__meta_rule_ids:
+ self.__pdp_set[meta_rule_id] = dict()
+ self.__pdp_set[meta_rule_id]["meta_rules"] = self.__meta_rules[meta_rule_id]
+ self.__pdp_set[meta_rule_id]["target"] = self.__add_target(meta_rule_id)
+ self.__pdp_set[meta_rule_id]["effect"] = "unset"
self.__pdp_set["effect"] = "deny"
# def update_target(self, context):
@@ -151,23 +154,37 @@ class Context:
_subject = self.__current_request["subject"]
_object = self.__current_request["object"]
_action = self.__current_request["action"]
+
meta_rules = self.cache.meta_rules
policy_id = self.cache.get_policy_from_meta_rules(meta_rule_id)
+
+ if 'subject_categories' not in meta_rules[meta_rule_id]:
+ raise exceptions.MetaRuleContentError(" 'subject_categories' key not found ")
+
for sub_cat in meta_rules[meta_rule_id]['subject_categories']:
if sub_cat not in result:
result[sub_cat] = []
result[sub_cat].extend(
self.cache.get_subject_assignments(policy_id, _subject, sub_cat))
+
+ if 'object_categories' not in meta_rules[meta_rule_id]:
+ raise exceptions.MetaRuleContentError(" 'object_categories' key not found ")
+
for obj_cat in meta_rules[meta_rule_id]['object_categories']:
if obj_cat not in result:
result[obj_cat] = []
result[obj_cat].extend(
self.cache.get_object_assignments(policy_id, _object, obj_cat))
+
+ if 'action_categories' not in meta_rules[meta_rule_id]:
+ raise exceptions.MetaRuleContentError(" 'action_categories' key not found ")
+
for act_cat in meta_rules[meta_rule_id]['action_categories']:
if act_cat not in result:
result[act_cat] = []
result[act_cat].extend(
self.cache.get_action_assignments(policy_id, _action, act_cat))
+
return result
def __repr__(self):
@@ -181,7 +198,7 @@ pdp_set: {pdp_set}
id=self.__pdp_id,
current_request=self.__current_request,
request_id=self.__request_id,
- headers=self.__headers,
+ headers=self.__meta_rule_ids,
pdp_set=self.__pdp_set,
index=self.__index
)
@@ -190,7 +207,7 @@ pdp_set: {pdp_set}
return {
"initial_request": copy.deepcopy(self.initial_request),
"current_request": copy.deepcopy(self.__current_request),
- "headers": copy.deepcopy(self.__headers),
+ "headers": copy.deepcopy(self.__meta_rule_ids),
"index": copy.deepcopy(self.__index),
"pdp_set": copy.deepcopy(self.__pdp_set),
"request_id": copy.deepcopy(self.__request_id),
@@ -265,11 +282,12 @@ pdp_set: {pdp_set}
@property
def current_request(self):
if not self.__current_request:
- self.__current_request = copy.deepcopy(self.initial_request)
+ self.__current_request = dict(self.initial_request)
return self.__current_request
@current_request.setter
def current_request(self, value):
+
self.__current_request = copy.deepcopy(value)
# Note (asteroide): if the current request is modified,
# we must update the PDP Set.
@@ -280,17 +298,22 @@ pdp_set: {pdp_set}
self.__current_request = {}
self.__pdp_set = {}
+ '''
+ [Note ] Refactor name of headers to meta_rule_ids done ,
+ may need to refactor getter and setter of headers
+ '''
+
@property
def headers(self):
- return self.__headers
+ return self.__meta_rule_ids
@headers.setter
- def headers(self, headers):
- self.__headers = headers
+ def headers(self, meta_rule_ids):
+ self.__meta_rule_ids = meta_rule_ids
@headers.deleter
def headers(self):
- self.__headers = list()
+ self.__meta_rule_ids = list()
@property
def index(self):
@@ -316,4 +339,6 @@ pdp_set: {pdp_set}
def pdp_set(self):
self.__pdp_set = {}
-
+ def __validate_meta_rule_content(self, meta_rules):
+ if 'effect' not in meta_rules:
+ raise exceptions.PdpContentError
diff --git a/python_moonutilities/python_moonutilities/exceptions.py b/python_moonutilities/python_moonutilities/exceptions.py
index 1298f9e4..a43ac89f 100644
--- a/python_moonutilities/python_moonutilities/exceptions.py
+++ b/python_moonutilities/python_moonutilities/exceptions.py
@@ -197,6 +197,11 @@ class AdminRule(AdminException):
code = 400
title = 'Rule Exception'
+class CategoryNameInvalid(AdminMetaData):
+ description = _("The given category name is invalid.")
+ code = 409
+ title = 'Category Name Invalid'
+ logger = "ERROR"
class SubjectCategoryNameExisting(AdminMetaData):
description = _("The given subject category name already exists.")
@@ -261,6 +266,12 @@ class ActionCategoryUnknown(AdminMetaData):
logger = "ERROR"
+class PerimeterNameInvalid(AdminPerimeter):
+ description = _("The given name is not valid.")
+ code = 400
+ title = 'Perimeter Name is Invalid'
+ logger = "ERROR"
+
class SubjectUnknown(AdminPerimeter):
description = _("The given subject is unknown.")
code = 400
@@ -282,6 +293,26 @@ class ActionUnknown(AdminPerimeter):
logger = "ERROR"
+class SubjectExisting(AdminPerimeter):
+ description = _("The given subject is existing.")
+ code = 409
+ title = 'Subject Existing'
+ logger = "ERROR"
+
+
+class ObjectExisting(AdminPerimeter):
+ description = _("The given object is existing.")
+ code = 409
+ title = 'Object Existing'
+ logger = "ERROR"
+
+
+class ActionExisting(AdminPerimeter):
+ description = _("The given action is existing.")
+ code = 409
+ title = 'Action Existing'
+ logger = "ERROR"
+
class SubjectNameExisting(AdminPerimeter):
description = _("The given subject name is existing.")
code = 400
@@ -338,6 +369,27 @@ class ActionScopeUnknown(AdminScope):
logger = "ERROR"
+class SubjectScopeExisting(AdminScope):
+ description = _("The given subject scope is existing.")
+ code = 409
+ title = 'Subject Scope Existing'
+ logger = "ERROR"
+
+
+class ObjectScopeExisting(AdminScope):
+ description = _("The given object scope is existing.")
+ code = 409
+ title = 'Object Scope Existing'
+ logger = "ERROR"
+
+
+class ActionScopeExisting(AdminScope):
+ description = _("The given action scope is existing.")
+ code = 409
+ title = 'Action Scope Existing'
+ logger = "ERROR"
+
+
class SubjectScopeNameExisting(AdminScope):
description = _("The given subject scope name is existing.")
code = 400
@@ -444,7 +496,7 @@ class MetaRuleExisting(AdminMetaRule):
class MetaRuleContentError(AdminMetaRule):
- description = _("Invalid content of pdp.")
+ description = _("Invalid content of meta rule.")
code = 400
title = 'Meta Rule Error'
logger = "ERROR"
@@ -610,3 +662,45 @@ class PolicyExisting(MoonError):
code = 409
title = 'Policy Error'
logger = "Error"
+
+
+class DeleteData(MoonError):
+ description = _("Cannot delete data with assignment")
+ code = 400
+ title = 'Data Error'
+ logger = "Error"
+
+
+class DeleteCategoryWithData(MoonError):
+ description = _("Cannot delete category with data")
+ code = 400
+ title = 'Category Error'
+ logger = "Error"
+
+
+class DeleteCategoryWithMetaRule(MoonError):
+ description = _("Cannot delete category with meta rule")
+ code = 400
+ title = 'Category Error'
+ logger = "Error"
+
+
+class DeleteModelWithPolicy(MoonError):
+ description = _("Cannot delete model with policy")
+ code = 400
+ title = 'Model Error'
+ logger = "Error"
+
+
+class DeletePolicyWithPdp(MoonError):
+ description = _("Cannot delete policy with pdp")
+ code = 400
+ title = 'Policy Error'
+ logger = "Error"
+
+
+class DeleteMetaRuleWithModel(MoonError):
+ description = _("Cannot delete meta rule with model")
+ code = 400
+ title = 'Meta rule Error'
+ logger = "Error"
diff --git a/python_moonutilities/python_moonutilities/security_functions.py b/python_moonutilities/python_moonutilities/security_functions.py
index 15cbc8be..5d5275ee 100644
--- a/python_moonutilities/python_moonutilities/security_functions.py
+++ b/python_moonutilities/python_moonutilities/security_functions.py
@@ -22,7 +22,6 @@ __targets = {}
def filter_input(func_or_str):
-
def __filter(string):
if string and type(string) is str:
return "".join(re.findall("[\w\- +]*", string))
@@ -82,15 +81,124 @@ def filter_input(func_or_str):
return None
+"""
+To do should check value of Dictionary but it's dependent on from where it's coming
+"""
+
+
+def validate_data(data):
+ def __validate_string(string):
+ if not string:
+ raise ValueError('Empty String')
+ '''
+ is it valid to contains space inbetween
+
+ '''
+
+ if " " in string:
+ raise ValueError('String contains space')
+
+ def __validate_list_or_tuple(container):
+ if not container:
+ raise ValueError('Empty Container')
+ for i in container:
+ validate_data(i)
+
+ def __validate_dict(dictionary):
+ if not dictionary:
+ raise ValueError('Empty Dictionary')
+ for key in dictionary:
+ validate_data(dictionary[key])
+
+ if isinstance(data, str):
+ __validate_string(data)
+ elif isinstance(data, list) or isinstance(data, tuple):
+ __validate_list_or_tuple(data)
+ elif isinstance(data, dict):
+ __validate_dict(data)
+ else:
+ raise ValueError('Value is Not String or Container or Dictionary')
+
+
+def validate_input(type='get', args_state=[], kwargs_state=[], body_state=[]):
+ """
+ this fucntion works only on List or tuple or dictionary of Strings ,and String direct
+ Check if input of function is Valid or not, Valid if not has spaces and values is not None or empty.
+
+ :param type: type of request if function is used as decorator
+ :param args_state: list of Booleans for args,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :param kwargs_state: list of Booleans for kwargs as order of input kwargs,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :param body_state: list of Booleans for arguments in body of request if request is post,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :return:
+ """
+
+ def validate_input_decorator(func):
+ def wrapped(*args, **kwargs):
+
+ temp_args = []
+ """
+ this loop made to filter args from object class,
+ when put this function as decorator in function control
+ then there is copy of this class add to front of args
+ """
+ for arg in args:
+ if isinstance(arg, str) == True or \
+ isinstance(arg, list) == True or \
+ isinstance(arg, dict) == True:
+ temp_args.append(arg)
+
+ while len(args_state) < len(temp_args):
+ args_state.append(True)
+
+ for i in range(0, len(temp_args)):
+ if args_state[i]:
+ validate_data(temp_args[i])
+
+ while len(kwargs_state) < len(kwargs):
+ kwargs_state.append(True)
+ counter = 0
+ for i in kwargs:
+ if kwargs_state[counter]:
+ validate_data({i: kwargs[i]})
+
+ counter = counter + 1
+
+ if type == "post" or type == "patch":
+ body = request.json
+ while len(body_state) < len(body):
+ body_state.append(True)
+ counter = 0
+ for i in body:
+ if body_state[counter]:
+ validate_data({i: body[i]})
+
+ counter = counter + 1
+
+ return func(*args, **kwargs)
+
+ return wrapped
+
+ return validate_input_decorator
+
+
def enforce(action_names, object_name, **extra):
"""Fake version of the enforce decorator"""
+
def wrapper_func(func):
def wrapper_args(*args, **kwargs):
# LOG.info("kwargs={}".format(kwargs))
# kwargs['user_id'] = kwargs.pop('user_id', "admin")
# LOG.info("Calling enforce on {} with args={} kwargs={}".format(func.__name__, args, kwargs))
return func(*args, **kwargs)
+
return wrapper_args
+
return wrapper_func
@@ -221,4 +329,5 @@ def check_auth(function):
user_id = kwargs.pop("user_id", token)
result = function(*args, **kwargs, user_id=user_id)
return result
+
return wrapper
diff --git a/python_moonutilities/tests/unit_python/test_validated_input.py b/python_moonutilities/tests/unit_python/test_validated_input.py
new file mode 100644
index 00000000..c8e681e9
--- /dev/null
+++ b/python_moonutilities/tests/unit_python/test_validated_input.py
@@ -0,0 +1,191 @@
+import pytest
+
+
+def test_valid_string():
+ from python_moonutilities.security_functions import validate_data
+ validate_data("CorrectString")
+
+def test_unvalid_string():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data("Notcorrect String")
+
+ assert str(exception_info.value) == 'String contains space'
+
+def test_empty_string():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data("")
+
+ assert str(exception_info.value) == 'Empty String'
+
+
+def test_none_value():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(None)
+
+ assert str(exception_info.value) == 'Value is Not String or Container or Dictionary'
+
+
+def test_int_value():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(1)
+
+ assert str(exception_info.value) == 'Value is Not String or Container or Dictionary'
+
+
+def test_float_value():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(1.23)
+
+ assert str(exception_info.value) == 'Value is Not String or Container or Dictionary'
+
+
+def test_correct_list():
+ from python_moonutilities.security_functions import validate_data
+ validate_data(["skjdnfa","dao","daosdjpw"])
+
+
+def test_correct_list():
+ from python_moonutilities.security_functions import validate_data
+ validate_data(["skjdnfa"])
+
+
+def test_correct_instead_list():
+ from python_moonutilities.security_functions import validate_data
+ validate_data([["skjdnfa","daswi"],[["daskdlw"],["daklwo"]],["dawl","afioa"],["dawno"]])
+
+
+def test_empty_list():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data([])
+
+ assert str(exception_info.value) == 'Empty Container'
+
+
+def test_empty_list_inside_other_list():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(["dajiwdj",[]])
+
+ assert str(exception_info.value) == 'Empty Container'
+
+
+def test_incorrect_string_inside_list():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(["dajiwdj",["dakwe","daow awoepa"]])
+
+ assert str(exception_info.value) == 'String contains space'
+
+
+def test_empty_string_inside_list():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(["dajiwdj", ["dakwe", ""]])
+
+ assert str(exception_info.value) == 'Empty String'
+
+
+def test_correct_tuples():
+ from python_moonutilities.security_functions import validate_data
+ validate_data(("dasdw","dawdwa"))
+
+
+def test_empty_tuples():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(())
+
+ assert str(exception_info.value) == 'Empty Container'
+
+def test_correct_tuple_of_tuple():
+ from python_moonutilities.security_functions import validate_data
+ validate_data(("gjosjefa",("diwajdi","oejfoea"),(("jwdi","fjia"),("nfioa","ifao"))))
+
+
+def test_incorrect_tuple():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data(("djawo","dowa afw"))
+
+ assert str(exception_info.value) == 'String contains space'
+
+
+def test_correct_dictionary():
+ from python_moonutilities.security_functions import validate_data
+ validate_data({"daiwdw":"dwioajd"})
+
+
+def test_incorrect_dictionary():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data({"daiwdw":"dwioa jd"})
+
+ assert str(exception_info.value) == 'String contains space'
+
+def test_empty_dictionary():
+ from python_moonutilities.security_functions import validate_data
+ with pytest.raises(Exception) as exception_info:
+ validate_data({})
+
+ assert str(exception_info.value) == 'Empty Dictionary'
+
+
+def test_correct_function_pass():
+ from python_moonutilities.security_functions import validate_input
+
+ @validate_input()
+ def temp_function(string,list,tuple):
+ if string!="teststring" :
+ raise ValueError("values which passed incorrect")
+
+ temp_function("teststring",["teststring",["teststring"]],("teststring",("teststring")))
+
+def test_incorrect_function_pass1():
+ from python_moonutilities.security_functions import validate_input
+
+ @validate_input()
+ def temp_function(string, list, tuple):
+ if string != "teststring":
+ raise ValueError("values which passed incorrect")
+
+ with pytest.raises(Exception) as exception_info:
+ temp_function("teststring",list=["teststring", ["testst ring"]],tuple=("teststring", ("teststri ng")))
+
+ assert str(exception_info.value) == 'String contains space'
+
+
+def test_incorrect_function_pass2():
+ from python_moonutilities.security_functions import validate_input
+
+ @validate_input()
+ def temp_function(string, list, dictionary):
+ if string != "teststring":
+ raise ValueError("values which passed incorrect")
+
+ with pytest.raises(Exception) as exception_info:
+ temp_function("teststring", ["teststring", ["teststri ng"]], {"teststring": ("teststring")})
+
+ assert str(exception_info.value) == 'String contains space'
+
+
+def test_incorrect_function_pass3():
+ from python_moonutilities.security_functions import validate_input
+
+ class x:
+ @validate_input()
+ def temp_function(string, list, dictionary):
+ if string != "teststring":
+ raise ValueError("values which passed incorrect")
+
+ e=x;
+
+ with pytest.raises(Exception) as exception_info:
+ e.temp_function("teststring", ["teststring", ["teststri ng"]], {"teststring": ("teststring")})
+
+ assert str(exception_info.value) == 'String contains space'