diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_validation.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_validation.py | 352 |
1 files changed, 295 insertions, 57 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_validation.py b/keystone-moon/keystone/tests/unit/test_validation.py index f7a224a0..73cb6ef6 100644 --- a/keystone-moon/keystone/tests/unit/test_validation.py +++ b/keystone-moon/keystone/tests/unit/test_validation.py @@ -21,11 +21,11 @@ from keystone.catalog import schema as catalog_schema from keystone.common import validation from keystone.common.validation import parameter_types from keystone.common.validation import validators -from keystone.contrib.endpoint_filter import schema as endpoint_filter_schema -from keystone.contrib.federation import schema as federation_schema from keystone.credential import schema as credential_schema from keystone import exception +from keystone.federation import schema as federation_schema from keystone.identity import schema as identity_schema +from keystone.oauth1 import schema as oauth1_schema from keystone.policy import schema as policy_schema from keystone.resource import schema as resource_schema from keystone.tests import unit @@ -67,6 +67,12 @@ entity_create = { 'additionalProperties': True, } +entity_create_optional_body = { + 'type': 'object', + 'properties': _entity_properties, + 'additionalProperties': True, +} + entity_update = { 'type': 'object', 'properties': _entity_properties, @@ -78,6 +84,8 @@ _VALID_ENABLED_FORMATS = [True, False] _INVALID_ENABLED_FORMATS = ['some string', 1, 0, 'True', 'False'] +_INVALID_DESC_FORMATS = [False, 1, 2.0] + _VALID_URLS = ['https://example.com', 'http://EXAMPLE.com/v3', 'http://localhost', 'http://127.0.0.1:5000', 'http://1.1.1.1', 'http://255.255.255.255', @@ -90,7 +98,7 @@ _VALID_URLS = ['https://example.com', 'http://EXAMPLE.com/v3', _INVALID_URLS = [False, 'this is not a URL', 1234, 'www.example.com', 'localhost', 'http//something.com', - 'https//something.com'] + 'https//something.com', ' http://example.com'] _VALID_FILTERS = [{'interface': 'admin'}, {'region': 'US-WEST', @@ -99,6 +107,17 @@ _VALID_FILTERS = [{'interface': 'admin'}, _INVALID_FILTERS = ['some string', 1, 0, True, False] +def expected_validation_failure(msg): + def wrapper(f): + def wrapped(self, *args, **kwargs): + args = (self,) + args + e = self.assertRaises(exception.ValidationError, f, + *args, **kwargs) + self.assertIn(msg, six.text_type(e)) + return wrapped + return wrapper + + class ValidatedDecoratorTests(unit.BaseTestCase): entity_schema = { @@ -113,42 +132,51 @@ class ValidatedDecoratorTests(unit.BaseTestCase): 'name': uuid.uuid4().hex, } - invalid_entity = {} - - @validation.validated(entity_schema, 'entity') - def do_something(self, entity): - pass + invalid_entity = { + 'name': 1.0, # NOTE(dstanek): this is the incorrect type for name + } @validation.validated(entity_create, 'entity') def create_entity(self, entity): - pass + """Used to test cases where validated param is the only param.""" + + @validation.validated(entity_create_optional_body, 'entity') + def create_entity_optional_body(self, entity): + """Used to test cases where there is an optional body.""" @validation.validated(entity_update, 'entity') def update_entity(self, entity_id, entity): - pass + """Used to test cases where validated param is not the only param.""" - def _assert_call_entity_method_fails(self, method, *args, **kwargs): - e = self.assertRaises(exception.ValidationError, method, - *args, **kwargs) + def test_calling_create_with_valid_entity_kwarg_succeeds(self): + self.create_entity(entity=self.valid_entity) - self.assertIn('Expecting to find entity in request body', - six.text_type(e)) + def test_calling_create_with_empty_entity_kwarg_succeeds(self): + """Test the case when client passing in an empty kwarg reference.""" + self.create_entity_optional_body(entity={}) - def test_calling_with_valid_entity_kwarg_succeeds(self): - self.do_something(entity=self.valid_entity) + @expected_validation_failure('Expecting to find entity in request body') + def test_calling_create_with_kwarg_as_None_fails(self): + self.create_entity(entity=None) - def test_calling_with_invalid_entity_kwarg_fails(self): - self.assertRaises(exception.ValidationError, - self.do_something, - entity=self.invalid_entity) + def test_calling_create_with_valid_entity_arg_succeeds(self): + self.create_entity(self.valid_entity) - def test_calling_with_valid_entity_arg_succeeds(self): - self.do_something(self.valid_entity) + def test_calling_create_with_empty_entity_arg_succeeds(self): + """Test the case when client passing in an empty entity reference.""" + self.create_entity_optional_body({}) - def test_calling_with_invalid_entity_arg_fails(self): - self.assertRaises(exception.ValidationError, - self.do_something, - self.invalid_entity) + @expected_validation_failure("Invalid input for field 'name'") + def test_calling_create_with_invalid_entity_fails(self): + self.create_entity(self.invalid_entity) + + @expected_validation_failure('Expecting to find entity in request body') + def test_calling_create_with_entity_arg_as_None_fails(self): + self.create_entity(None) + + @expected_validation_failure('Expecting to find entity in request body') + def test_calling_create_without_an_entity_fails(self): + self.create_entity() def test_using_the_wrong_name_with_the_decorator_fails(self): with testtools.ExpectedException(TypeError): @@ -156,24 +184,26 @@ class ValidatedDecoratorTests(unit.BaseTestCase): def function(entity): pass - def test_create_entity_no_request_body_with_decorator(self): - """Test the case when request body is not provided.""" - self._assert_call_entity_method_fails(self.create_entity) + # NOTE(dstanek): below are the test cases for making sure the validation + # works when the validated param is not the only param. Since all of the + # actual validation cases are tested above these test are for a sanity + # check. - def test_create_entity_empty_request_body_with_decorator(self): - """Test the case when client passing in an empty entity reference.""" - self._assert_call_entity_method_fails(self.create_entity, entity={}) + def test_calling_update_with_valid_entity_succeeds(self): + self.update_entity(uuid.uuid4().hex, self.valid_entity) - def test_update_entity_no_request_body_with_decorator(self): - """Test the case when request body is not provided.""" - self._assert_call_entity_method_fails(self.update_entity, - uuid.uuid4().hex) + @expected_validation_failure("Invalid input for field 'name'") + def test_calling_update_with_invalid_entity_fails(self): + self.update_entity(uuid.uuid4().hex, self.invalid_entity) - def test_update_entity_empty_request_body_with_decorator(self): + def test_calling_update_with_empty_entity_kwarg_succeeds(self): """Test the case when client passing in an empty entity reference.""" - self._assert_call_entity_method_fails(self.update_entity, - uuid.uuid4().hex, - entity={}) + global entity_update + original_entity_update = entity_update.copy() + # pop 'minProperties' from schema so that empty body is allowed. + entity_update.pop('minProperties') + self.update_entity(uuid.uuid4().hex, entity={}) + entity_update = original_entity_update class EntityValidationTestCase(unit.BaseTestCase): @@ -499,11 +529,22 @@ class ProjectValidationTestCase(unit.BaseTestCase): self.update_project_validator.validate, request_to_validate) - def test_validate_project_update_request_with_null_domain_id_fails(self): - request_to_validate = {'domain_id': None} - self.assertRaises(exception.SchemaValidationError, - self.update_project_validator.validate, - request_to_validate) + def test_validate_project_create_request_with_valid_domain_id(self): + """Test that we validate `domain_id` in create project requests.""" + # domain_id is nullable + for domain_id in [None, uuid.uuid4().hex]: + request_to_validate = {'name': self.project_name, + 'domain_id': domain_id} + self.create_project_validator.validate(request_to_validate) + + def test_validate_project_request_with_invalid_domain_id_fails(self): + """Exception is raised when `domain_id` is a non-id value.""" + for domain_id in [False, 'fake_project']: + request_to_validate = {'name': self.project_name, + 'domain_id': domain_id} + self.assertRaises(exception.SchemaValidationError, + self.create_project_validator.validate, + request_to_validate) class DomainValidationTestCase(unit.BaseTestCase): @@ -897,6 +938,11 @@ class RegionValidationTestCase(unit.BaseTestCase): request_to_validate = {'other_attr': uuid.uuid4().hex} self.create_region_validator.validate(request_to_validate) + def test_validate_region_create_succeeds_with_no_parameters(self): + """Validate create region request with no parameters.""" + request_to_validate = {} + self.create_region_validator.validate(request_to_validate) + def test_validate_region_update_succeeds(self): """Test that we validate a region update request.""" request_to_validate = {'id': 'us-west', @@ -1298,8 +1344,8 @@ class EndpointGroupValidationTestCase(unit.BaseTestCase): def setUp(self): super(EndpointGroupValidationTestCase, self).setUp() - create = endpoint_filter_schema.endpoint_group_create - update = endpoint_filter_schema.endpoint_group_update + create = catalog_schema.endpoint_group_create + update = catalog_schema.endpoint_group_update self.create_endpoint_grp_validator = validators.SchemaValidator(create) self.update_endpoint_grp_validator = validators.SchemaValidator(update) @@ -1321,8 +1367,7 @@ class EndpointGroupValidationTestCase(unit.BaseTestCase): self.create_endpoint_grp_validator.validate(request_to_validate) def test_validate_endpoint_group_create_succeeds_with_valid_filters(self): - """Validate dict values as `filters` in endpoint group create requests. - """ + """Validate `filters` in endpoint group create requests.""" request_to_validate = {'description': 'endpoint group description', 'name': 'endpoint_group_name'} for valid_filters in _VALID_FILTERS: @@ -1718,13 +1763,8 @@ class UserValidationTestCase(unit.BaseTestCase): def test_validate_user_create_with_all_valid_parameters_succeeds(self): """Test that validating a user create request succeeds.""" - request_to_validate = {'name': self.user_name, - 'default_project_id': uuid.uuid4().hex, - 'domain_id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True, - 'email': uuid.uuid4().hex, - 'password': uuid.uuid4().hex} + request_to_validate = unit.new_user_ref(domain_id=uuid.uuid4().hex, + name=self.user_name) self.create_user_validator.validate(request_to_validate) def test_validate_user_create_fails_without_name(self): @@ -1875,3 +1915,201 @@ class GroupValidationTestCase(unit.BaseTestCase): """Validate group update requests with extra parameters.""" request_to_validate = {'other_attr': uuid.uuid4().hex} self.update_group_validator.validate(request_to_validate) + + +class IdentityProviderValidationTestCase(unit.BaseTestCase): + """Test for V3 Identity Provider API validation.""" + + def setUp(self): + super(IdentityProviderValidationTestCase, self).setUp() + + create = federation_schema.identity_provider_create + update = federation_schema.identity_provider_update + self.create_idp_validator = validators.SchemaValidator(create) + self.update_idp_validator = validators.SchemaValidator(update) + + def test_validate_idp_request_succeeds(self): + """Test that we validate an identity provider request.""" + request_to_validate = {'description': 'identity provider description', + 'enabled': True, + 'remote_ids': [uuid.uuid4().hex, + uuid.uuid4().hex]} + self.create_idp_validator.validate(request_to_validate) + self.update_idp_validator.validate(request_to_validate) + + def test_validate_idp_request_fails_with_invalid_params(self): + """Exception raised when unknown parameter is found.""" + request_to_validate = {'bogus': uuid.uuid4().hex} + self.assertRaises(exception.SchemaValidationError, + self.create_idp_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_with_enabled(self): + """Validate `enabled` as boolean-like values.""" + for valid_enabled in _VALID_ENABLED_FORMATS: + request_to_validate = {'enabled': valid_enabled} + self.create_idp_validator.validate(request_to_validate) + self.update_idp_validator.validate(request_to_validate) + + def test_validate_idp_request_with_invalid_enabled_fails(self): + """Exception is raised when `enabled` isn't a boolean-like value.""" + for invalid_enabled in _INVALID_ENABLED_FORMATS: + request_to_validate = {'enabled': invalid_enabled} + self.assertRaises(exception.SchemaValidationError, + self.create_idp_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_no_parameters(self): + """Test that schema validation with empty request body.""" + request_to_validate = {} + self.create_idp_validator.validate(request_to_validate) + + # Exception raised when no property on IdP update. + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_with_invalid_description_fails(self): + """Exception is raised when `description` as a non-string value.""" + request_to_validate = {'description': False} + self.assertRaises(exception.SchemaValidationError, + self.create_idp_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_with_invalid_remote_id_fails(self): + """Exception is raised when `remote_ids` is not a array.""" + request_to_validate = {"remote_ids": uuid.uuid4().hex} + self.assertRaises(exception.SchemaValidationError, + self.create_idp_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_with_duplicated_remote_id(self): + """Exception is raised when the duplicated `remote_ids` is found.""" + idp_id = uuid.uuid4().hex + request_to_validate = {"remote_ids": [idp_id, idp_id]} + self.assertRaises(exception.SchemaValidationError, + self.create_idp_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_idp_validator.validate, + request_to_validate) + + def test_validate_idp_request_remote_id_nullable(self): + """Test that `remote_ids` could be explicitly set to None""" + request_to_validate = {'remote_ids': None} + self.create_idp_validator.validate(request_to_validate) + self.update_idp_validator.validate(request_to_validate) + + +class FederationProtocolValidationTestCase(unit.BaseTestCase): + """Test for V3 Federation Protocol API validation.""" + + def setUp(self): + super(FederationProtocolValidationTestCase, self).setUp() + + schema = federation_schema.federation_protocol_schema + # create protocol and update protocol have the same shema definition, + # combine them together, no need to validate separately. + self.protocol_validator = validators.SchemaValidator(schema) + + def test_validate_protocol_request_succeeds(self): + """Test that we validate a protocol request successfully.""" + request_to_validate = {'mapping_id': uuid.uuid4().hex} + self.protocol_validator.validate(request_to_validate) + + def test_validate_protocol_request_succeeds_with_nonuuid_mapping_id(self): + """Test that we allow underscore in mapping_id value.""" + request_to_validate = {'mapping_id': 'my_mapping_id'} + self.protocol_validator.validate(request_to_validate) + + def test_validate_protocol_request_fails_with_invalid_params(self): + """Exception raised when unknown parameter is found.""" + request_to_validate = {'bogus': uuid.uuid4().hex} + self.assertRaises(exception.SchemaValidationError, + self.protocol_validator.validate, + request_to_validate) + + def test_validate_protocol_request_no_parameters(self): + """Test that schema validation with empty request body.""" + request_to_validate = {} + # 'mapping_id' is required. + self.assertRaises(exception.SchemaValidationError, + self.protocol_validator.validate, + request_to_validate) + + def test_validate_protocol_request_fails_with_invalid_mapping_id(self): + """Exception raised when mapping_id is not string.""" + request_to_validate = {'mapping_id': 12334} + self.assertRaises(exception.SchemaValidationError, + self.protocol_validator.validate, + request_to_validate) + + +class OAuth1ValidationTestCase(unit.BaseTestCase): + """Test for V3 Identity OAuth1 API validation.""" + + def setUp(self): + super(OAuth1ValidationTestCase, self).setUp() + + create = oauth1_schema.consumer_create + update = oauth1_schema.consumer_update + self.create_consumer_validator = validators.SchemaValidator(create) + self.update_consumer_validator = validators.SchemaValidator(update) + + def test_validate_consumer_request_succeeds(self): + """Test that we validate a consumer request successfully.""" + request_to_validate = {'description': uuid.uuid4().hex, + 'name': uuid.uuid4().hex} + self.create_consumer_validator.validate(request_to_validate) + self.update_consumer_validator.validate(request_to_validate) + + def test_validate_consumer_request_with_no_parameters(self): + """Test that schema validation with empty request body.""" + request_to_validate = {} + self.create_consumer_validator.validate(request_to_validate) + # At least one property should be given. + self.assertRaises(exception.SchemaValidationError, + self.update_consumer_validator.validate, + request_to_validate) + + def test_validate_consumer_request_with_invalid_description_fails(self): + """Exception is raised when `description` as a non-string value.""" + for invalid_desc in _INVALID_DESC_FORMATS: + request_to_validate = {'description': invalid_desc} + self.assertRaises(exception.SchemaValidationError, + self.create_consumer_validator.validate, + request_to_validate) + + self.assertRaises(exception.SchemaValidationError, + self.update_consumer_validator.validate, + request_to_validate) + + def test_validate_update_consumer_request_fails_with_secret(self): + """Exception raised when secret is given.""" + request_to_validate = {'secret': uuid.uuid4().hex} + self.assertRaises(exception.SchemaValidationError, + self.update_consumer_validator.validate, + request_to_validate) + + def test_validate_consumer_request_with_none_desc(self): + """Test that schema validation with None desc.""" + request_to_validate = {'description': None} + self.create_consumer_validator.validate(request_to_validate) + self.update_consumer_validator.validate(request_to_validate) |