diff options
author | Ruan HE <ruan.he@orange.com> | 2016-06-09 08:12:34 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@172.30.200.206> | 2016-06-09 08:12:34 +0000 |
commit | 4bc079a2664f9a407e332291f34d174625a9d5ea (patch) | |
tree | 7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_auth.py | |
parent | 2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff) | |
parent | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff) |
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_auth.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_auth.py | 202 |
1 files changed, 103 insertions, 99 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_auth.py b/keystone-moon/keystone/tests/unit/test_auth.py index 6dd52c8a..6f44b316 100644 --- a/keystone-moon/keystone/tests/unit/test_auth.py +++ b/keystone-moon/keystone/tests/unit/test_auth.py @@ -14,6 +14,8 @@ import copy import datetime +import random +import string import uuid import mock @@ -26,11 +28,12 @@ from testtools import matchers from keystone import assignment from keystone import auth from keystone.common import authorization -from keystone import config +from keystone.common import config from keystone import exception from keystone.models import token_model from keystone.tests import unit from keystone.tests.unit import default_fixtures +from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import database from keystone import token from keystone.token import provider @@ -39,9 +42,10 @@ from keystone import trust CONF = cfg.CONF TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' -DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id -HOST_URL = 'http://keystone:5001' +HOST = ''.join(random.choice(string.ascii_lowercase) for x in range( + random.randint(5, 15))) +HOST_URL = 'http://%s' % (HOST) def _build_user_auth(token=None, user_id=None, username=None, @@ -127,9 +131,7 @@ class AuthBadRequests(AuthTest): context={}, auth={}) def test_empty_remote_user(self): - """Verify that _authenticate_external() raises exception if - REMOTE_USER is set as the empty string. - """ + """Verify exception is raised when REMOTE_USER is an empty string.""" context = {'environment': {'REMOTE_USER': ''}} self.assertRaises( token.controllers.ExternalAuthNotApplicable, @@ -223,6 +225,36 @@ class AuthBadRequests(AuthTest): self.controller.authenticate, {}, body_dict) + def test_authenticate_fails_if_project_unsafe(self): + """Verify authenticate to a project with unsafe name fails.""" + # Start with url name restrictions off, so we can create the unsafe + # named project + self.config_fixture.config(group='resource', + project_name_url_safe='off') + unsafe_name = 'i am not / safe' + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, name=unsafe_name) + self.resource_api.create_project(project['id'], project) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], project['id'], self.role_member['id']) + no_context = {} + + body_dict = _build_user_auth( + username=self.user_foo['name'], + password=self.user_foo['password'], + tenant_name=project['name']) + + # Since name url restriction is off, we should be able to autenticate + self.controller.authenticate(no_context, body_dict) + + # Set the name url restriction to strict and we should fail to + # authenticate + self.config_fixture.config(group='resource', + project_name_url_safe='strict') + self.assertRaises(exception.Unauthorized, + self.controller.authenticate, + no_context, body_dict) + class AuthWithToken(AuthTest): def test_unscoped_token(self): @@ -286,7 +318,7 @@ class AuthWithToken(AuthTest): def test_auth_scoped_token_bad_project_with_debug(self): """Authenticating with an invalid project fails.""" - # Bug 1379952 reports poor user feedback, even in debug mode, + # Bug 1379952 reports poor user feedback, even in insecure_debug mode, # when the user accidentally passes a project name as an ID. # This test intentionally does exactly that. body_dict = _build_user_auth( @@ -294,8 +326,8 @@ class AuthWithToken(AuthTest): password=self.user_foo['password'], tenant_id=self.tenant_bar['name']) - # with debug enabled, this produces a friendly exception. - self.config_fixture.config(debug=True) + # with insecure_debug enabled, this produces a friendly exception. + self.config_fixture.config(debug=True, insecure_debug=True) e = self.assertRaises( exception.Unauthorized, self.controller.authenticate, @@ -308,7 +340,7 @@ class AuthWithToken(AuthTest): def test_auth_scoped_token_bad_project_without_debug(self): """Authenticating with an invalid project fails.""" - # Bug 1379952 reports poor user feedback, even in debug mode, + # Bug 1379952 reports poor user feedback, even in insecure_debug mode, # when the user accidentally passes a project name as an ID. # This test intentionally does exactly that. body_dict = _build_user_auth( @@ -316,8 +348,8 @@ class AuthWithToken(AuthTest): password=self.user_foo['password'], tenant_id=self.tenant_bar['name']) - # with debug disabled, authentication failure details are suppressed. - self.config_fixture.config(debug=False) + # with insecure_debug disabled (the default), authentication failure + # details are suppressed. e = self.assertRaises( exception.Unauthorized, self.controller.authenticate, @@ -336,9 +368,9 @@ class AuthWithToken(AuthTest): self.tenant_bar['id'], self.role_member['id']) # Now create a group role for this user as well - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain1 = unit.new_domain_ref() self.resource_api.create_domain(domain1['id'], domain1) - new_group = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex} + new_group = unit.new_group_ref(domain_id=domain1['id']) new_group = self.identity_api.create_group(new_group) self.identity_api.add_user_to_group(self.user_foo['id'], new_group['id']) @@ -428,10 +460,10 @@ class AuthWithToken(AuthTest): def test_deleting_role_revokes_token(self): role_controller = assignment.controllers.Role() - project1 = {'id': 'Project1', 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(project1['id'], project1) - role_one = {'id': 'role_one', 'name': uuid.uuid4().hex} + role_one = unit.new_role_ref(id='role_one') self.role_api.create_role(role_one['id'], role_one) self.assignment_api.add_role_to_user_and_project( self.user_foo['id'], project1['id'], role_one['id']) @@ -464,12 +496,10 @@ class AuthWithToken(AuthTest): no_context = {} admin_context = dict(is_admin=True, query_string={}) - project = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(project['id'], project) - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) self.assignment_api.add_role_to_user_and_project( self.user_foo['id'], project['id'], role['id']) @@ -642,6 +672,27 @@ class AuthWithToken(AuthTest): token_id=token_2_id) +class FernetAuthWithToken(AuthWithToken): + def config_overrides(self): + super(FernetAuthWithToken, self).config_overrides() + self.config_fixture.config(group='token', provider='fernet') + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + + def test_token_auth_with_binding(self): + self.config_fixture.config(group='token', bind=['kerberos']) + body_dict = _build_user_auth() + self.assertRaises(exception.NotImplemented, + self.controller.authenticate, + self.context_with_remote_user, + body_dict) + + def test_revoke_with_no_audit_info(self): + self.skipTest('Fernet with v2.0 and revocation is broken') + + def test_deleting_role_revokes_token(self): + self.skipTest('Fernet with v2.0 and revocation is broken') + + class AuthWithPasswordCredentials(AuthTest): def test_auth_invalid_user(self): """Verify exception is raised if invalid user.""" @@ -682,7 +733,7 @@ class AuthWithPasswordCredentials(AuthTest): {}, body_dict) def test_authenticate_blank_password_credentials(self): - """Sending empty dict as passwordCredentials raises a 400 error.""" + """Sending empty dict as passwordCredentials raises 400 Bad Requset.""" body_dict = {'passwordCredentials': {}, 'tenantName': 'demo'} self.assertRaises(exception.ValidationError, self.controller.authenticate, @@ -708,27 +759,16 @@ class AuthWithPasswordCredentials(AuthTest): # user in auth data is from the new default domain. # 1) Create a new domain. - new_domain_id = uuid.uuid4().hex - new_domain = { - 'description': uuid.uuid4().hex, - 'enabled': True, - 'id': new_domain_id, - 'name': uuid.uuid4().hex, - } + new_domain = unit.new_domain_ref() + new_domain_id = new_domain['id'] self.resource_api.create_domain(new_domain_id, new_domain) # 2) Create user "foo" in new domain with different password than # default-domain foo. - new_user_password = uuid.uuid4().hex - new_user = { - 'name': self.user_foo['name'], - 'domain_id': new_domain_id, - 'password': new_user_password, - 'email': 'foo@bar2.com', - } - - new_user = self.identity_api.create_user(new_user) + new_user = unit.create_user(self.identity_api, + name=self.user_foo['name'], + domain_id=new_domain_id) # 3) Update the default_domain_id config option to the new domain @@ -739,7 +779,7 @@ class AuthWithPasswordCredentials(AuthTest): body_dict = _build_user_auth( username=self.user_foo['name'], - password=new_user_password) + password=new_user['password']) # The test is successful if this doesn't raise, so no need to assert. self.controller.authenticate({}, body_dict) @@ -856,7 +896,16 @@ class AuthWithTrust(AuthTest): token_id=token_id, token_data=self.token_provider_api.validate_token(token_id)) auth_context = authorization.token_to_auth_context(token_ref) - return {'environment': {authorization.AUTH_CONTEXT_ENV: auth_context}, + # NOTE(gyee): if public_endpoint and admin_endpoint are not set, which + # is the default, the base url will be constructed from the environment + # variables wsgi.url_scheme, SERVER_NAME, SERVER_PORT, and SCRIPT_NAME. + # We have to set them in the context so the base url can be constructed + # accordingly. + return {'environment': {authorization.AUTH_CONTEXT_ENV: auth_context, + 'wsgi.url_scheme': 'http', + 'SCRIPT_NAME': '/v3', + 'SERVER_PORT': '80', + 'SERVER_NAME': HOST}, 'token_id': token_id, 'host_url': HOST_URL} @@ -945,8 +994,9 @@ class AuthWithTrust(AuthTest): expires_at="2010-06-04T08:44:31.999999Z") def test_create_trust_without_project_id(self): - """Verify that trust can be created without project id and - token can be generated with that trust. + """Verify that trust can be created without project id. + + Also, token can be generated with that trust. """ unscoped_token = self.get_unscoped_token(self.trustor['name']) context = self._create_auth_context( @@ -977,9 +1027,7 @@ class AuthWithTrust(AuthTest): self.assertIn(role['id'], role_ids) def test_get_trust_without_auth_context(self): - """Verify that a trust cannot be retrieved when the auth context is - missing. - """ + """Verify a trust cannot be retrieved if auth context is missing.""" unscoped_token = self.get_unscoped_token(self.trustor['name']) context = self._create_auth_context( unscoped_token['access']['token']['id']) @@ -1001,8 +1049,6 @@ class AuthWithTrust(AuthTest): token_user = auth_response['access']['user'] self.assertEqual(token_user['id'], new_trust['trustee_user_id']) - # TODO(ayoung): Endpoints - def test_create_trust_impersonation(self): new_trust = self.create_trust(self.sample_data, self.trustor['name']) self.assertEqual(self.trustor['id'], new_trust['trustor_user_id']) @@ -1131,7 +1177,7 @@ class AuthWithTrust(AuthTest): request_body = _build_user_auth(token={'id': trust_token_id}, tenant_id=self.tenant_bar['id']) self.assertRaises( - exception.Forbidden, + exception.Unauthorized, self.controller.authenticate, {}, request_body) def test_delete_trust_revokes_token(self): @@ -1211,35 +1257,6 @@ class AuthWithTrust(AuthTest): new_trust['id'])['trust'] self.assertEqual(3, trust['remaining_uses']) - def test_v2_trust_token_contains_trustor_user_id_and_impersonation(self): - new_trust = self.create_trust(self.sample_data, self.trustor['name']) - auth_response = self.fetch_v2_token_from_trust(new_trust) - - self.assertEqual(new_trust['trustee_user_id'], - auth_response['access']['trust']['trustee_user_id']) - self.assertEqual(new_trust['trustor_user_id'], - auth_response['access']['trust']['trustor_user_id']) - self.assertEqual(new_trust['impersonation'], - auth_response['access']['trust']['impersonation']) - self.assertEqual(new_trust['id'], - auth_response['access']['trust']['id']) - - validate_response = self.controller.validate_token( - context=dict(is_admin=True, query_string={}), - token_id=auth_response['access']['token']['id']) - self.assertEqual( - new_trust['trustee_user_id'], - validate_response['access']['trust']['trustee_user_id']) - self.assertEqual( - new_trust['trustor_user_id'], - validate_response['access']['trust']['trustor_user_id']) - self.assertEqual( - new_trust['impersonation'], - validate_response['access']['trust']['impersonation']) - self.assertEqual( - new_trust['id'], - validate_response['access']['trust']['id']) - def disable_user(self, user): user['enabled'] = False self.identity_api.update_user(user['id'], user) @@ -1328,34 +1345,21 @@ class AuthCatalog(unit.SQLDriverOverrides, AuthTest): def _create_endpoints(self): def create_region(**kwargs): - ref = {'id': uuid.uuid4().hex} - ref.update(kwargs) + ref = unit.new_region_ref(**kwargs) self.catalog_api.create_region(ref) return ref def create_endpoint(service_id, region, **kwargs): - id_ = uuid.uuid4().hex - ref = { - 'id': id_, - 'interface': 'public', - 'region_id': region, - 'service_id': service_id, - 'url': 'http://localhost/%s' % uuid.uuid4().hex, - } - ref.update(kwargs) - self.catalog_api.create_endpoint(id_, ref) - return ref + endpoint = unit.new_endpoint_ref(region_id=region, + service_id=service_id, **kwargs) + + self.catalog_api.create_endpoint(endpoint['id'], endpoint) + return endpoint # Create a service for use with the endpoints. def create_service(**kwargs): - id_ = uuid.uuid4().hex - ref = { - 'id': id_, - 'name': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - } - ref.update(kwargs) - self.catalog_api.create_service(id_, ref) + ref = unit.new_service_ref(**kwargs) + self.catalog_api.create_service(ref['id'], ref) return ref enabled_service_ref = create_service(enabled=True) |