diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py | 1045 |
1 files changed, 1045 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py new file mode 100644 index 00000000..7abc5bc4 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py @@ -0,0 +1,1045 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from keystoneclient import exceptions as client_exceptions +from keystoneclient.v2_0 import client as ks_client +import mock +from oslo_config import cfg +from oslo_serialization import jsonutils +from oslo_utils import timeutils +import webob + +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit.ksfixtures import appserver +from keystone.tests.unit.ksfixtures import database + + +CONF = cfg.CONF +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id + + +class ClientDrivenTestCase(tests.TestCase): + + def setUp(self): + super(ClientDrivenTestCase, self).setUp() + + # FIXME(morganfainberg): Since we are running tests through the + # controllers and some internal api drivers are SQL-only, the correct + # approach is to ensure we have the correct backing store. The + # credential api makes some very SQL specific assumptions that should + # be addressed allowing for non-SQL based testing to occur. + self.useFixture(database.Database()) + self.load_backends() + + self.load_fixtures(default_fixtures) + + # TODO(termie): add an admin user to the fixtures and use that user + # override the fixtures, for now + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_bar['id'], + self.role_admin['id']) + + conf = self._paste_config('keystone') + fixture = self.useFixture(appserver.AppServer(conf, appserver.MAIN)) + self.public_server = fixture.server + fixture = self.useFixture(appserver.AppServer(conf, appserver.ADMIN)) + self.admin_server = fixture.server + + self.addCleanup(self.cleanup_instance('public_server', 'admin_server')) + + def _public_url(self): + public_port = self.public_server.socket_info['socket'][1] + return "http://localhost:%s/v2.0" % public_port + + def _admin_url(self): + admin_port = self.admin_server.socket_info['socket'][1] + return "http://localhost:%s/v2.0" % admin_port + + def _client(self, admin=False, **kwargs): + url = self._admin_url() if admin else self._public_url() + kc = ks_client.Client(endpoint=url, + auth_url=self._public_url(), + **kwargs) + kc.authenticate() + # have to manually overwrite the management url after authentication + kc.management_url = url + return kc + + def get_client(self, user_ref=None, tenant_ref=None, admin=False): + if user_ref is None: + user_ref = self.user_foo + if tenant_ref is None: + for user in default_fixtures.USERS: + # The fixture ID is no longer used as the ID in the database + # The fixture ID, however, is still used as part of the + # attribute name when storing the created object on the test + # case. This means that we need to use the fixture ID below to + # find the actial object so that we can get the ID as stored + # in the database to compare against. + if (getattr(self, 'user_%s' % user['id'])['id'] == + user_ref['id']): + tenant_id = user['tenants'][0] + else: + tenant_id = tenant_ref['id'] + + return self._client(username=user_ref['name'], + password=user_ref['password'], + tenant_id=tenant_id, + admin=admin) + + def test_authenticate_tenant_name_and_tenants(self): + client = self.get_client() + tenants = client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + def test_authenticate_tenant_id_and_tenants(self): + client = self._client(username=self.user_foo['name'], + password=self.user_foo['password'], + tenant_id='bar') + tenants = client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + def test_authenticate_invalid_tenant_id(self): + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=self.user_foo['name'], + password=self.user_foo['password'], + tenant_id='baz') + + def test_authenticate_token_no_tenant(self): + client = self.get_client() + token = client.auth_token + token_client = self._client(token=token) + tenants = token_client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + def test_authenticate_token_tenant_id(self): + client = self.get_client() + token = client.auth_token + token_client = self._client(token=token, tenant_id='bar') + tenants = token_client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + def test_authenticate_token_invalid_tenant_id(self): + client = self.get_client() + token = client.auth_token + self.assertRaises(client_exceptions.Unauthorized, + self._client, token=token, + tenant_id=uuid.uuid4().hex) + + def test_authenticate_token_invalid_tenant_name(self): + client = self.get_client() + token = client.auth_token + self.assertRaises(client_exceptions.Unauthorized, + self._client, token=token, + tenant_name=uuid.uuid4().hex) + + def test_authenticate_token_tenant_name(self): + client = self.get_client() + token = client.auth_token + token_client = self._client(token=token, tenant_name='BAR') + tenants = token_client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + def test_authenticate_and_delete_token(self): + client = self.get_client(admin=True) + token = client.auth_token + token_client = self._client(token=token) + tenants = token_client.tenants.list() + self.assertEqual(self.tenant_bar['id'], tenants[0].id) + + client.tokens.delete(token_client.auth_token) + + self.assertRaises(client_exceptions.Unauthorized, + token_client.tenants.list) + + def test_authenticate_no_password(self): + user_ref = self.user_foo.copy() + user_ref['password'] = None + self.assertRaises(client_exceptions.AuthorizationFailure, + self.get_client, + user_ref) + + def test_authenticate_no_username(self): + user_ref = self.user_foo.copy() + user_ref['name'] = None + self.assertRaises(client_exceptions.AuthorizationFailure, + self.get_client, + user_ref) + + def test_authenticate_disabled_tenant(self): + admin_client = self.get_client(admin=True) + + tenant = { + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': False, + } + tenant_ref = admin_client.tenants.create( + tenant_name=tenant['name'], + description=tenant['description'], + enabled=tenant['enabled']) + tenant['id'] = tenant_ref.id + + user = { + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + 'email': uuid.uuid4().hex, + 'tenant_id': tenant['id'], + } + user_ref = admin_client.users.create( + name=user['name'], + password=user['password'], + email=user['email'], + tenant_id=user['tenant_id']) + user['id'] = user_ref.id + + # password authentication + self.assertRaises( + client_exceptions.Unauthorized, + self._client, + username=user['name'], + password=user['password'], + tenant_id=tenant['id']) + + # token authentication + client = self._client( + username=user['name'], + password=user['password']) + self.assertRaises( + client_exceptions.Unauthorized, + self._client, + token=client.auth_token, + tenant_id=tenant['id']) + + # FIXME(ja): this test should require the "keystone:admin" roled + # (probably the role set via --keystone_admin_role flag) + # FIXME(ja): add a test that admin endpoint is only sent to admin user + # FIXME(ja): add a test that admin endpoint returns unauthorized if not + # admin + def test_tenant_create_update_and_delete(self): + tenant_name = 'original_tenant' + tenant_description = 'My original tenant!' + tenant_enabled = True + client = self.get_client(admin=True) + + # create, get, and list a tenant + tenant = client.tenants.create(tenant_name=tenant_name, + description=tenant_description, + enabled=tenant_enabled) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + tenant = client.tenants.get(tenant_id=tenant.id) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop() + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + # update, get, and list a tenant + tenant_name = 'updated_tenant' + tenant_description = 'Updated tenant!' + tenant_enabled = False + tenant = client.tenants.update(tenant_id=tenant.id, + tenant_name=tenant_name, + enabled=tenant_enabled, + description=tenant_description) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + tenant = client.tenants.get(tenant_id=tenant.id) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop() + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertEqual(tenant_enabled, tenant.enabled) + + # delete, get, and list a tenant + client.tenants.delete(tenant=tenant.id) + self.assertRaises(client_exceptions.NotFound, client.tenants.get, + tenant.id) + self.assertFalse([t for t in client.tenants.list() + if t.id == tenant.id]) + + def test_tenant_create_update_and_delete_unicode(self): + tenant_name = u'original \u540d\u5b57' + tenant_description = 'My original tenant!' + tenant_enabled = True + client = self.get_client(admin=True) + + # create, get, and list a tenant + tenant = client.tenants.create(tenant_name, + description=tenant_description, + enabled=tenant_enabled) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + tenant = client.tenants.get(tenant.id) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + # multiple tenants exist due to fixtures, so find the one we're testing + tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop() + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + # update, get, and list a tenant + tenant_name = u'updated \u540d\u5b57' + tenant_description = 'Updated tenant!' + tenant_enabled = False + tenant = client.tenants.update(tenant.id, + tenant_name=tenant_name, + enabled=tenant_enabled, + description=tenant_description) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + tenant = client.tenants.get(tenant.id) + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop() + self.assertEqual(tenant_name, tenant.name) + self.assertEqual(tenant_description, tenant.description) + self.assertIs(tenant.enabled, tenant_enabled) + + # delete, get, and list a tenant + client.tenants.delete(tenant.id) + self.assertRaises(client_exceptions.NotFound, client.tenants.get, + tenant.id) + self.assertFalse([t for t in client.tenants.list() + if t.id == tenant.id]) + + def test_tenant_create_no_name(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.BadRequest, + client.tenants.create, + tenant_name="") + + def test_tenant_delete_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.tenants.delete, + tenant=uuid.uuid4().hex) + + def test_tenant_get_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.tenants.get, + tenant_id=uuid.uuid4().hex) + + def test_tenant_update_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.tenants.update, + tenant_id=uuid.uuid4().hex) + + def test_tenant_list(self): + client = self.get_client() + tenants = client.tenants.list() + self.assertEqual(1, len(tenants)) + + # Admin endpoint should return *all* tenants + client = self.get_client(admin=True) + tenants = client.tenants.list() + self.assertEqual(len(default_fixtures.TENANTS), len(tenants)) + + def test_invalid_password(self): + good_client = self._client(username=self.user_foo['name'], + password=self.user_foo['password']) + good_client.tenants.list() + + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=self.user_foo['name'], + password=uuid.uuid4().hex) + + def test_invalid_user_and_password(self): + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + def test_change_password_invalidates_token(self): + admin_client = self.get_client(admin=True) + + username = uuid.uuid4().hex + password = uuid.uuid4().hex + user = admin_client.users.create(name=username, password=password, + email=uuid.uuid4().hex) + + # auth as user should work before a password change + client = self._client(username=username, password=password) + + # auth as user with a token should work before a password change + self._client(token=client.auth_token) + + # administrative password reset + admin_client.users.update_password( + user=user.id, + password=uuid.uuid4().hex) + + # auth as user with original password should not work after change + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=username, + password=password) + + # authenticate with an old token should not work after change + self.assertRaises(client_exceptions.Unauthorized, + self._client, + token=client.auth_token) + + def test_user_change_own_password_invalidates_token(self): + # bootstrap a user as admin + client = self.get_client(admin=True) + username = uuid.uuid4().hex + password = uuid.uuid4().hex + client.users.create(name=username, password=password, + email=uuid.uuid4().hex) + + # auth as user should work before a password change + client = self._client(username=username, password=password) + + # auth as user with a token should work before a password change + self._client(token=client.auth_token) + + # change the user's own password + # TODO(dolphm): This should NOT raise an HTTPError at all, but rather + # this should succeed with a 2xx. This 500 does not prevent the test + # from demonstrating the desired consequences below, though. + self.assertRaises(client_exceptions.HTTPError, + client.users.update_own_password, + password, uuid.uuid4().hex) + + # auth as user with original password should not work after change + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=username, + password=password) + + # auth as user with an old token should not work after change + self.assertRaises(client_exceptions.Unauthorized, + self._client, + token=client.auth_token) + + def test_disable_tenant_invalidates_token(self): + admin_client = self.get_client(admin=True) + foo_client = self.get_client(self.user_foo) + tenant_bar = admin_client.tenants.get(self.tenant_bar['id']) + + # Disable the tenant. + tenant_bar.update(enabled=False) + + # Test that the token has been removed. + self.assertRaises(client_exceptions.Unauthorized, + foo_client.tokens.authenticate, + token=foo_client.auth_token) + + # Test that the user access has been disabled. + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, + self.user_foo) + + def test_delete_tenant_invalidates_token(self): + admin_client = self.get_client(admin=True) + foo_client = self.get_client(self.user_foo) + tenant_bar = admin_client.tenants.get(self.tenant_bar['id']) + + # Delete the tenant. + tenant_bar.delete() + + # Test that the token has been removed. + self.assertRaises(client_exceptions.Unauthorized, + foo_client.tokens.authenticate, + token=foo_client.auth_token) + + # Test that the user access has been disabled. + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, + self.user_foo) + + def test_disable_user_invalidates_token(self): + admin_client = self.get_client(admin=True) + foo_client = self.get_client(self.user_foo) + + admin_client.users.update_enabled(user=self.user_foo['id'], + enabled=False) + + self.assertRaises(client_exceptions.Unauthorized, + foo_client.tokens.authenticate, + token=foo_client.auth_token) + + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, + self.user_foo) + + def test_delete_user_invalidates_token(self): + admin_client = self.get_client(admin=True) + client = self.get_client(admin=False) + + username = uuid.uuid4().hex + password = uuid.uuid4().hex + user_id = admin_client.users.create( + name=username, password=password, email=uuid.uuid4().hex).id + + token_id = client.tokens.authenticate( + username=username, password=password).id + + # token should be usable before the user is deleted + client.tokens.authenticate(token=token_id) + + admin_client.users.delete(user=user_id) + + # authenticate with a token should not work after the user is deleted + self.assertRaises(client_exceptions.Unauthorized, + client.tokens.authenticate, + token=token_id) + + @mock.patch.object(timeutils, 'utcnow') + def test_token_expiry_maintained(self, mock_utcnow): + now = datetime.datetime.utcnow() + mock_utcnow.return_value = now + foo_client = self.get_client(self.user_foo) + + orig_token = foo_client.service_catalog.catalog['token'] + mock_utcnow.return_value = now + datetime.timedelta(seconds=1) + reauthenticated_token = foo_client.tokens.authenticate( + token=foo_client.auth_token) + + self.assertCloseEnoughForGovernmentWork( + timeutils.parse_isotime(orig_token['expires']), + timeutils.parse_isotime(reauthenticated_token.expires)) + + def test_user_create_update_delete(self): + test_username = 'new_user' + client = self.get_client(admin=True) + user = client.users.create(name=test_username, + password='password', + email='user1@test.com') + self.assertEqual(test_username, user.name) + + user = client.users.get(user=user.id) + self.assertEqual(test_username, user.name) + + user = client.users.update(user=user, + name=test_username, + email='user2@test.com') + self.assertEqual('user2@test.com', user.email) + + # NOTE(termie): update_enabled doesn't return anything, probably a bug + client.users.update_enabled(user=user, enabled=False) + user = client.users.get(user.id) + self.assertFalse(user.enabled) + + self.assertRaises(client_exceptions.Unauthorized, + self._client, + username=test_username, + password='password') + client.users.update_enabled(user, True) + + user = client.users.update_password(user=user, password='password2') + + self._client(username=test_username, + password='password2') + + user = client.users.update_tenant(user=user, tenant='bar') + # TODO(ja): once keystonelight supports default tenant + # when you login without specifying tenant, the + # token should be scoped to tenant 'bar' + + client.users.delete(user.id) + self.assertRaises(client_exceptions.NotFound, client.users.get, + user.id) + + # Test creating a user with a tenant (auto-add to tenant) + user2 = client.users.create(name=test_username, + password='password', + email='user1@test.com', + tenant_id='bar') + self.assertEqual(test_username, user2.name) + + def test_update_default_tenant_to_existing_value(self): + client = self.get_client(admin=True) + + user = client.users.create( + name=uuid.uuid4().hex, + password=uuid.uuid4().hex, + email=uuid.uuid4().hex, + tenant_id=self.tenant_bar['id']) + + # attempting to update the tenant with the existing value should work + user = client.users.update_tenant( + user=user, tenant=self.tenant_bar['id']) + + def test_user_create_no_string_password(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.BadRequest, + client.users.create, + name='test_user', + password=12345, + email=uuid.uuid4().hex) + + def test_user_create_no_name(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.BadRequest, + client.users.create, + name="", + password=uuid.uuid4().hex, + email=uuid.uuid4().hex) + + def test_user_create_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.create, + name=uuid.uuid4().hex, + password=uuid.uuid4().hex, + email=uuid.uuid4().hex, + tenant_id=uuid.uuid4().hex) + + def test_user_get_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.get, + user=uuid.uuid4().hex) + + def test_user_list_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.list, + tenant_id=uuid.uuid4().hex) + + def test_user_update_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.update, + user=uuid.uuid4().hex) + + def test_user_update_tenant(self): + client = self.get_client(admin=True) + tenant_id = uuid.uuid4().hex + user = client.users.update(user=self.user_foo['id'], + tenant_id=tenant_id) + self.assertEqual(tenant_id, user.tenant_id) + + def test_user_update_password_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.update_password, + user=uuid.uuid4().hex, + password=uuid.uuid4().hex) + + def test_user_delete_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.users.delete, + user=uuid.uuid4().hex) + + def test_user_list(self): + client = self.get_client(admin=True) + users = client.users.list() + self.assertTrue(len(users) > 0) + user = users[0] + self.assertRaises(AttributeError, lambda: user.password) + + def test_user_get(self): + client = self.get_client(admin=True) + user = client.users.get(user=self.user_foo['id']) + self.assertRaises(AttributeError, lambda: user.password) + + def test_role_get(self): + client = self.get_client(admin=True) + role = client.roles.get(role=self.role_admin['id']) + self.assertEqual(self.role_admin['id'], role.id) + + def test_role_crud(self): + test_role = 'new_role' + client = self.get_client(admin=True) + role = client.roles.create(name=test_role) + self.assertEqual(test_role, role.name) + + role = client.roles.get(role=role.id) + self.assertEqual(test_role, role.name) + + client.roles.delete(role=role.id) + + self.assertRaises(client_exceptions.NotFound, + client.roles.delete, + role=role.id) + self.assertRaises(client_exceptions.NotFound, + client.roles.get, + role=role.id) + + def test_role_create_no_name(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.BadRequest, + client.roles.create, + name="") + + def test_role_get_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.roles.get, + role=uuid.uuid4().hex) + + def test_role_delete_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.roles.delete, + role=uuid.uuid4().hex) + + def test_role_list_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.roles.roles_for_user, + user=uuid.uuid4().hex, + tenant=uuid.uuid4().hex) + self.assertRaises(client_exceptions.NotFound, + client.roles.roles_for_user, + user=self.user_foo['id'], + tenant=uuid.uuid4().hex) + self.assertRaises(client_exceptions.NotFound, + client.roles.roles_for_user, + user=uuid.uuid4().hex, + tenant=self.tenant_bar['id']) + + def test_role_list(self): + client = self.get_client(admin=True) + roles = client.roles.list() + # TODO(devcamcar): This assert should be more specific. + self.assertTrue(len(roles) > 0) + + def test_service_crud(self): + client = self.get_client(admin=True) + + service_name = uuid.uuid4().hex + service_type = uuid.uuid4().hex + service_desc = uuid.uuid4().hex + + # create & read + service = client.services.create(name=service_name, + service_type=service_type, + description=service_desc) + self.assertEqual(service_name, service.name) + self.assertEqual(service_type, service.type) + self.assertEqual(service_desc, service.description) + + service = client.services.get(id=service.id) + self.assertEqual(service_name, service.name) + self.assertEqual(service_type, service.type) + self.assertEqual(service_desc, service.description) + + service = [x for x in client.services.list() if x.id == service.id][0] + self.assertEqual(service_name, service.name) + self.assertEqual(service_type, service.type) + self.assertEqual(service_desc, service.description) + + # update is not supported in API v2... + + # delete & read + client.services.delete(id=service.id) + self.assertRaises(client_exceptions.NotFound, + client.services.get, + id=service.id) + services = [x for x in client.services.list() if x.id == service.id] + self.assertEqual(0, len(services)) + + def test_service_delete_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.services.delete, + id=uuid.uuid4().hex) + + def test_service_get_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.services.get, + id=uuid.uuid4().hex) + + def test_endpoint_delete_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.endpoints.delete, + id=uuid.uuid4().hex) + + def test_admin_requires_adminness(self): + # FIXME(ja): this should be Unauthorized + exception = client_exceptions.ClientException + + two = self.get_client(self.user_two, admin=True) # non-admin user + + # USER CRUD + self.assertRaises(exception, + two.users.list) + self.assertRaises(exception, + two.users.get, + user=self.user_two['id']) + self.assertRaises(exception, + two.users.create, + name='oops', + password='password', + email='oops@test.com') + self.assertRaises(exception, + two.users.delete, + user=self.user_foo['id']) + + # TENANT CRUD + self.assertRaises(exception, + two.tenants.list) + self.assertRaises(exception, + two.tenants.get, + tenant_id=self.tenant_bar['id']) + self.assertRaises(exception, + two.tenants.create, + tenant_name='oops', + description="shouldn't work!", + enabled=True) + self.assertRaises(exception, + two.tenants.delete, + tenant=self.tenant_baz['id']) + + # ROLE CRUD + self.assertRaises(exception, + two.roles.get, + role=self.role_admin['id']) + self.assertRaises(exception, + two.roles.list) + self.assertRaises(exception, + two.roles.create, + name='oops') + self.assertRaises(exception, + two.roles.delete, + role=self.role_admin['id']) + + # TODO(ja): MEMBERSHIP CRUD + # TODO(ja): determine what else todo + + def test_tenant_add_and_remove_user(self): + client = self.get_client(admin=True) + client.roles.add_user_role(tenant=self.tenant_bar['id'], + user=self.user_two['id'], + role=self.role_other['id']) + user_refs = client.tenants.list_users(tenant=self.tenant_bar['id']) + self.assertIn(self.user_two['id'], [x.id for x in user_refs]) + client.roles.remove_user_role(tenant=self.tenant_bar['id'], + user=self.user_two['id'], + role=self.role_other['id']) + roles = client.roles.roles_for_user(user=self.user_foo['id'], + tenant=self.tenant_bar['id']) + self.assertNotIn(self.role_other['id'], roles) + user_refs = client.tenants.list_users(tenant=self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], [x.id for x in user_refs]) + + def test_user_role_add_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.roles.add_user_role, + tenant=uuid.uuid4().hex, + user=self.user_foo['id'], + role=self.role_member['id']) + self.assertRaises(client_exceptions.NotFound, + client.roles.add_user_role, + tenant=self.tenant_baz['id'], + user=self.user_foo['id'], + role=uuid.uuid4().hex) + + def test_user_role_add_no_user(self): + # If add_user_role and user doesn't exist, doesn't fail. + client = self.get_client(admin=True) + client.roles.add_user_role(tenant=self.tenant_baz['id'], + user=uuid.uuid4().hex, + role=self.role_member['id']) + + def test_user_role_remove_404(self): + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.NotFound, + client.roles.remove_user_role, + tenant=uuid.uuid4().hex, + user=self.user_foo['id'], + role=self.role_member['id']) + self.assertRaises(client_exceptions.NotFound, + client.roles.remove_user_role, + tenant=self.tenant_baz['id'], + user=uuid.uuid4().hex, + role=self.role_member['id']) + self.assertRaises(client_exceptions.NotFound, + client.roles.remove_user_role, + tenant=self.tenant_baz['id'], + user=self.user_foo['id'], + role=uuid.uuid4().hex) + self.assertRaises(client_exceptions.NotFound, + client.roles.remove_user_role, + tenant=self.tenant_baz['id'], + user=self.user_foo['id'], + role=self.role_member['id']) + + def test_tenant_list_marker(self): + client = self.get_client() + + # Add two arbitrary tenants to user for testing purposes + for i in range(2): + tenant_id = uuid.uuid4().hex + tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id, + 'domain_id': DEFAULT_DOMAIN_ID} + self.resource_api.create_project(tenant_id, tenant) + self.assignment_api.add_user_to_project(tenant_id, + self.user_foo['id']) + + tenants = client.tenants.list() + self.assertEqual(3, len(tenants)) + + tenants_marker = client.tenants.list(marker=tenants[0].id) + self.assertEqual(2, len(tenants_marker)) + self.assertEqual(tenants_marker[0].name, tenants[1].name) + self.assertEqual(tenants_marker[1].name, tenants[2].name) + + def test_tenant_list_marker_not_found(self): + client = self.get_client() + self.assertRaises(client_exceptions.BadRequest, + client.tenants.list, marker=uuid.uuid4().hex) + + def test_tenant_list_limit(self): + client = self.get_client() + + # Add two arbitrary tenants to user for testing purposes + for i in range(2): + tenant_id = uuid.uuid4().hex + tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id, + 'domain_id': DEFAULT_DOMAIN_ID} + self.resource_api.create_project(tenant_id, tenant) + self.assignment_api.add_user_to_project(tenant_id, + self.user_foo['id']) + + tenants = client.tenants.list() + self.assertEqual(3, len(tenants)) + + tenants_limited = client.tenants.list(limit=2) + self.assertEqual(2, len(tenants_limited)) + self.assertEqual(tenants[0].name, tenants_limited[0].name) + self.assertEqual(tenants[1].name, tenants_limited[1].name) + + def test_tenant_list_limit_bad_value(self): + client = self.get_client() + self.assertRaises(client_exceptions.BadRequest, + client.tenants.list, limit='a') + self.assertRaises(client_exceptions.BadRequest, + client.tenants.list, limit=-1) + + def test_roles_get_by_user(self): + client = self.get_client(admin=True) + roles = client.roles.roles_for_user(user=self.user_foo['id'], + tenant=self.tenant_bar['id']) + self.assertTrue(len(roles) > 0) + + def test_user_can_update_passwd(self): + client = self.get_client(self.user_two) + + token_id = client.auth_token + new_password = uuid.uuid4().hex + + # TODO(derekh): Update to use keystoneclient when available + class FakeResponse(object): + def start_fake_response(self, status, headers): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + responseobject = FakeResponse() + + req = webob.Request.blank( + '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'], + headers={'X-Auth-Token': token_id}) + req.method = 'PATCH' + req.body = ('{"user":{"password":"%s","original_password":"%s"}}' % + (new_password, self.user_two['password'])) + self.public_server.application(req.environ, + responseobject.start_fake_response) + + self.user_two['password'] = new_password + self.get_client(self.user_two) + + def test_user_cannot_update_other_users_passwd(self): + client = self.get_client(self.user_two) + + token_id = client.auth_token + new_password = uuid.uuid4().hex + + # TODO(derekh): Update to use keystoneclient when available + class FakeResponse(object): + def start_fake_response(self, status, headers): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + responseobject = FakeResponse() + + req = webob.Request.blank( + '/v2.0/OS-KSCRUD/users/%s' % self.user_foo['id'], + headers={'X-Auth-Token': token_id}) + req.method = 'PATCH' + req.body = ('{"user":{"password":"%s","original_password":"%s"}}' % + (new_password, self.user_two['password'])) + self.public_server.application(req.environ, + responseobject.start_fake_response) + self.assertEqual(403, responseobject.response_status) + + self.user_two['password'] = new_password + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, self.user_two) + + def test_tokens_after_user_update_passwd(self): + client = self.get_client(self.user_two) + + token_id = client.auth_token + new_password = uuid.uuid4().hex + + # TODO(derekh): Update to use keystoneclient when available + class FakeResponse(object): + def start_fake_response(self, status, headers): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + responseobject = FakeResponse() + + req = webob.Request.blank( + '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'], + headers={'X-Auth-Token': token_id}) + req.method = 'PATCH' + req.body = ('{"user":{"password":"%s","original_password":"%s"}}' % + (new_password, self.user_two['password'])) + + rv = self.public_server.application( + req.environ, + responseobject.start_fake_response) + response_json = jsonutils.loads(rv.pop()) + new_token_id = response_json['access']['token']['id'] + + self.assertRaises(client_exceptions.Unauthorized, client.tenants.list) + client.auth_token = new_token_id + client.tenants.list() |