path: root/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
diff options
authorRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
committerRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
commit920a49cfa055733d575282973e23558c33087a4a (patch)
treed371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
parentef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff)
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py')
1 files changed, 0 insertions, 1376 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
deleted file mode 100644
index 2a3fad86..00000000
--- a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
+++ /dev/null
@@ -1,1376 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import datetime
-import uuid
-from keystoneclient.contrib.ec2 import utils as ec2_utils
-from keystoneclient import exceptions as client_exceptions
-from keystoneclient.v2_0 import client as ks_client
-import mock
-from oslo_config import cfg
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from six.moves import http_client
-from six.moves import range
-import webob
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit.ksfixtures import appserver
-from keystone.tests.unit.ksfixtures import database
-CONF = cfg.CONF
-DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
-class ClientDrivenTestCase(unit.TestCase):
- def config_files(self):
- config_files = super(ClientDrivenTestCase, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
- return config_files
- def setUp(self):
- super(ClientDrivenTestCase, self).setUp()
- # FIXME(morganfainberg): Since we are running tests through the
- # controllers and some internal api drivers are SQL-only, the correct
- # approach is to ensure we have the correct backing store. The
- # credential api makes some very SQL specific assumptions that should
- # be addressed allowing for non-SQL based testing to occur.
- self.useFixture(database.Database())
- self.load_backends()
- self.load_fixtures(default_fixtures)
- # TODO(termie): add an admin user to the fixtures and use that user
- # override the fixtures, for now
- self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_bar['id'],
- self.role_admin['id'])
- conf = self._paste_config('keystone')
- fixture = self.useFixture(appserver.AppServer(conf, appserver.MAIN))
- self.public_server = fixture.server
- fixture = self.useFixture(appserver.AppServer(conf, appserver.ADMIN))
- self.admin_server = fixture.server
- self.default_client = self.get_client()
- self.addCleanup(self.cleanup_instance('public_server', 'admin_server',
- 'default_client'))
- def _public_url(self):
- public_port = self.public_server.socket_info['socket'][1]
- return "http://localhost:%s/v2.0" % public_port
- def _admin_url(self):
- admin_port = self.admin_server.socket_info['socket'][1]
- return "http://localhost:%s/v2.0" % admin_port
- def _client(self, admin=False, **kwargs):
- url = self._admin_url() if admin else self._public_url()
- kc = ks_client.Client(endpoint=url,
- auth_url=self._public_url(),
- **kwargs)
- kc.authenticate()
- # have to manually overwrite the management url after authentication
- kc.management_url = url
- return kc
- def get_client(self, user_ref=None, tenant_ref=None, admin=False):
- if user_ref is None:
- user_ref = self.user_foo
- if tenant_ref is None:
- for user in default_fixtures.USERS:
- # The fixture ID is no longer used as the ID in the database
- # The fixture ID, however, is still used as part of the
- # attribute name when storing the created object on the test
- # case. This means that we need to use the fixture ID below to
- # find the actial object so that we can get the ID as stored
- # in the database to compare against.
- if (getattr(self, 'user_%s' % user['id'])['id'] ==
- user_ref['id']):
- tenant_id = user['tenants'][0]
- else:
- tenant_id = tenant_ref['id']
- return self._client(username=user_ref['name'],
- password=user_ref['password'],
- tenant_id=tenant_id,
- admin=admin)
- def test_authenticate_tenant_name_and_tenants(self):
- client = self.get_client()
- tenants = client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- def test_authenticate_tenant_id_and_tenants(self):
- client = self._client(username=self.user_foo['name'],
- password=self.user_foo['password'],
- tenant_id='bar')
- tenants = client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- def test_authenticate_invalid_tenant_id(self):
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=self.user_foo['name'],
- password=self.user_foo['password'],
- tenant_id='baz')
- def test_authenticate_token_no_tenant(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token)
- tenants = token_client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- def test_authenticate_token_tenant_id(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token, tenant_id='bar')
- tenants = token_client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- def test_authenticate_token_invalid_tenant_id(self):
- client = self.get_client()
- token = client.auth_token
- self.assertRaises(client_exceptions.Unauthorized,
- self._client, token=token,
- tenant_id=uuid.uuid4().hex)
- def test_authenticate_token_invalid_tenant_name(self):
- client = self.get_client()
- token = client.auth_token
- self.assertRaises(client_exceptions.Unauthorized,
- self._client, token=token,
- tenant_name=uuid.uuid4().hex)
- def test_authenticate_token_tenant_name(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token, tenant_name='BAR')
- tenants = token_client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- def test_authenticate_and_delete_token(self):
- client = self.get_client(admin=True)
- token = client.auth_token
- token_client = self._client(token=token)
- tenants = token_client.tenants.list()
- self.assertEqual(self.tenant_bar['id'], tenants[0].id)
- client.tokens.delete(token_client.auth_token)
- self.assertRaises(client_exceptions.Unauthorized,
- token_client.tenants.list)
- def test_authenticate_no_password(self):
- user_ref = self.user_foo.copy()
- user_ref['password'] = None
- self.assertRaises(client_exceptions.AuthorizationFailure,
- self.get_client,
- user_ref)
- def test_authenticate_no_username(self):
- user_ref = self.user_foo.copy()
- user_ref['name'] = None
- self.assertRaises(client_exceptions.AuthorizationFailure,
- self.get_client,
- user_ref)
- def test_authenticate_disabled_tenant(self):
- admin_client = self.get_client(admin=True)
- tenant = {
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': False,
- }
- tenant_ref = admin_client.tenants.create(
- tenant_name=tenant['name'],
- description=tenant['description'],
- enabled=tenant['enabled'])
- tenant['id'] = tenant_ref.id
- user = {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'email': uuid.uuid4().hex,
- 'tenant_id': tenant['id'],
- }
- user_ref = admin_client.users.create(
- name=user['name'],
- password=user['password'],
- email=user['email'],
- tenant_id=user['tenant_id'])
- user['id'] = user_ref.id
- # password authentication
- self.assertRaises(
- client_exceptions.Unauthorized,
- self._client,
- username=user['name'],
- password=user['password'],
- tenant_id=tenant['id'])
- # token authentication
- client = self._client(
- username=user['name'],
- password=user['password'])
- self.assertRaises(
- client_exceptions.Unauthorized,
- self._client,
- token=client.auth_token,
- tenant_id=tenant['id'])
- # FIXME(ja): this test should require the "keystone:admin" roled
- # (probably the role set via --keystone_admin_role flag)
- # FIXME(ja): add a test that admin endpoint is only sent to admin user
- # FIXME(ja): add a test that admin endpoint returns unauthorized if not
- # admin
- def test_tenant_create_update_and_delete(self):
- tenant_name = 'original_tenant'
- tenant_description = 'My original tenant!'
- tenant_enabled = True
- client = self.get_client(admin=True)
- # create, get, and list a tenant
- tenant = client.tenants.create(tenant_name=tenant_name,
- description=tenant_description,
- enabled=tenant_enabled)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- tenant = client.tenants.get(tenant_id=tenant.id)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- # update, get, and list a tenant
- tenant_name = 'updated_tenant'
- tenant_description = 'Updated tenant!'
- tenant_enabled = False
- tenant = client.tenants.update(tenant_id=tenant.id,
- tenant_name=tenant_name,
- enabled=tenant_enabled,
- description=tenant_description)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- tenant = client.tenants.get(tenant_id=tenant.id)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertEqual(tenant_enabled, tenant.enabled)
- # delete, get, and list a tenant
- client.tenants.delete(tenant=tenant.id)
- self.assertRaises(client_exceptions.NotFound, client.tenants.get,
- tenant.id)
- self.assertFalse([t for t in client.tenants.list()
- if t.id == tenant.id])
- def test_tenant_create_update_and_delete_unicode(self):
- tenant_name = u'original \u540d\u5b57'
- tenant_description = 'My original tenant!'
- tenant_enabled = True
- client = self.get_client(admin=True)
- # create, get, and list a tenant
- tenant = client.tenants.create(tenant_name,
- description=tenant_description,
- enabled=tenant_enabled)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- tenant = client.tenants.get(tenant.id)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- # multiple tenants exist due to fixtures, so find the one we're testing
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- # update, get, and list a tenant
- tenant_name = u'updated \u540d\u5b57'
- tenant_description = 'Updated tenant!'
- tenant_enabled = False
- tenant = client.tenants.update(tenant.id,
- tenant_name=tenant_name,
- enabled=tenant_enabled,
- description=tenant_description)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- tenant = client.tenants.get(tenant.id)
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEqual(tenant_name, tenant.name)
- self.assertEqual(tenant_description, tenant.description)
- self.assertIs(tenant.enabled, tenant_enabled)
- # delete, get, and list a tenant
- client.tenants.delete(tenant.id)
- self.assertRaises(client_exceptions.NotFound, client.tenants.get,
- tenant.id)
- self.assertFalse([t for t in client.tenants.list()
- if t.id == tenant.id])
- def test_tenant_create_no_name(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.create,
- tenant_name="")
- def test_tenant_delete_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.delete,
- tenant=uuid.uuid4().hex)
- def test_tenant_get_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.get,
- tenant_id=uuid.uuid4().hex)
- def test_tenant_update_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.update,
- tenant_id=uuid.uuid4().hex)
- def test_tenant_list(self):
- client = self.get_client()
- tenants = client.tenants.list()
- self.assertEqual(1, len(tenants))
- # Admin endpoint should return *all* tenants
- client = self.get_client(admin=True)
- tenants = client.tenants.list()
- self.assertEqual(len(default_fixtures.TENANTS), len(tenants))
- def test_invalid_password(self):
- good_client = self._client(username=self.user_foo['name'],
- password=self.user_foo['password'])
- good_client.tenants.list()
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=self.user_foo['name'],
- password=uuid.uuid4().hex)
- def test_invalid_user_and_password(self):
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=uuid.uuid4().hex,
- password=uuid.uuid4().hex)
- def test_change_password_invalidates_token(self):
- admin_client = self.get_client(admin=True)
- username = uuid.uuid4().hex
- password = uuid.uuid4().hex
- user = admin_client.users.create(name=username, password=password,
- email=uuid.uuid4().hex)
- # auth as user should work before a password change
- client = self._client(username=username, password=password)
- # auth as user with a token should work before a password change
- self._client(token=client.auth_token)
- # administrative password reset
- admin_client.users.update_password(
- user=user.id,
- password=uuid.uuid4().hex)
- # auth as user with original password should not work after change
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=username,
- password=password)
- # authenticate with an old token should not work after change
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- token=client.auth_token)
- def test_user_change_own_password_invalidates_token(self):
- # bootstrap a user as admin
- client = self.get_client(admin=True)
- username = uuid.uuid4().hex
- password = uuid.uuid4().hex
- client.users.create(name=username, password=password,
- email=uuid.uuid4().hex)
- # auth as user should work before a password change
- client = self._client(username=username, password=password)
- # auth as user with a token should work before a password change
- self._client(token=client.auth_token)
- # change the user's own password
- # TODO(dolphm): This should NOT raise an HTTPError at all, but rather
- # this should succeed with a 2xx. This 500 does not prevent the test
- # from demonstrating the desired consequences below, though.
- self.assertRaises(client_exceptions.HTTPError,
- client.users.update_own_password,
- password, uuid.uuid4().hex)
- # auth as user with original password should not work after change
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=username,
- password=password)
- # auth as user with an old token should not work after change
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- token=client.auth_token)
- def test_disable_tenant_invalidates_token(self):
- admin_client = self.get_client(admin=True)
- foo_client = self.get_client(self.user_foo)
- tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])
- # Disable the tenant.
- tenant_bar.update(enabled=False)
- # Test that the token has been removed.
- self.assertRaises(client_exceptions.Unauthorized,
- foo_client.tokens.authenticate,
- token=foo_client.auth_token)
- # Test that the user access has been disabled.
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client,
- self.user_foo)
- def test_delete_tenant_invalidates_token(self):
- admin_client = self.get_client(admin=True)
- foo_client = self.get_client(self.user_foo)
- tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])
- # Delete the tenant.
- tenant_bar.delete()
- # Test that the token has been removed.
- self.assertRaises(client_exceptions.Unauthorized,
- foo_client.tokens.authenticate,
- token=foo_client.auth_token)
- # Test that the user access has been disabled.
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client,
- self.user_foo)
- def test_disable_user_invalidates_token(self):
- admin_client = self.get_client(admin=True)
- foo_client = self.get_client(self.user_foo)
- admin_client.users.update_enabled(user=self.user_foo['id'],
- enabled=False)
- self.assertRaises(client_exceptions.Unauthorized,
- foo_client.tokens.authenticate,
- token=foo_client.auth_token)
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client,
- self.user_foo)
- def test_delete_user_invalidates_token(self):
- admin_client = self.get_client(admin=True)
- client = self.get_client(admin=False)
- username = uuid.uuid4().hex
- password = uuid.uuid4().hex
- user_id = admin_client.users.create(
- name=username, password=password, email=uuid.uuid4().hex).id
- token_id = client.tokens.authenticate(
- username=username, password=password).id
- # token should be usable before the user is deleted
- client.tokens.authenticate(token=token_id)
- admin_client.users.delete(user=user_id)
- # authenticate with a token should not work after the user is deleted
- self.assertRaises(client_exceptions.Unauthorized,
- client.tokens.authenticate,
- token=token_id)
- @mock.patch.object(timeutils, 'utcnow')
- def test_token_expiry_maintained(self, mock_utcnow):
- now = datetime.datetime.utcnow()
- mock_utcnow.return_value = now
- foo_client = self.get_client(self.user_foo)
- orig_token = foo_client.service_catalog.catalog['token']
- mock_utcnow.return_value = now + datetime.timedelta(seconds=1)
- reauthenticated_token = foo_client.tokens.authenticate(
- token=foo_client.auth_token)
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(orig_token['expires']),
- timeutils.parse_isotime(reauthenticated_token.expires))
- def test_user_create_update_delete(self):
- test_username = 'new_user'
- client = self.get_client(admin=True)
- user = client.users.create(name=test_username,
- password='password',
- email='user1@test.com')
- self.assertEqual(test_username, user.name)
- user = client.users.get(user=user.id)
- self.assertEqual(test_username, user.name)
- user = client.users.update(user=user,
- name=test_username,
- email='user2@test.com')
- self.assertEqual('user2@test.com', user.email)
- # NOTE(termie): update_enabled doesn't return anything, probably a bug
- client.users.update_enabled(user=user, enabled=False)
- user = client.users.get(user.id)
- self.assertFalse(user.enabled)
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=test_username,
- password='password')
- client.users.update_enabled(user, True)
- user = client.users.update_password(user=user, password='password2')
- self._client(username=test_username,
- password='password2')
- user = client.users.update_tenant(user=user, tenant='bar')
- # TODO(ja): once keystonelight supports default tenant
- # when you login without specifying tenant, the
- # token should be scoped to tenant 'bar'
- client.users.delete(user.id)
- self.assertRaises(client_exceptions.NotFound, client.users.get,
- user.id)
- # Test creating a user with a tenant (auto-add to tenant)
- user2 = client.users.create(name=test_username,
- password='password',
- email='user1@test.com',
- tenant_id='bar')
- self.assertEqual(test_username, user2.name)
- def test_update_default_tenant_to_existing_value(self):
- client = self.get_client(admin=True)
- user = client.users.create(
- name=uuid.uuid4().hex,
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex,
- tenant_id=self.tenant_bar['id'])
- # attempting to update the tenant with the existing value should work
- user = client.users.update_tenant(
- user=user, tenant=self.tenant_bar['id'])
- def test_user_create_no_string_password(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.users.create,
- name='test_user',
- password=12345,
- email=uuid.uuid4().hex)
- def test_user_create_no_name(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.users.create,
- name="",
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex)
- def test_user_create_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.create,
- name=uuid.uuid4().hex,
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex,
- tenant_id=uuid.uuid4().hex)
- def test_user_get_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.get,
- user=uuid.uuid4().hex)
- def test_user_list_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.list,
- tenant_id=uuid.uuid4().hex)
- def test_user_update_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.update,
- user=uuid.uuid4().hex)
- def test_user_update_tenant(self):
- client = self.get_client(admin=True)
- tenant_id = uuid.uuid4().hex
- user = client.users.update(user=self.user_foo['id'],
- tenant_id=tenant_id)
- self.assertEqual(tenant_id, user.tenant_id)
- def test_user_update_password_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.update_password,
- user=uuid.uuid4().hex,
- password=uuid.uuid4().hex)
- def test_user_delete_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.delete,
- user=uuid.uuid4().hex)
- def test_user_list(self):
- client = self.get_client(admin=True)
- users = client.users.list()
- self.assertTrue(len(users) > 0)
- user = users[0]
- self.assertRaises(AttributeError, lambda: user.password)
- def test_user_get(self):
- client = self.get_client(admin=True)
- user = client.users.get(user=self.user_foo['id'])
- self.assertRaises(AttributeError, lambda: user.password)
- def test_role_get(self):
- client = self.get_client(admin=True)
- role = client.roles.get(role=self.role_admin['id'])
- self.assertEqual(self.role_admin['id'], role.id)
- def test_role_crud(self):
- test_role = 'new_role'
- client = self.get_client(admin=True)
- role = client.roles.create(name=test_role)
- self.assertEqual(test_role, role.name)
- role = client.roles.get(role=role.id)
- self.assertEqual(test_role, role.name)
- client.roles.delete(role=role.id)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.delete,
- role=role.id)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.get,
- role=role.id)
- def test_role_create_no_name(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.roles.create,
- name="")
- def test_role_create_member_role(self):
- # delete the member role so that we can recreate it
- client = self.get_client(admin=True)
- client.roles.delete(role=CONF.member_role_id)
- # deleting the member role revokes our token, so re-authenticate
- client = self.get_client(admin=True)
- # specify only the role name on creation
- role = client.roles.create(name=CONF.member_role_name)
- # the ID should be set as defined in CONF
- self.assertEqual(CONF.member_role_id, role.id)
- def test_role_get_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.get,
- role=uuid.uuid4().hex)
- def test_role_delete_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.delete,
- role=uuid.uuid4().hex)
- def test_role_list_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=uuid.uuid4().hex,
- tenant=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=self.user_foo['id'],
- tenant=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=uuid.uuid4().hex,
- tenant=self.tenant_bar['id'])
- def test_role_list(self):
- client = self.get_client(admin=True)
- roles = client.roles.list()
- # TODO(devcamcar): This assert should be more specific.
- self.assertTrue(len(roles) > 0)
- def test_service_crud(self):
- client = self.get_client(admin=True)
- service_name = uuid.uuid4().hex
- service_type = uuid.uuid4().hex
- service_desc = uuid.uuid4().hex
- # create & read
- service = client.services.create(name=service_name,
- service_type=service_type,
- description=service_desc)
- self.assertEqual(service_name, service.name)
- self.assertEqual(service_type, service.type)
- self.assertEqual(service_desc, service.description)
- service = client.services.get(id=service.id)
- self.assertEqual(service_name, service.name)
- self.assertEqual(service_type, service.type)
- self.assertEqual(service_desc, service.description)
- service = [x for x in client.services.list() if x.id == service.id][0]
- self.assertEqual(service_name, service.name)
- self.assertEqual(service_type, service.type)
- self.assertEqual(service_desc, service.description)
- # update is not supported in API v2...
- # delete & read
- client.services.delete(id=service.id)
- self.assertRaises(client_exceptions.NotFound,
- client.services.get,
- id=service.id)
- services = [x for x in client.services.list() if x.id == service.id]
- self.assertEqual(0, len(services))
- def test_service_delete_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.services.delete,
- id=uuid.uuid4().hex)
- def test_service_get_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.services.get,
- id=uuid.uuid4().hex)
- def test_endpoint_delete_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.endpoints.delete,
- id=uuid.uuid4().hex)
- def test_admin_requires_adminness(self):
- # FIXME(ja): this should be Unauthorized
- exception = client_exceptions.ClientException
- two = self.get_client(self.user_two, admin=True) # non-admin user
- self.assertRaises(exception,
- two.users.list)
- self.assertRaises(exception,
- two.users.get,
- user=self.user_two['id'])
- self.assertRaises(exception,
- two.users.create,
- name='oops',
- password='password',
- email='oops@test.com')
- self.assertRaises(exception,
- two.users.delete,
- user=self.user_foo['id'])
- self.assertRaises(exception,
- two.tenants.list)
- self.assertRaises(exception,
- two.tenants.get,
- tenant_id=self.tenant_bar['id'])
- self.assertRaises(exception,
- two.tenants.create,
- tenant_name='oops',
- description="shouldn't work!",
- enabled=True)
- self.assertRaises(exception,
- two.tenants.delete,
- tenant=self.tenant_baz['id'])
- self.assertRaises(exception,
- two.roles.get,
- role=self.role_admin['id'])
- self.assertRaises(exception,
- two.roles.list)
- self.assertRaises(exception,
- two.roles.create,
- name='oops')
- self.assertRaises(exception,
- two.roles.delete,
- role=self.role_admin['id'])
- # TODO(ja): determine what else todo
- def test_tenant_add_and_remove_user(self):
- client = self.get_client(admin=True)
- client.roles.add_user_role(tenant=self.tenant_bar['id'],
- user=self.user_two['id'],
- role=self.role_other['id'])
- user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
- self.assertIn(self.user_two['id'], [x.id for x in user_refs])
- client.roles.remove_user_role(tenant=self.tenant_bar['id'],
- user=self.user_two['id'],
- role=self.role_other['id'])
- roles = client.roles.roles_for_user(user=self.user_foo['id'],
- tenant=self.tenant_bar['id'])
- self.assertNotIn(self.role_other['id'], roles)
- user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
- self.assertNotIn(self.user_two['id'], [x.id for x in user_refs])
- def test_user_role_add_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.add_user_role,
- tenant=uuid.uuid4().hex,
- user=self.user_foo['id'],
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.add_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=uuid.uuid4().hex)
- def test_user_role_add_no_user(self):
- # If add_user_role and user doesn't exist, doesn't fail.
- client = self.get_client(admin=True)
- client.roles.add_user_role(tenant=self.tenant_baz['id'],
- user=uuid.uuid4().hex,
- role=self.role_member['id'])
- def test_user_role_remove_404(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=uuid.uuid4().hex,
- user=self.user_foo['id'],
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=uuid.uuid4().hex,
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=self.role_member['id'])
- def test_tenant_list_marker(self):
- client = self.get_client()
- # Add two arbitrary tenants to user for testing purposes
- for i in range(2):
- tenant_id = uuid.uuid4().hex
- tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,
- 'domain_id': DEFAULT_DOMAIN_ID}
- self.resource_api.create_project(tenant_id, tenant)
- self.assignment_api.add_user_to_project(tenant_id,
- self.user_foo['id'])
- tenants = client.tenants.list()
- self.assertEqual(3, len(tenants))
- tenants_marker = client.tenants.list(marker=tenants[0].id)
- self.assertEqual(2, len(tenants_marker))
- self.assertEqual(tenants_marker[0].name, tenants[1].name)
- self.assertEqual(tenants_marker[1].name, tenants[2].name)
- def test_tenant_list_marker_not_found(self):
- client = self.get_client()
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, marker=uuid.uuid4().hex)
- def test_tenant_list_limit(self):
- client = self.get_client()
- # Add two arbitrary tenants to user for testing purposes
- for i in range(2):
- tenant_id = uuid.uuid4().hex
- tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,
- 'domain_id': DEFAULT_DOMAIN_ID}
- self.resource_api.create_project(tenant_id, tenant)
- self.assignment_api.add_user_to_project(tenant_id,
- self.user_foo['id'])
- tenants = client.tenants.list()
- self.assertEqual(3, len(tenants))
- tenants_limited = client.tenants.list(limit=2)
- self.assertEqual(2, len(tenants_limited))
- self.assertEqual(tenants[0].name, tenants_limited[0].name)
- self.assertEqual(tenants[1].name, tenants_limited[1].name)
- def test_tenant_list_limit_bad_value(self):
- client = self.get_client()
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, limit='a')
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, limit=-1)
- def test_roles_get_by_user(self):
- client = self.get_client(admin=True)
- roles = client.roles.roles_for_user(user=self.user_foo['id'],
- tenant=self.tenant_bar['id'])
- self.assertTrue(len(roles) > 0)
- def test_user_can_update_passwd(self):
- client = self.get_client(self.user_two)
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
- self.public_server.application(req.environ,
- responseobject.start_fake_response)
- self.user_two['password'] = new_password
- self.get_client(self.user_two)
- def test_user_cannot_update_other_users_passwd(self):
- client = self.get_client(self.user_two)
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_foo['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
- self.public_server.application(req.environ,
- responseobject.start_fake_response)
- self.assertEqual(http_client.FORBIDDEN,
- responseobject.response_status)
- self.user_two['password'] = new_password
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client, self.user_two)
- def test_tokens_after_user_update_passwd(self):
- client = self.get_client(self.user_two)
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
- rv = self.public_server.application(
- req.environ,
- responseobject.start_fake_response)
- response_json = jsonutils.loads(rv.pop())
- new_token_id = response_json['access']['token']['id']
- self.assertRaises(client_exceptions.Unauthorized, client.tenants.list)
- client.auth_token = new_token_id
- client.tenants.list()
- def test_endpoint_crud(self):
- client = self.get_client(admin=True)
- service = client.services.create(name=uuid.uuid4().hex,
- service_type=uuid.uuid4().hex,
- description=uuid.uuid4().hex)
- endpoint_region = uuid.uuid4().hex
- invalid_service_id = uuid.uuid4().hex
- endpoint_publicurl = uuid.uuid4().hex
- endpoint_internalurl = uuid.uuid4().hex
- endpoint_adminurl = uuid.uuid4().hex
- # a non-existent service ID should trigger a 400
- self.assertRaises(client_exceptions.BadRequest,
- client.endpoints.create,
- region=endpoint_region,
- service_id=invalid_service_id,
- publicurl=endpoint_publicurl,
- adminurl=endpoint_adminurl,
- internalurl=endpoint_internalurl)
- endpoint = client.endpoints.create(region=endpoint_region,
- service_id=service.id,
- publicurl=endpoint_publicurl,
- adminurl=endpoint_adminurl,
- internalurl=endpoint_internalurl)
- self.assertEqual(endpoint_region, endpoint.region)
- self.assertEqual(service.id, endpoint.service_id)
- self.assertEqual(endpoint_publicurl, endpoint.publicurl)
- self.assertEqual(endpoint_internalurl, endpoint.internalurl)
- self.assertEqual(endpoint_adminurl, endpoint.adminurl)
- client.endpoints.delete(id=endpoint.id)
- self.assertRaises(client_exceptions.NotFound, client.endpoints.delete,
- id=endpoint.id)
- def _send_ec2_auth_request(self, credentials, client=None):
- if not client:
- client = self.default_client
- url = '%s/ec2tokens' % self.default_client.auth_url
- resp = client.session.request(
- url=url, method='POST',
- json={'credentials': credentials})
- return resp, resp.json()
- def _generate_default_user_ec2_credentials(self):
- cred = self. default_client.ec2.create(
- user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- return self._generate_user_ec2_credentials(cred.access, cred.secret)
- def _generate_user_ec2_credentials(self, access, secret):
- signer = ec2_utils.Ec2Signer(secret)
- credentials = {'params': {'SignatureVersion': '2'},
- 'access': access,
- 'verb': 'GET',
- 'host': 'localhost',
- 'path': '/service/cloud'}
- signature = signer.generate(credentials)
- return credentials, signature
- def test_ec2_auth_success(self):
- credentials, signature = self._generate_default_user_ec2_credentials()
- credentials['signature'] = signature
- resp, token = self._send_ec2_auth_request(credentials)
- self.assertEqual(200, resp.status_code)
- self.assertIn('access', token)
- def test_ec2_auth_success_trust(self):
- # Add "other" role user_foo and create trust delegating it to user_two
- self.assignment_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_bar['id'],
- self.role_other['id'])
- trust_id = 'atrust123'
- trust = {'trustor_user_id': self.user_foo['id'],
- 'trustee_user_id': self.user_two['id'],
- 'project_id': self.tenant_bar['id'],
- 'impersonation': True}
- roles = [self.role_other]
- self.trust_api.create_trust(trust_id, trust, roles)
- # Create a client for user_two, scoped to the trust
- client = self.get_client(self.user_two)
- ret = client.authenticate(trust_id=trust_id,
- tenant_id=self.tenant_bar['id'])
- self.assertTrue(ret)
- self.assertTrue(client.auth_ref.trust_scoped)
- self.assertEqual(trust_id, client.auth_ref.trust_id)
- # Create an ec2 keypair using the trust client impersonating user_foo
- cred = client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- credentials, signature = self._generate_user_ec2_credentials(
- cred.access, cred.secret)
- credentials['signature'] = signature
- resp, token = self._send_ec2_auth_request(credentials)
- self.assertEqual(200, resp.status_code)
- self.assertEqual(trust_id, token['access']['trust']['id'])
- # TODO(shardy) we really want to check the roles and trustee
- # but because of where the stubbing happens we don't seem to
- # hit the necessary code in controllers.py _authenticate_token
- # so although all is OK via a real request, it incorrect in
- # this test..
- def test_ec2_auth_failure(self):
- credentials, signature = self._generate_default_user_ec2_credentials()
- credentials['signature'] = uuid.uuid4().hex
- self.assertRaises(client_exceptions.Unauthorized,
- self._send_ec2_auth_request,
- credentials)
- def test_ec2_credential_crud(self):
- creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
- self.assertEqual([], creds)
- cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
- self.assertEqual(creds, [cred])
- got = self.default_client.ec2.get(user_id=self.user_foo['id'],
- access=cred.access)
- self.assertEqual(cred, got)
- self.default_client.ec2.delete(user_id=self.user_foo['id'],
- access=cred.access)
- creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
- self.assertEqual([], creds)
- def test_ec2_credential_crud_non_admin(self):
- na_client = self.get_client(self.user_two)
- creds = na_client.ec2.list(user_id=self.user_two['id'])
- self.assertEqual([], creds)
- cred = na_client.ec2.create(user_id=self.user_two['id'],
- tenant_id=self.tenant_baz['id'])
- creds = na_client.ec2.list(user_id=self.user_two['id'])
- self.assertEqual(creds, [cred])
- got = na_client.ec2.get(user_id=self.user_two['id'],
- access=cred.access)
- self.assertEqual(cred, got)
- na_client.ec2.delete(user_id=self.user_two['id'],
- access=cred.access)
- creds = na_client.ec2.list(user_id=self.user_two['id'])
- self.assertEqual([], creds)
- def test_ec2_list_credentials(self):
- cred_1 = self.default_client.ec2.create(
- user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- cred_2 = self.default_client.ec2.create(
- user_id=self.user_foo['id'],
- tenant_id=self.tenant_service['id'])
- cred_3 = self.default_client.ec2.create(
- user_id=self.user_foo['id'],
- tenant_id=self.tenant_mtu['id'])
- two = self.get_client(self.user_two)
- cred_4 = two.ec2.create(user_id=self.user_two['id'],
- tenant_id=self.tenant_bar['id'])
- creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
- self.assertEqual(3, len(creds))
- self.assertEqual(sorted([cred_1, cred_2, cred_3],
- key=lambda x: x.access),
- sorted(creds, key=lambda x: x.access))
- self.assertNotIn(cred_4, creds)
- def test_ec2_credentials_create_404(self):
- self.assertRaises(client_exceptions.NotFound,
- self.default_client.ec2.create,
- user_id=uuid.uuid4().hex,
- tenant_id=self.tenant_bar['id'])
- self.assertRaises(client_exceptions.NotFound,
- self.default_client.ec2.create,
- user_id=self.user_foo['id'],
- tenant_id=uuid.uuid4().hex)
- def test_ec2_credentials_delete_404(self):
- self.assertRaises(client_exceptions.NotFound,
- self.default_client.ec2.delete,
- user_id=uuid.uuid4().hex,
- access=uuid.uuid4().hex)
- def test_ec2_credentials_get_404(self):
- self.assertRaises(client_exceptions.NotFound,
- self.default_client.ec2.get,
- user_id=uuid.uuid4().hex,
- access=uuid.uuid4().hex)
- def test_ec2_credentials_list_404(self):
- self.assertRaises(client_exceptions.NotFound,
- self.default_client.ec2.list,
- user_id=uuid.uuid4().hex)
- def test_ec2_credentials_list_user_forbidden(self):
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.list,
- user_id=self.user_foo['id'])
- def test_ec2_credentials_get_user_forbidden(self):
- cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.get,
- user_id=self.user_foo['id'], access=cred.access)
- self.default_client.ec2.delete(user_id=self.user_foo['id'],
- access=cred.access)
- def test_ec2_credentials_delete_user_forbidden(self):
- cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,
- user_id=self.user_foo['id'], access=cred.access)
- self.default_client.ec2.delete(user_id=self.user_foo['id'],
- access=cred.access)
- def test_endpoint_create_nonexistent_service(self):
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.endpoints.create,
- region=uuid.uuid4().hex,
- service_id=uuid.uuid4().hex,
- publicurl=uuid.uuid4().hex,
- adminurl=uuid.uuid4().hex,
- internalurl=uuid.uuid4().hex)
- def test_policy_crud(self):
- # FIXME(dolph): this test was written prior to the v3 implementation of
- # the client and essentially refers to a non-existent
- # policy manager in the v2 client. this test needs to be
- # moved to a test suite running against the v3 api
- self.skipTest('Written prior to v3 client; needs refactor')
- client = self.get_client(admin=True)
- policy_blob = uuid.uuid4().hex
- policy_type = uuid.uuid4().hex
- service = client.services.create(
- name=uuid.uuid4().hex,
- service_type=uuid.uuid4().hex,
- description=uuid.uuid4().hex)
- endpoint = client.endpoints.create(
- service_id=service.id,
- region=uuid.uuid4().hex,
- adminurl=uuid.uuid4().hex,
- internalurl=uuid.uuid4().hex,
- publicurl=uuid.uuid4().hex)
- # create
- policy = client.policies.create(
- blob=policy_blob,
- type=policy_type,
- endpoint=endpoint.id)
- self.assertEqual(policy_blob, policy.policy)
- self.assertEqual(policy_type, policy.type)
- self.assertEqual(endpoint.id, policy.endpoint_id)
- policy = client.policies.get(policy=policy.id)
- self.assertEqual(policy_blob, policy.policy)
- self.assertEqual(policy_type, policy.type)
- self.assertEqual(endpoint.id, policy.endpoint_id)
- endpoints = [x for x in client.endpoints.list() if x.id == endpoint.id]
- endpoint = endpoints[0]
- self.assertEqual(policy_blob, policy.policy)
- self.assertEqual(policy_type, policy.type)
- self.assertEqual(endpoint.id, policy.endpoint_id)
- # update
- policy_blob = uuid.uuid4().hex
- policy_type = uuid.uuid4().hex
- endpoint = client.endpoints.create(
- service_id=service.id,
- region=uuid.uuid4().hex,
- adminurl=uuid.uuid4().hex,
- internalurl=uuid.uuid4().hex,
- publicurl=uuid.uuid4().hex)
- policy = client.policies.update(
- policy=policy.id,
- blob=policy_blob,
- type=policy_type,
- endpoint=endpoint.id)
- policy = client.policies.get(policy=policy.id)
- self.assertEqual(policy_blob, policy.policy)
- self.assertEqual(policy_type, policy.type)
- self.assertEqual(endpoint.id, policy.endpoint_id)
- # delete
- client.policies.delete(policy=policy.id)
- self.assertRaises(
- client_exceptions.NotFound,
- client.policies.get,
- policy=policy.id)
- policies = [x for x in client.policies.list() if x.id == policy.id]
- self.assertEqual(0, len(policies))