aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v2.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v2.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v2.py1590
1 files changed, 0 insertions, 1590 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v2.py b/keystone-moon/keystone/tests/unit/test_v2.py
deleted file mode 100644
index e81c6040..00000000
--- a/keystone-moon/keystone/tests/unit/test_v2.py
+++ /dev/null
@@ -1,1590 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import json
-import time
-import uuid
-
-from keystoneclient.common import cms
-from oslo_config import cfg
-import six
-from six.moves import http_client
-from testtools import matchers
-
-from keystone.common import extension as keystone_extension
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import ksfixtures
-from keystone.tests.unit import rest
-from keystone.tests.unit.schema import v2
-
-CONF = cfg.CONF
-
-
-class CoreApiTests(object):
- def assertValidError(self, error):
- self.assertIsNotNone(error.get('code'))
- self.assertIsNotNone(error.get('title'))
- self.assertIsNotNone(error.get('message'))
-
- def assertValidVersion(self, version):
- self.assertIsNotNone(version)
- self.assertIsNotNone(version.get('id'))
- self.assertIsNotNone(version.get('status'))
- self.assertIsNotNone(version.get('updated'))
-
- def assertValidExtension(self, extension):
- self.assertIsNotNone(extension)
- self.assertIsNotNone(extension.get('name'))
- self.assertIsNotNone(extension.get('namespace'))
- self.assertIsNotNone(extension.get('alias'))
- self.assertIsNotNone(extension.get('updated'))
-
- def assertValidExtensionLink(self, link):
- self.assertIsNotNone(link.get('rel'))
- self.assertIsNotNone(link.get('type'))
- self.assertIsNotNone(link.get('href'))
-
- def assertValidTenant(self, tenant):
- self.assertIsNotNone(tenant.get('id'))
- self.assertIsNotNone(tenant.get('name'))
- self.assertNotIn('domain_id', tenant)
- self.assertNotIn('parent_id', tenant)
-
- def assertValidUser(self, user):
- self.assertIsNotNone(user.get('id'))
- self.assertIsNotNone(user.get('name'))
-
- def assertValidRole(self, tenant):
- self.assertIsNotNone(tenant.get('id'))
- self.assertIsNotNone(tenant.get('name'))
-
- def test_public_not_found(self):
- r = self.public_request(
- path='/%s' % uuid.uuid4().hex,
- expected_status=http_client.NOT_FOUND)
- self.assertValidErrorResponse(r)
-
- def test_admin_not_found(self):
- r = self.admin_request(
- path='/%s' % uuid.uuid4().hex,
- expected_status=http_client.NOT_FOUND)
- self.assertValidErrorResponse(r)
-
- def test_public_multiple_choice(self):
- r = self.public_request(path='/', expected_status=300)
- self.assertValidMultipleChoiceResponse(r)
-
- def test_admin_multiple_choice(self):
- r = self.admin_request(path='/', expected_status=300)
- self.assertValidMultipleChoiceResponse(r)
-
- def test_public_version(self):
- r = self.public_request(path='/v2.0/')
- self.assertValidVersionResponse(r)
-
- def test_admin_version(self):
- r = self.admin_request(path='/v2.0/')
- self.assertValidVersionResponse(r)
-
- def test_public_extensions(self):
- r = self.public_request(path='/v2.0/extensions')
- self.assertValidExtensionListResponse(
- r, keystone_extension.PUBLIC_EXTENSIONS)
-
- def test_admin_extensions(self):
- r = self.admin_request(path='/v2.0/extensions')
- self.assertValidExtensionListResponse(
- r, keystone_extension.ADMIN_EXTENSIONS)
-
- def test_admin_extensions_returns_not_found(self):
- self.admin_request(path='/v2.0/extensions/invalid-extension',
- expected_status=http_client.NOT_FOUND)
-
- def test_public_osksadm_extension_returns_not_found(self):
- self.public_request(path='/v2.0/extensions/OS-KSADM',
- expected_status=http_client.NOT_FOUND)
-
- def test_admin_osksadm_extension(self):
- r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
- self.assertValidExtensionResponse(
- r, keystone_extension.ADMIN_EXTENSIONS)
-
- def test_authenticate(self):
- r = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'passwordCredentials': {
- 'username': self.user_foo['name'],
- 'password': self.user_foo['password'],
- },
- 'tenantId': self.tenant_bar['id'],
- },
- },
- expected_status=http_client.OK)
- self.assertValidAuthenticationResponse(r, require_service_catalog=True)
-
- def test_authenticate_unscoped(self):
- r = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'passwordCredentials': {
- 'username': self.user_foo['name'],
- 'password': self.user_foo['password'],
- },
- },
- },
- expected_status=http_client.OK)
- self.assertValidAuthenticationResponse(r)
-
- def test_get_tenants_for_token(self):
- r = self.public_request(path='/v2.0/tenants',
- token=self.get_scoped_token())
- self.assertValidTenantListResponse(r)
-
- def test_validate_token(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/tokens/%(token_id)s' % {
- 'token_id': token,
- },
- token=token)
- self.assertValidAuthenticationResponse(r)
-
- def test_invalid_token_returns_not_found(self):
- token = self.get_scoped_token()
- self.admin_request(
- path='/v2.0/tokens/%(token_id)s' % {
- 'token_id': 'invalid',
- },
- token=token,
- expected_status=http_client.NOT_FOUND)
-
- def test_validate_token_service_role(self):
- self.md_foobar = self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_service['id'],
- self.role_service['id'])
-
- token = self.get_scoped_token(
- tenant_id=default_fixtures.SERVICE_TENANT_ID)
- r = self.admin_request(
- path='/v2.0/tokens/%s' % token,
- token=token)
- self.assertValidAuthenticationResponse(r)
-
- def test_remove_role_revokes_token(self):
- self.md_foobar = self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_service['id'],
- self.role_service['id'])
-
- token = self.get_scoped_token(
- tenant_id=default_fixtures.SERVICE_TENANT_ID)
- r = self.admin_request(
- path='/v2.0/tokens/%s' % token,
- token=token)
- self.assertValidAuthenticationResponse(r)
-
- self.assignment_api.remove_role_from_user_and_project(
- self.user_foo['id'],
- self.tenant_service['id'],
- self.role_service['id'])
-
- r = self.admin_request(
- path='/v2.0/tokens/%s' % token,
- token=token,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_validate_token_belongs_to(self):
- token = self.get_scoped_token()
- path = ('/v2.0/tokens/%s?belongsTo=%s' % (token,
- self.tenant_bar['id']))
- r = self.admin_request(path=path, token=token)
- self.assertValidAuthenticationResponse(r, require_service_catalog=True)
-
- def test_validate_token_no_belongs_to_still_returns_catalog(self):
- token = self.get_scoped_token()
- path = ('/v2.0/tokens/%s' % token)
- r = self.admin_request(path=path, token=token)
- self.assertValidAuthenticationResponse(r, require_service_catalog=True)
-
- def test_validate_token_head(self):
- """The same call as above, except using HEAD.
-
- There's no response to validate here, but this is included for the
- sake of completely covering the core API.
-
- """
- token = self.get_scoped_token()
- self.admin_request(
- method='HEAD',
- path='/v2.0/tokens/%(token_id)s' % {
- 'token_id': token,
- },
- token=token,
- expected_status=http_client.OK)
-
- def test_endpoints(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/tokens/%(token_id)s/endpoints' % {
- 'token_id': token,
- },
- token=token)
- self.assertValidEndpointListResponse(r)
-
- def test_get_tenant(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/tenants/%(tenant_id)s' % {
- 'tenant_id': self.tenant_bar['id'],
- },
- token=token)
- self.assertValidTenantResponse(r)
-
- def test_get_tenant_by_name(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/tenants?name=%(tenant_name)s' % {
- 'tenant_name': self.tenant_bar['name'],
- },
- token=token)
- self.assertValidTenantResponse(r)
-
- def test_get_user_roles_with_tenant(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
- 'tenant_id': self.tenant_bar['id'],
- 'user_id': self.user_foo['id'],
- },
- token=token)
- self.assertValidRoleListResponse(r)
-
- def test_get_user_roles_without_tenant(self):
- token = self.get_scoped_token()
- self.admin_request(
- path='/v2.0/users/%(user_id)s/roles' % {
- 'user_id': self.user_foo['id'],
- },
- token=token, expected_status=http_client.NOT_IMPLEMENTED)
-
- def test_get_user(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': self.user_foo['id'],
- },
- token=token)
- self.assertValidUserResponse(r)
-
- def test_get_user_by_name(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- path='/v2.0/users?name=%(user_name)s' % {
- 'user_name': self.user_foo['name'],
- },
- token=token)
- self.assertValidUserResponse(r)
-
- def test_create_update_user_invalid_enabled_type(self):
- # Enforce usage of boolean for 'enabled' field
- token = self.get_scoped_token()
-
- # Test CREATE request
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'enabled': "False",
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- # In JSON, 0|1 are not booleans
- 'enabled': 0,
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- # Test UPDATE request
- path = '/v2.0/users/%(user_id)s' % {
- 'user_id': self.user_foo['id'],
- }
-
- r = self.admin_request(
- method='PUT',
- path=path,
- body={
- 'user': {
- 'enabled': "False",
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- r = self.admin_request(
- method='PUT',
- path=path,
- body={
- 'user': {
- # In JSON, 0|1 are not booleans
- 'enabled': 1,
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- def test_create_update_user_valid_enabled_type(self):
- # Enforce usage of boolean for 'enabled' field
- token = self.get_scoped_token()
-
- # Test CREATE request
- self.admin_request(method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'enabled': False,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- def test_error_response(self):
- """This triggers assertValidErrorResponse by convention."""
- self.public_request(path='/v2.0/tenants',
- expected_status=http_client.UNAUTHORIZED)
-
- def test_invalid_parameter_error_response(self):
- token = self.get_scoped_token()
- bad_body = {
- 'OS-KSADM:service%s' % uuid.uuid4().hex: {
- 'name': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- },
- }
- res = self.admin_request(method='POST',
- path='/v2.0/OS-KSADM/services',
- body=bad_body,
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(res)
- res = self.admin_request(method='POST',
- path='/v2.0/users',
- body=bad_body,
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(res)
-
- def _get_user_id(self, r):
- """Helper method to return user ID from a response.
-
- This needs to be overridden by child classes
- based on their content type.
-
- """
- raise NotImplementedError()
-
- def _get_role_id(self, r):
- """Helper method to return a role ID from a response.
-
- This needs to be overridden by child classes
- based on their content type.
-
- """
- raise NotImplementedError()
-
- def _get_role_name(self, r):
- """Helper method to return role NAME from a response.
-
- This needs to be overridden by child classes
- based on their content type.
-
- """
- raise NotImplementedError()
-
- def _get_project_id(self, r):
- """Helper method to return project ID from a response.
-
- This needs to be overridden by child classes
- based on their content type.
-
- """
- raise NotImplementedError()
-
- def assertNoRoles(self, r):
- """Helper method to assert No Roles
-
- This needs to be overridden by child classes
- based on their content type.
-
- """
- raise NotImplementedError()
-
- def test_update_user_tenant(self):
- token = self.get_scoped_token()
-
- # Create a new user
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'tenantId': self.tenant_bar['id'],
- 'enabled': True,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- user_id = self._get_user_id(r.result)
-
- # Check if member_role is in tenant_bar
- r = self.admin_request(
- path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
- 'project_id': self.tenant_bar['id'],
- 'user_id': user_id
- },
- token=token,
- expected_status=http_client.OK)
- self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))
-
- # Create a new tenant
- r = self.admin_request(
- method='POST',
- path='/v2.0/tenants',
- body={
- 'tenant': {
- 'name': 'test_update_user',
- 'description': 'A description ...',
- 'enabled': True,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- project_id = self._get_project_id(r.result)
-
- # Update user's tenant
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': user_id,
- },
- body={
- 'user': {
- 'tenantId': project_id,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- # 'member_role' should be in new_tenant
- r = self.admin_request(
- path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
- 'project_id': project_id,
- 'user_id': user_id
- },
- token=token,
- expected_status=http_client.OK)
- self.assertEqual('_member_', self._get_role_name(r.result))
-
- # 'member_role' should not be in tenant_bar any more
- r = self.admin_request(
- path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
- 'project_id': self.tenant_bar['id'],
- 'user_id': user_id
- },
- token=token,
- expected_status=http_client.OK)
- self.assertNoRoles(r.result)
-
- def test_update_user_with_invalid_tenant(self):
- token = self.get_scoped_token()
-
- # Create a new user
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': 'test_invalid_tenant',
- 'password': uuid.uuid4().hex,
- 'tenantId': self.tenant_bar['id'],
- 'enabled': True,
- },
- },
- token=token,
- expected_status=http_client.OK)
- user_id = self._get_user_id(r.result)
-
- # Update user with an invalid tenant
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': user_id,
- },
- body={
- 'user': {
- 'tenantId': 'abcde12345heha',
- },
- },
- token=token,
- expected_status=http_client.NOT_FOUND)
-
- def test_update_user_with_invalid_tenant_no_prev_tenant(self):
- token = self.get_scoped_token()
-
- # Create a new user
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': 'test_invalid_tenant',
- 'password': uuid.uuid4().hex,
- 'enabled': True,
- },
- },
- token=token,
- expected_status=http_client.OK)
- user_id = self._get_user_id(r.result)
-
- # Update user with an invalid tenant
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': user_id,
- },
- body={
- 'user': {
- 'tenantId': 'abcde12345heha',
- },
- },
- token=token,
- expected_status=http_client.NOT_FOUND)
-
- def test_update_user_with_old_tenant(self):
- token = self.get_scoped_token()
-
- # Create a new user
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'tenantId': self.tenant_bar['id'],
- 'enabled': True,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- user_id = self._get_user_id(r.result)
-
- # Check if member_role is in tenant_bar
- r = self.admin_request(
- path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
- 'project_id': self.tenant_bar['id'],
- 'user_id': user_id
- },
- token=token,
- expected_status=http_client.OK)
- self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))
-
- # Update user's tenant with old tenant id
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': user_id,
- },
- body={
- 'user': {
- 'tenantId': self.tenant_bar['id'],
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- # 'member_role' should still be in tenant_bar
- r = self.admin_request(
- path='/v2.0/tenants/%(project_id)s/users/%(user_id)s/roles' % {
- 'project_id': self.tenant_bar['id'],
- 'user_id': user_id
- },
- token=token,
- expected_status=http_client.OK)
- self.assertEqual('_member_', self._get_role_name(r.result))
-
- def test_authenticating_a_user_with_no_password(self):
- token = self.get_scoped_token()
-
- username = uuid.uuid4().hex
-
- # create the user
- self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': username,
- 'enabled': True,
- },
- },
- token=token)
-
- # fail to authenticate
- r = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'passwordCredentials': {
- 'username': username,
- 'password': 'password',
- },
- },
- },
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- def test_www_authenticate_header(self):
- r = self.public_request(
- path='/v2.0/tenants',
- expected_status=http_client.UNAUTHORIZED)
- self.assertEqual('Keystone uri="http://localhost"',
- r.headers.get('WWW-Authenticate'))
-
- def test_www_authenticate_header_host(self):
- test_url = 'http://%s:4187' % uuid.uuid4().hex
- self.config_fixture.config(public_endpoint=test_url)
- r = self.public_request(
- path='/v2.0/tenants',
- expected_status=http_client.UNAUTHORIZED)
- self.assertEqual('Keystone uri="%s"' % test_url,
- r.headers.get('WWW-Authenticate'))
-
-
-class LegacyV2UsernameTests(object):
- """Tests to show the broken username behavior in V2.
-
- The V2 API is documented to use `username` instead of `name`. The
- API forced used to use name and left the username to fall into the
- `extra` field.
-
- These tests ensure this behavior works so fixes to `username`/`name`
- will be backward compatible.
- """
-
- def create_user(self, **user_attrs):
- """Creates a users and returns the response object.
-
- :param user_attrs: attributes added to the request body (optional)
- """
- token = self.get_scoped_token()
- body = {
- 'user': {
- 'name': uuid.uuid4().hex,
- 'enabled': True,
- },
- }
- body['user'].update(user_attrs)
-
- return self.admin_request(
- method='POST',
- path='/v2.0/users',
- token=token,
- body=body,
- expected_status=http_client.OK)
-
- def test_create_with_extra_username(self):
- """The response for creating a user will contain the extra fields."""
- fake_username = uuid.uuid4().hex
- r = self.create_user(username=fake_username)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(fake_username, user.get('username'))
-
- def test_get_returns_username_from_extra(self):
- """The response for getting a user will contain the extra fields."""
- token = self.get_scoped_token()
-
- fake_username = uuid.uuid4().hex
- r = self.create_user(username=fake_username)
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- r = self.admin_request(path='/v2.0/users/%s' % id_, token=token)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(fake_username, user.get('username'))
-
- def test_update_returns_new_username_when_adding_username(self):
- """The response for updating a user will contain the extra fields.
-
- This is specifically testing for updating a username when a value
- was not previously set.
- """
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- name = self.get_user_attribute_from_response(r, 'name')
- enabled = self.get_user_attribute_from_response(r, 'enabled')
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%s' % id_,
- token=token,
- body={
- 'user': {
- 'name': name,
- 'username': 'new_username',
- 'enabled': enabled,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual('new_username', user.get('username'))
-
- def test_update_returns_new_username_when_updating_username(self):
- """The response for updating a user will contain the extra fields.
-
- This tests updating a username that was previously set.
- """
- token = self.get_scoped_token()
-
- r = self.create_user(username='original_username')
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- name = self.get_user_attribute_from_response(r, 'name')
- enabled = self.get_user_attribute_from_response(r, 'enabled')
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%s' % id_,
- token=token,
- body={
- 'user': {
- 'name': name,
- 'username': 'new_username',
- 'enabled': enabled,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual('new_username', user.get('username'))
-
- def test_username_is_always_returned_create(self):
- """Username is set as the value of name if no username is provided.
-
- This matches the v2.0 spec where we really should be using username
- and not name.
- """
- r = self.create_user()
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_username_is_always_returned_get(self):
- """Username is set as the value of name if no username is provided.
-
- This matches the v2.0 spec where we really should be using username
- and not name.
- """
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- r = self.admin_request(path='/v2.0/users/%s' % id_, token=token)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_username_is_always_returned_get_by_name(self):
- """Username is set as the value of name if no username is provided.
-
- This matches the v2.0 spec where we really should be using username
- and not name.
- """
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- name = self.get_user_attribute_from_response(r, 'name')
- r = self.admin_request(path='/v2.0/users?name=%s' % name, token=token)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_username_is_always_returned_update_no_username_provided(self):
- """Username is set as the value of name if no username is provided.
-
- This matches the v2.0 spec where we really should be using username
- and not name.
- """
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- name = self.get_user_attribute_from_response(r, 'name')
- enabled = self.get_user_attribute_from_response(r, 'enabled')
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%s' % id_,
- token=token,
- body={
- 'user': {
- 'name': name,
- 'enabled': enabled,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_updated_username_is_returned(self):
- """Username is set as the value of name if no username is provided.
-
- This matches the v2.0 spec where we really should be using username
- and not name.
- """
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- name = self.get_user_attribute_from_response(r, 'name')
- enabled = self.get_user_attribute_from_response(r, 'enabled')
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%s' % id_,
- token=token,
- body={
- 'user': {
- 'name': name,
- 'enabled': enabled,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_username_can_be_used_instead_of_name_create(self):
- token = self.get_scoped_token()
-
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- token=token,
- body={
- 'user': {
- 'username': uuid.uuid4().hex,
- 'enabled': True,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(user.get('name'), user.get('username'))
-
- def test_username_can_be_used_instead_of_name_update(self):
- token = self.get_scoped_token()
-
- r = self.create_user()
-
- id_ = self.get_user_attribute_from_response(r, 'id')
- new_username = uuid.uuid4().hex
- enabled = self.get_user_attribute_from_response(r, 'enabled')
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%s' % id_,
- token=token,
- body={
- 'user': {
- 'username': new_username,
- 'enabled': enabled,
- },
- },
- expected_status=http_client.OK)
-
- self.assertValidUserResponse(r)
-
- user = self.get_user_from_response(r)
- self.assertEqual(new_username, user.get('name'))
- self.assertEqual(user.get('name'), user.get('username'))
-
-
-class RestfulTestCase(rest.RestfulTestCase):
-
- def setUp(self):
- super(RestfulTestCase, self).setUp()
-
- # TODO(termie): add an admin user to the fixtures and use that user
- # override the fixtures, for now
- self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_bar['id'],
- self.role_admin['id'])
-
-
-class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
-
- def config_overrides(self):
- super(V2TestCase, self).config_overrides()
- self.config_fixture.config(
- group='catalog',
- driver='templated',
- template_file=unit.dirs.tests('default_catalog.templates'))
-
- def _get_user_id(self, r):
- return r['user']['id']
-
- def _get_role_name(self, r):
- return r['roles'][0]['name']
-
- def _get_role_id(self, r):
- return r['roles'][0]['id']
-
- def _get_project_id(self, r):
- return r['tenant']['id']
-
- def _get_token_id(self, r):
- return r.result['access']['token']['id']
-
- def assertNoRoles(self, r):
- self.assertEqual([], r['roles'])
-
- def assertValidErrorResponse(self, r):
- self.assertIsNotNone(r.result.get('error'))
- self.assertValidError(r.result['error'])
- self.assertEqual(r.result['error']['code'], r.status_code)
-
- def assertValidExtension(self, extension, expected):
- super(V2TestCase, self).assertValidExtension(extension)
- descriptions = [ext['description'] for ext in six.itervalues(expected)]
- description = extension.get('description')
- self.assertIsNotNone(description)
- self.assertIn(description, descriptions)
- self.assertIsNotNone(extension.get('links'))
- self.assertNotEmpty(extension.get('links'))
- for link in extension.get('links'):
- self.assertValidExtensionLink(link)
-
- def assertValidExtensionListResponse(self, r, expected):
- self.assertIsNotNone(r.result.get('extensions'))
- self.assertIsNotNone(r.result['extensions'].get('values'))
- self.assertNotEmpty(r.result['extensions'].get('values'))
- for extension in r.result['extensions']['values']:
- self.assertValidExtension(extension, expected)
-
- def assertValidExtensionResponse(self, r, expected):
- self.assertValidExtension(r.result.get('extension'), expected)
-
- def assertValidUser(self, user):
- super(V2TestCase, self).assertValidUser(user)
- self.assertNotIn('default_project_id', user)
- if 'tenantId' in user:
- # NOTE(morganfainberg): tenantId should never be "None", it gets
- # filtered out of the object if it is there. This is suspenders
- # and a belt check to avoid unintended regressions.
- self.assertIsNotNone(user.get('tenantId'))
-
- def assertValidAuthenticationResponse(self, r,
- require_service_catalog=False):
- self.assertIsNotNone(r.result.get('access'))
- self.assertIsNotNone(r.result['access'].get('token'))
- self.assertIsNotNone(r.result['access'].get('user'))
-
- # validate token
- self.assertIsNotNone(r.result['access']['token'].get('id'))
- self.assertIsNotNone(r.result['access']['token'].get('expires'))
- tenant = r.result['access']['token'].get('tenant')
- if tenant is not None:
- # validate tenant
- self.assertIsNotNone(tenant.get('id'))
- self.assertIsNotNone(tenant.get('name'))
-
- # validate user
- self.assertIsNotNone(r.result['access']['user'].get('id'))
- self.assertIsNotNone(r.result['access']['user'].get('name'))
-
- if require_service_catalog:
- # roles are only provided with a service catalog
- roles = r.result['access']['user'].get('roles')
- self.assertNotEmpty(roles)
- for role in roles:
- self.assertIsNotNone(role.get('name'))
-
- serviceCatalog = r.result['access'].get('serviceCatalog')
- # validate service catalog
- if require_service_catalog:
- self.assertIsNotNone(serviceCatalog)
- if serviceCatalog is not None:
- self.assertIsInstance(serviceCatalog, list)
- if require_service_catalog:
- self.assertNotEmpty(serviceCatalog)
- for service in r.result['access']['serviceCatalog']:
- # validate service
- self.assertIsNotNone(service.get('name'))
- self.assertIsNotNone(service.get('type'))
-
- # services contain at least one endpoint
- self.assertIsNotNone(service.get('endpoints'))
- self.assertNotEmpty(service['endpoints'])
- for endpoint in service['endpoints']:
- # validate service endpoint
- self.assertIsNotNone(endpoint.get('publicURL'))
-
- def assertValidTenantListResponse(self, r):
- self.assertIsNotNone(r.result.get('tenants'))
- self.assertNotEmpty(r.result['tenants'])
- for tenant in r.result['tenants']:
- self.assertValidTenant(tenant)
- self.assertIsNotNone(tenant.get('enabled'))
- self.assertIn(tenant.get('enabled'), [True, False])
-
- def assertValidUserResponse(self, r):
- self.assertIsNotNone(r.result.get('user'))
- self.assertValidUser(r.result['user'])
-
- def assertValidTenantResponse(self, r):
- self.assertIsNotNone(r.result.get('tenant'))
- self.assertValidTenant(r.result['tenant'])
-
- def assertValidRoleListResponse(self, r):
- self.assertIsNotNone(r.result.get('roles'))
- self.assertNotEmpty(r.result['roles'])
- for role in r.result['roles']:
- self.assertValidRole(role)
-
- def assertValidVersion(self, version):
- super(V2TestCase, self).assertValidVersion(version)
-
- self.assertIsNotNone(version.get('links'))
- self.assertNotEmpty(version.get('links'))
- for link in version.get('links'):
- self.assertIsNotNone(link.get('rel'))
- self.assertIsNotNone(link.get('href'))
-
- self.assertIsNotNone(version.get('media-types'))
- self.assertNotEmpty(version.get('media-types'))
- for media in version.get('media-types'):
- self.assertIsNotNone(media.get('base'))
- self.assertIsNotNone(media.get('type'))
-
- def assertValidMultipleChoiceResponse(self, r):
- self.assertIsNotNone(r.result.get('versions'))
- self.assertIsNotNone(r.result['versions'].get('values'))
- self.assertNotEmpty(r.result['versions']['values'])
- for version in r.result['versions']['values']:
- self.assertValidVersion(version)
-
- def assertValidVersionResponse(self, r):
- self.assertValidVersion(r.result.get('version'))
-
- def assertValidEndpointListResponse(self, r):
- self.assertIsNotNone(r.result.get('endpoints'))
- self.assertNotEmpty(r.result['endpoints'])
- for endpoint in r.result['endpoints']:
- self.assertIsNotNone(endpoint.get('id'))
- self.assertIsNotNone(endpoint.get('name'))
- self.assertIsNotNone(endpoint.get('type'))
- self.assertIsNotNone(endpoint.get('publicURL'))
- self.assertIsNotNone(endpoint.get('internalURL'))
- self.assertIsNotNone(endpoint.get('adminURL'))
-
- def get_user_from_response(self, r):
- return r.result.get('user')
-
- def get_user_attribute_from_response(self, r, attribute_name):
- return r.result['user'][attribute_name]
-
- def test_service_crud_requires_auth(self):
- """Service CRUD should return unauthorized without an X-Auth-Token."""
- # values here don't matter because it will be unauthorized before
- # they're checked (bug 1006822).
- service_path = '/v2.0/OS-KSADM/services/%s' % uuid.uuid4().hex
- service_body = {
- 'OS-KSADM:service': {
- 'name': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- },
- }
-
- r = self.admin_request(method='GET',
- path='/v2.0/OS-KSADM/services',
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- r = self.admin_request(method='POST',
- path='/v2.0/OS-KSADM/services',
- body=service_body,
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- r = self.admin_request(method='GET',
- path=service_path,
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- r = self.admin_request(method='DELETE',
- path=service_path,
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- def test_user_role_list_requires_auth(self):
- """User role list return unauthorized without an X-Auth-Token."""
- # values here don't matter because it will be unauthorized before
- # they're checked (bug 1006815).
- path = '/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
- 'tenant_id': uuid.uuid4().hex,
- 'user_id': uuid.uuid4().hex,
- }
-
- r = self.admin_request(path=path,
- expected_status=http_client.UNAUTHORIZED)
- self.assertValidErrorResponse(r)
-
- def test_fetch_revocation_list_nonadmin_fails(self):
- self.admin_request(
- method='GET',
- path='/v2.0/tokens/revoked',
- expected_status=http_client.UNAUTHORIZED)
-
- def test_fetch_revocation_list_admin_200(self):
- token = self.get_scoped_token()
- r = self.admin_request(
- method='GET',
- path='/v2.0/tokens/revoked',
- token=token,
- expected_status=http_client.OK)
- self.assertValidRevocationListResponse(r)
-
- def assertValidRevocationListResponse(self, response):
- self.assertIsNotNone(response.result['signed'])
-
- def _fetch_parse_revocation_list(self):
-
- token1 = self.get_scoped_token()
-
- # TODO(morganfainberg): Because this is making a restful call to the
- # app a change to UTCNOW via mock.patch will not affect the returned
- # token. The only surefire way to ensure there is not a transient bug
- # based upon when the second token is issued is with a sleep. This
- # issue all stems from the limited resolution (no microseconds) on the
- # expiry time of tokens and the way revocation events utilizes token
- # expiry to revoke individual tokens. This is a stop-gap until all
- # associated issues with resolution on expiration and revocation events
- # are resolved.
- time.sleep(1)
-
- token2 = self.get_scoped_token()
-
- self.admin_request(method='DELETE',
- path='/v2.0/tokens/%s' % token2,
- token=token1)
-
- r = self.admin_request(
- method='GET',
- path='/v2.0/tokens/revoked',
- token=token1,
- expected_status=http_client.OK)
- signed_text = r.result['signed']
-
- data_json = cms.cms_verify(signed_text, CONF.signing.certfile,
- CONF.signing.ca_certs)
-
- data = json.loads(data_json)
-
- return (data, token2)
-
- def test_fetch_revocation_list_md5(self):
- """Hash for tokens in revocation list and server config should match.
-
- If the server is configured for md5, then the revocation list has
- tokens hashed with MD5.
- """
- # The default hash algorithm is md5.
- hash_algorithm = 'md5'
-
- (data, token) = self._fetch_parse_revocation_list()
- token_hash = cms.cms_hash_token(token, mode=hash_algorithm)
- self.assertThat(token_hash, matchers.Equals(data['revoked'][0]['id']))
-
- def test_fetch_revocation_list_sha256(self):
- """Hash for tokens in revocation list and server config should match.
-
- If the server is configured for sha256, then the revocation list has
- tokens hashed with SHA256.
- """
- hash_algorithm = 'sha256'
- self.config_fixture.config(group='token',
- hash_algorithm=hash_algorithm)
-
- (data, token) = self._fetch_parse_revocation_list()
- token_hash = cms.cms_hash_token(token, mode=hash_algorithm)
- self.assertThat(token_hash, matchers.Equals(data['revoked'][0]['id']))
-
- def test_create_update_user_invalid_enabled_type(self):
- # Enforce usage of boolean for 'enabled' field
- token = self.get_scoped_token()
-
- # Test CREATE request
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- # In JSON, "true|false" are not boolean
- 'enabled': "true",
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- # Test UPDATE request
- r = self.admin_request(
- method='PUT',
- path='/v2.0/users/%(user_id)s' % {
- 'user_id': self.user_foo['id'],
- },
- body={
- 'user': {
- # In JSON, "true|false" are not boolean
- 'enabled': "true",
- },
- },
- token=token,
- expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(r)
-
- def test_authenticating_a_user_with_an_OSKSADM_password(self):
- token = self.get_scoped_token()
-
- username = uuid.uuid4().hex
- password = uuid.uuid4().hex
-
- # create the user
- r = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': username,
- 'OS-KSADM:password': password,
- 'enabled': True,
- },
- },
- token=token)
-
- # successfully authenticate
- self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'passwordCredentials': {
- 'username': username,
- 'password': password,
- },
- },
- },
- expected_status=http_client.OK)
-
- # ensure password doesn't leak
- user_id = r.result['user']['id']
- r = self.admin_request(
- method='GET',
- path='/v2.0/users/%s' % user_id,
- token=token,
- expected_status=http_client.OK)
- self.assertNotIn('OS-KSADM:password', r.result['user'])
-
- def test_updating_a_user_with_an_OSKSADM_password(self):
- token = self.get_scoped_token()
-
- user_id = self.user_foo['id']
- password = uuid.uuid4().hex
-
- # update the user
- self.admin_request(
- method='PUT',
- path='/v2.0/users/%s/OS-KSADM/password' % user_id,
- body={
- 'user': {
- 'password': password,
- },
- },
- token=token,
- expected_status=http_client.OK)
-
- # successfully authenticate
- self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'passwordCredentials': {
- 'username': self.user_foo['name'],
- 'password': password,
- },
- },
- },
- expected_status=http_client.OK)
-
-
-class RevokeApiTestCase(V2TestCase):
- def config_overrides(self):
- super(RevokeApiTestCase, self).config_overrides()
- self.config_fixture.config(
- group='token',
- provider='pki',
- revoke_by_id=False)
-
- def test_fetch_revocation_list_admin_200(self):
- self.skipTest('Revoke API disables revocation_list.')
-
- def test_fetch_revocation_list_md5(self):
- self.skipTest('Revoke API disables revocation_list.')
-
- def test_fetch_revocation_list_sha256(self):
- self.skipTest('Revoke API disables revocation_list.')
-
-
-class TestFernetTokenProviderV2(RestfulTestCase):
-
- def setUp(self):
- super(TestFernetTokenProviderV2, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
-
- # Add catalog data
- self.region = unit.new_region_ref()
- self.region_id = self.region['id']
- self.catalog_api.create_region(self.region)
-
- self.service = unit.new_service_ref()
- self.service_id = self.service['id']
- self.catalog_api.create_service(self.service_id, self.service)
-
- self.endpoint = unit.new_endpoint_ref(service_id=self.service_id,
- interface='public',
- region_id=self.region_id)
- self.endpoint_id = self.endpoint['id']
- self.catalog_api.create_endpoint(self.endpoint_id, self.endpoint)
-
- def assertValidUnscopedTokenResponse(self, r):
- v2.unscoped_validator.validate(r.json['access'])
-
- def assertValidScopedTokenResponse(self, r):
- v2.scoped_validator.validate(r.json['access'])
-
- # Used by RestfulTestCase
- def _get_token_id(self, r):
- return r.result['access']['token']['id']
-
- def new_project_ref(self):
- return {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'domain_id': 'default',
- 'enabled': True}
-
- def config_overrides(self):
- super(TestFernetTokenProviderV2, self).config_overrides()
- self.config_fixture.config(group='token', provider='fernet')
-
- def test_authenticate_unscoped_token(self):
- unscoped_token = self.get_unscoped_token()
- # Fernet token must be of length 255 per usability requirements
- self.assertLess(len(unscoped_token), 255)
-
- def test_validate_unscoped_token(self):
- # Grab an admin token to validate with
- project_ref = self.new_project_ref()
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
- project_ref['id'],
- self.role_admin['id'])
- admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
- unscoped_token = self.get_unscoped_token()
- path = ('/v2.0/tokens/%s' % unscoped_token)
- resp = self.admin_request(
- method='GET',
- path=path,
- token=admin_token,
- expected_status=http_client.OK)
- self.assertValidUnscopedTokenResponse(resp)
-
- def test_authenticate_scoped_token(self):
- project_ref = self.new_project_ref()
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'], project_ref['id'], self.role_service['id'])
- token = self.get_scoped_token(tenant_id=project_ref['id'])
- # Fernet token must be of length 255 per usability requirements
- self.assertLess(len(token), 255)
-
- def test_validate_scoped_token(self):
- project_ref = self.new_project_ref()
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
- project_ref['id'],
- self.role_admin['id'])
- project2_ref = self.new_project_ref()
- self.resource_api.create_project(project2_ref['id'], project2_ref)
- self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'], project2_ref['id'], self.role_member['id'])
- admin_token = self.get_scoped_token(tenant_id=project_ref['id'])
- member_token = self.get_scoped_token(tenant_id=project2_ref['id'])
- path = ('/v2.0/tokens/%s?belongsTo=%s' % (member_token,
- project2_ref['id']))
- # Validate token belongs to project
- resp = self.admin_request(
- method='GET',
- path=path,
- token=admin_token,
- expected_status=http_client.OK)
- self.assertValidScopedTokenResponse(resp)
-
- def test_token_authentication_and_validation(self):
- """Test token authentication for Fernet token provider.
-
- Verify that token authentication returns validate response code and
- valid token belongs to project.
- """
- project_ref = self.new_project_ref()
- self.resource_api.create_project(project_ref['id'], project_ref)
- unscoped_token = self.get_unscoped_token()
- self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
- project_ref['id'],
- self.role_admin['id'])
- r = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'tenantName': project_ref['name'],
- 'token': {
- 'id': unscoped_token.encode('ascii')
- }
- }
- },
- expected_status=http_client.OK)
-
- token_id = self._get_token_id(r)
- path = ('/v2.0/tokens/%s?belongsTo=%s' % (token_id, project_ref['id']))
- # Validate token belongs to project
- resp = self.admin_request(
- method='GET',
- path=path,
- token=self.get_admin_token(),
- expected_status=http_client.OK)
- self.assertValidScopedTokenResponse(resp)
-
- def test_rescoped_tokens_maintain_original_expiration(self):
- project_ref = self.new_project_ref()
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assignment_api.add_role_to_user_and_project(self.user_foo['id'],
- project_ref['id'],
- self.role_admin['id'])
- resp = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'tenantName': project_ref['name'],
- 'passwordCredentials': {
- 'username': self.user_foo['name'],
- 'password': self.user_foo['password']
- }
- }
- },
- # NOTE(lbragstad): This test may need to be refactored if Keystone
- # decides to disallow rescoping using a scoped token.
- expected_status=http_client.OK)
- original_token = resp.result['access']['token']['id']
- original_expiration = resp.result['access']['token']['expires']
-
- resp = self.public_request(
- method='POST',
- path='/v2.0/tokens',
- body={
- 'auth': {
- 'tenantName': project_ref['name'],
- 'token': {
- 'id': original_token,
- }
- }
- },
- expected_status=http_client.OK)
- rescoped_token = resp.result['access']['token']['id']
- rescoped_expiration = resp.result['access']['token']['expires']
- self.assertNotEqual(original_token, rescoped_token)
- self.assertEqual(original_expiration, rescoped_expiration)
- self.assertValidScopedTokenResponse(resp)