aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_validation.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_validation.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_validation.py352
1 files changed, 295 insertions, 57 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_validation.py b/keystone-moon/keystone/tests/unit/test_validation.py
index f7a224a0..73cb6ef6 100644
--- a/keystone-moon/keystone/tests/unit/test_validation.py
+++ b/keystone-moon/keystone/tests/unit/test_validation.py
@@ -21,11 +21,11 @@ from keystone.catalog import schema as catalog_schema
from keystone.common import validation
from keystone.common.validation import parameter_types
from keystone.common.validation import validators
-from keystone.contrib.endpoint_filter import schema as endpoint_filter_schema
-from keystone.contrib.federation import schema as federation_schema
from keystone.credential import schema as credential_schema
from keystone import exception
+from keystone.federation import schema as federation_schema
from keystone.identity import schema as identity_schema
+from keystone.oauth1 import schema as oauth1_schema
from keystone.policy import schema as policy_schema
from keystone.resource import schema as resource_schema
from keystone.tests import unit
@@ -67,6 +67,12 @@ entity_create = {
'additionalProperties': True,
}
+entity_create_optional_body = {
+ 'type': 'object',
+ 'properties': _entity_properties,
+ 'additionalProperties': True,
+}
+
entity_update = {
'type': 'object',
'properties': _entity_properties,
@@ -78,6 +84,8 @@ _VALID_ENABLED_FORMATS = [True, False]
_INVALID_ENABLED_FORMATS = ['some string', 1, 0, 'True', 'False']
+_INVALID_DESC_FORMATS = [False, 1, 2.0]
+
_VALID_URLS = ['https://example.com', 'http://EXAMPLE.com/v3',
'http://localhost', 'http://127.0.0.1:5000',
'http://1.1.1.1', 'http://255.255.255.255',
@@ -90,7 +98,7 @@ _VALID_URLS = ['https://example.com', 'http://EXAMPLE.com/v3',
_INVALID_URLS = [False, 'this is not a URL', 1234, 'www.example.com',
'localhost', 'http//something.com',
- 'https//something.com']
+ 'https//something.com', ' http://example.com']
_VALID_FILTERS = [{'interface': 'admin'},
{'region': 'US-WEST',
@@ -99,6 +107,17 @@ _VALID_FILTERS = [{'interface': 'admin'},
_INVALID_FILTERS = ['some string', 1, 0, True, False]
+def expected_validation_failure(msg):
+ def wrapper(f):
+ def wrapped(self, *args, **kwargs):
+ args = (self,) + args
+ e = self.assertRaises(exception.ValidationError, f,
+ *args, **kwargs)
+ self.assertIn(msg, six.text_type(e))
+ return wrapped
+ return wrapper
+
+
class ValidatedDecoratorTests(unit.BaseTestCase):
entity_schema = {
@@ -113,42 +132,51 @@ class ValidatedDecoratorTests(unit.BaseTestCase):
'name': uuid.uuid4().hex,
}
- invalid_entity = {}
-
- @validation.validated(entity_schema, 'entity')
- def do_something(self, entity):
- pass
+ invalid_entity = {
+ 'name': 1.0, # NOTE(dstanek): this is the incorrect type for name
+ }
@validation.validated(entity_create, 'entity')
def create_entity(self, entity):
- pass
+ """Used to test cases where validated param is the only param."""
+
+ @validation.validated(entity_create_optional_body, 'entity')
+ def create_entity_optional_body(self, entity):
+ """Used to test cases where there is an optional body."""
@validation.validated(entity_update, 'entity')
def update_entity(self, entity_id, entity):
- pass
+ """Used to test cases where validated param is not the only param."""
- def _assert_call_entity_method_fails(self, method, *args, **kwargs):
- e = self.assertRaises(exception.ValidationError, method,
- *args, **kwargs)
+ def test_calling_create_with_valid_entity_kwarg_succeeds(self):
+ self.create_entity(entity=self.valid_entity)
- self.assertIn('Expecting to find entity in request body',
- six.text_type(e))
+ def test_calling_create_with_empty_entity_kwarg_succeeds(self):
+ """Test the case when client passing in an empty kwarg reference."""
+ self.create_entity_optional_body(entity={})
- def test_calling_with_valid_entity_kwarg_succeeds(self):
- self.do_something(entity=self.valid_entity)
+ @expected_validation_failure('Expecting to find entity in request body')
+ def test_calling_create_with_kwarg_as_None_fails(self):
+ self.create_entity(entity=None)
- def test_calling_with_invalid_entity_kwarg_fails(self):
- self.assertRaises(exception.ValidationError,
- self.do_something,
- entity=self.invalid_entity)
+ def test_calling_create_with_valid_entity_arg_succeeds(self):
+ self.create_entity(self.valid_entity)
- def test_calling_with_valid_entity_arg_succeeds(self):
- self.do_something(self.valid_entity)
+ def test_calling_create_with_empty_entity_arg_succeeds(self):
+ """Test the case when client passing in an empty entity reference."""
+ self.create_entity_optional_body({})
- def test_calling_with_invalid_entity_arg_fails(self):
- self.assertRaises(exception.ValidationError,
- self.do_something,
- self.invalid_entity)
+ @expected_validation_failure("Invalid input for field 'name'")
+ def test_calling_create_with_invalid_entity_fails(self):
+ self.create_entity(self.invalid_entity)
+
+ @expected_validation_failure('Expecting to find entity in request body')
+ def test_calling_create_with_entity_arg_as_None_fails(self):
+ self.create_entity(None)
+
+ @expected_validation_failure('Expecting to find entity in request body')
+ def test_calling_create_without_an_entity_fails(self):
+ self.create_entity()
def test_using_the_wrong_name_with_the_decorator_fails(self):
with testtools.ExpectedException(TypeError):
@@ -156,24 +184,26 @@ class ValidatedDecoratorTests(unit.BaseTestCase):
def function(entity):
pass
- def test_create_entity_no_request_body_with_decorator(self):
- """Test the case when request body is not provided."""
- self._assert_call_entity_method_fails(self.create_entity)
+ # NOTE(dstanek): below are the test cases for making sure the validation
+ # works when the validated param is not the only param. Since all of the
+ # actual validation cases are tested above these test are for a sanity
+ # check.
- def test_create_entity_empty_request_body_with_decorator(self):
- """Test the case when client passing in an empty entity reference."""
- self._assert_call_entity_method_fails(self.create_entity, entity={})
+ def test_calling_update_with_valid_entity_succeeds(self):
+ self.update_entity(uuid.uuid4().hex, self.valid_entity)
- def test_update_entity_no_request_body_with_decorator(self):
- """Test the case when request body is not provided."""
- self._assert_call_entity_method_fails(self.update_entity,
- uuid.uuid4().hex)
+ @expected_validation_failure("Invalid input for field 'name'")
+ def test_calling_update_with_invalid_entity_fails(self):
+ self.update_entity(uuid.uuid4().hex, self.invalid_entity)
- def test_update_entity_empty_request_body_with_decorator(self):
+ def test_calling_update_with_empty_entity_kwarg_succeeds(self):
"""Test the case when client passing in an empty entity reference."""
- self._assert_call_entity_method_fails(self.update_entity,
- uuid.uuid4().hex,
- entity={})
+ global entity_update
+ original_entity_update = entity_update.copy()
+ # pop 'minProperties' from schema so that empty body is allowed.
+ entity_update.pop('minProperties')
+ self.update_entity(uuid.uuid4().hex, entity={})
+ entity_update = original_entity_update
class EntityValidationTestCase(unit.BaseTestCase):
@@ -499,11 +529,22 @@ class ProjectValidationTestCase(unit.BaseTestCase):
self.update_project_validator.validate,
request_to_validate)
- def test_validate_project_update_request_with_null_domain_id_fails(self):
- request_to_validate = {'domain_id': None}
- self.assertRaises(exception.SchemaValidationError,
- self.update_project_validator.validate,
- request_to_validate)
+ def test_validate_project_create_request_with_valid_domain_id(self):
+ """Test that we validate `domain_id` in create project requests."""
+ # domain_id is nullable
+ for domain_id in [None, uuid.uuid4().hex]:
+ request_to_validate = {'name': self.project_name,
+ 'domain_id': domain_id}
+ self.create_project_validator.validate(request_to_validate)
+
+ def test_validate_project_request_with_invalid_domain_id_fails(self):
+ """Exception is raised when `domain_id` is a non-id value."""
+ for domain_id in [False, 'fake_project']:
+ request_to_validate = {'name': self.project_name,
+ 'domain_id': domain_id}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_project_validator.validate,
+ request_to_validate)
class DomainValidationTestCase(unit.BaseTestCase):
@@ -897,6 +938,11 @@ class RegionValidationTestCase(unit.BaseTestCase):
request_to_validate = {'other_attr': uuid.uuid4().hex}
self.create_region_validator.validate(request_to_validate)
+ def test_validate_region_create_succeeds_with_no_parameters(self):
+ """Validate create region request with no parameters."""
+ request_to_validate = {}
+ self.create_region_validator.validate(request_to_validate)
+
def test_validate_region_update_succeeds(self):
"""Test that we validate a region update request."""
request_to_validate = {'id': 'us-west',
@@ -1298,8 +1344,8 @@ class EndpointGroupValidationTestCase(unit.BaseTestCase):
def setUp(self):
super(EndpointGroupValidationTestCase, self).setUp()
- create = endpoint_filter_schema.endpoint_group_create
- update = endpoint_filter_schema.endpoint_group_update
+ create = catalog_schema.endpoint_group_create
+ update = catalog_schema.endpoint_group_update
self.create_endpoint_grp_validator = validators.SchemaValidator(create)
self.update_endpoint_grp_validator = validators.SchemaValidator(update)
@@ -1321,8 +1367,7 @@ class EndpointGroupValidationTestCase(unit.BaseTestCase):
self.create_endpoint_grp_validator.validate(request_to_validate)
def test_validate_endpoint_group_create_succeeds_with_valid_filters(self):
- """Validate dict values as `filters` in endpoint group create requests.
- """
+ """Validate `filters` in endpoint group create requests."""
request_to_validate = {'description': 'endpoint group description',
'name': 'endpoint_group_name'}
for valid_filters in _VALID_FILTERS:
@@ -1718,13 +1763,8 @@ class UserValidationTestCase(unit.BaseTestCase):
def test_validate_user_create_with_all_valid_parameters_succeeds(self):
"""Test that validating a user create request succeeds."""
- request_to_validate = {'name': self.user_name,
- 'default_project_id': uuid.uuid4().hex,
- 'domain_id': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': True,
- 'email': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex}
+ request_to_validate = unit.new_user_ref(domain_id=uuid.uuid4().hex,
+ name=self.user_name)
self.create_user_validator.validate(request_to_validate)
def test_validate_user_create_fails_without_name(self):
@@ -1875,3 +1915,201 @@ class GroupValidationTestCase(unit.BaseTestCase):
"""Validate group update requests with extra parameters."""
request_to_validate = {'other_attr': uuid.uuid4().hex}
self.update_group_validator.validate(request_to_validate)
+
+
+class IdentityProviderValidationTestCase(unit.BaseTestCase):
+ """Test for V3 Identity Provider API validation."""
+
+ def setUp(self):
+ super(IdentityProviderValidationTestCase, self).setUp()
+
+ create = federation_schema.identity_provider_create
+ update = federation_schema.identity_provider_update
+ self.create_idp_validator = validators.SchemaValidator(create)
+ self.update_idp_validator = validators.SchemaValidator(update)
+
+ def test_validate_idp_request_succeeds(self):
+ """Test that we validate an identity provider request."""
+ request_to_validate = {'description': 'identity provider description',
+ 'enabled': True,
+ 'remote_ids': [uuid.uuid4().hex,
+ uuid.uuid4().hex]}
+ self.create_idp_validator.validate(request_to_validate)
+ self.update_idp_validator.validate(request_to_validate)
+
+ def test_validate_idp_request_fails_with_invalid_params(self):
+ """Exception raised when unknown parameter is found."""
+ request_to_validate = {'bogus': uuid.uuid4().hex}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_idp_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_with_enabled(self):
+ """Validate `enabled` as boolean-like values."""
+ for valid_enabled in _VALID_ENABLED_FORMATS:
+ request_to_validate = {'enabled': valid_enabled}
+ self.create_idp_validator.validate(request_to_validate)
+ self.update_idp_validator.validate(request_to_validate)
+
+ def test_validate_idp_request_with_invalid_enabled_fails(self):
+ """Exception is raised when `enabled` isn't a boolean-like value."""
+ for invalid_enabled in _INVALID_ENABLED_FORMATS:
+ request_to_validate = {'enabled': invalid_enabled}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_idp_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_no_parameters(self):
+ """Test that schema validation with empty request body."""
+ request_to_validate = {}
+ self.create_idp_validator.validate(request_to_validate)
+
+ # Exception raised when no property on IdP update.
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_with_invalid_description_fails(self):
+ """Exception is raised when `description` as a non-string value."""
+ request_to_validate = {'description': False}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_idp_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_with_invalid_remote_id_fails(self):
+ """Exception is raised when `remote_ids` is not a array."""
+ request_to_validate = {"remote_ids": uuid.uuid4().hex}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_idp_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_with_duplicated_remote_id(self):
+ """Exception is raised when the duplicated `remote_ids` is found."""
+ idp_id = uuid.uuid4().hex
+ request_to_validate = {"remote_ids": [idp_id, idp_id]}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_idp_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_idp_validator.validate,
+ request_to_validate)
+
+ def test_validate_idp_request_remote_id_nullable(self):
+ """Test that `remote_ids` could be explicitly set to None"""
+ request_to_validate = {'remote_ids': None}
+ self.create_idp_validator.validate(request_to_validate)
+ self.update_idp_validator.validate(request_to_validate)
+
+
+class FederationProtocolValidationTestCase(unit.BaseTestCase):
+ """Test for V3 Federation Protocol API validation."""
+
+ def setUp(self):
+ super(FederationProtocolValidationTestCase, self).setUp()
+
+ schema = federation_schema.federation_protocol_schema
+ # create protocol and update protocol have the same shema definition,
+ # combine them together, no need to validate separately.
+ self.protocol_validator = validators.SchemaValidator(schema)
+
+ def test_validate_protocol_request_succeeds(self):
+ """Test that we validate a protocol request successfully."""
+ request_to_validate = {'mapping_id': uuid.uuid4().hex}
+ self.protocol_validator.validate(request_to_validate)
+
+ def test_validate_protocol_request_succeeds_with_nonuuid_mapping_id(self):
+ """Test that we allow underscore in mapping_id value."""
+ request_to_validate = {'mapping_id': 'my_mapping_id'}
+ self.protocol_validator.validate(request_to_validate)
+
+ def test_validate_protocol_request_fails_with_invalid_params(self):
+ """Exception raised when unknown parameter is found."""
+ request_to_validate = {'bogus': uuid.uuid4().hex}
+ self.assertRaises(exception.SchemaValidationError,
+ self.protocol_validator.validate,
+ request_to_validate)
+
+ def test_validate_protocol_request_no_parameters(self):
+ """Test that schema validation with empty request body."""
+ request_to_validate = {}
+ # 'mapping_id' is required.
+ self.assertRaises(exception.SchemaValidationError,
+ self.protocol_validator.validate,
+ request_to_validate)
+
+ def test_validate_protocol_request_fails_with_invalid_mapping_id(self):
+ """Exception raised when mapping_id is not string."""
+ request_to_validate = {'mapping_id': 12334}
+ self.assertRaises(exception.SchemaValidationError,
+ self.protocol_validator.validate,
+ request_to_validate)
+
+
+class OAuth1ValidationTestCase(unit.BaseTestCase):
+ """Test for V3 Identity OAuth1 API validation."""
+
+ def setUp(self):
+ super(OAuth1ValidationTestCase, self).setUp()
+
+ create = oauth1_schema.consumer_create
+ update = oauth1_schema.consumer_update
+ self.create_consumer_validator = validators.SchemaValidator(create)
+ self.update_consumer_validator = validators.SchemaValidator(update)
+
+ def test_validate_consumer_request_succeeds(self):
+ """Test that we validate a consumer request successfully."""
+ request_to_validate = {'description': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex}
+ self.create_consumer_validator.validate(request_to_validate)
+ self.update_consumer_validator.validate(request_to_validate)
+
+ def test_validate_consumer_request_with_no_parameters(self):
+ """Test that schema validation with empty request body."""
+ request_to_validate = {}
+ self.create_consumer_validator.validate(request_to_validate)
+ # At least one property should be given.
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_consumer_validator.validate,
+ request_to_validate)
+
+ def test_validate_consumer_request_with_invalid_description_fails(self):
+ """Exception is raised when `description` as a non-string value."""
+ for invalid_desc in _INVALID_DESC_FORMATS:
+ request_to_validate = {'description': invalid_desc}
+ self.assertRaises(exception.SchemaValidationError,
+ self.create_consumer_validator.validate,
+ request_to_validate)
+
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_consumer_validator.validate,
+ request_to_validate)
+
+ def test_validate_update_consumer_request_fails_with_secret(self):
+ """Exception raised when secret is given."""
+ request_to_validate = {'secret': uuid.uuid4().hex}
+ self.assertRaises(exception.SchemaValidationError,
+ self.update_consumer_validator.validate,
+ request_to_validate)
+
+ def test_validate_consumer_request_with_none_desc(self):
+ """Test that schema validation with None desc."""
+ request_to_validate = {'description': None}
+ self.create_consumer_validator.validate(request_to_validate)
+ self.update_consumer_validator.validate(request_to_validate)