aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py331
1 files changed, 330 insertions, 1 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
index 7abc5bc4..e0843605 100644
--- a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
+++ b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
@@ -15,12 +15,14 @@
import datetime
import uuid
+from keystoneclient.contrib.ec2 import utils as ec2_utils
from keystoneclient import exceptions as client_exceptions
from keystoneclient.v2_0 import client as ks_client
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
+from six.moves import range
import webob
from keystone.tests import unit as tests
@@ -35,6 +37,11 @@ DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class ClientDrivenTestCase(tests.TestCase):
+ def config_files(self):
+ config_files = super(ClientDrivenTestCase, self).config_files()
+ config_files.append(tests.dirs.tests_conf('backend_sql.conf'))
+ return config_files
+
def setUp(self):
super(ClientDrivenTestCase, self).setUp()
@@ -61,7 +68,10 @@ class ClientDrivenTestCase(tests.TestCase):
fixture = self.useFixture(appserver.AppServer(conf, appserver.ADMIN))
self.admin_server = fixture.server
- self.addCleanup(self.cleanup_instance('public_server', 'admin_server'))
+ self.default_client = self.get_client()
+
+ self.addCleanup(self.cleanup_instance('public_server', 'admin_server',
+ 'default_client'))
def _public_url(self):
public_port = self.public_server.socket_info['socket'][1]
@@ -707,6 +717,20 @@ class ClientDrivenTestCase(tests.TestCase):
client.roles.create,
name="")
+ def test_role_create_member_role(self):
+ # delete the member role so that we can recreate it
+ client = self.get_client(admin=True)
+ client.roles.delete(role=CONF.member_role_id)
+
+ # deleting the member role revokes our token, so re-authenticate
+ client = self.get_client(admin=True)
+
+ # specify only the role name on creation
+ role = client.roles.create(name=CONF.member_role_name)
+
+ # the ID should be set as defined in CONF
+ self.assertEqual(CONF.member_role_id, role.id)
+
def test_role_get_404(self):
client = self.get_client(admin=True)
self.assertRaises(client_exceptions.NotFound,
@@ -1043,3 +1067,308 @@ class ClientDrivenTestCase(tests.TestCase):
self.assertRaises(client_exceptions.Unauthorized, client.tenants.list)
client.auth_token = new_token_id
client.tenants.list()
+
+ def test_endpoint_crud(self):
+ client = self.get_client(admin=True)
+
+ service = client.services.create(name=uuid.uuid4().hex,
+ service_type=uuid.uuid4().hex,
+ description=uuid.uuid4().hex)
+
+ endpoint_region = uuid.uuid4().hex
+ invalid_service_id = uuid.uuid4().hex
+ endpoint_publicurl = uuid.uuid4().hex
+ endpoint_internalurl = uuid.uuid4().hex
+ endpoint_adminurl = uuid.uuid4().hex
+
+ # a non-existent service ID should trigger a 400
+ self.assertRaises(client_exceptions.BadRequest,
+ client.endpoints.create,
+ region=endpoint_region,
+ service_id=invalid_service_id,
+ publicurl=endpoint_publicurl,
+ adminurl=endpoint_adminurl,
+ internalurl=endpoint_internalurl)
+
+ endpoint = client.endpoints.create(region=endpoint_region,
+ service_id=service.id,
+ publicurl=endpoint_publicurl,
+ adminurl=endpoint_adminurl,
+ internalurl=endpoint_internalurl)
+
+ self.assertEqual(endpoint_region, endpoint.region)
+ self.assertEqual(service.id, endpoint.service_id)
+ self.assertEqual(endpoint_publicurl, endpoint.publicurl)
+ self.assertEqual(endpoint_internalurl, endpoint.internalurl)
+ self.assertEqual(endpoint_adminurl, endpoint.adminurl)
+
+ client.endpoints.delete(id=endpoint.id)
+ self.assertRaises(client_exceptions.NotFound, client.endpoints.delete,
+ id=endpoint.id)
+
+ def _send_ec2_auth_request(self, credentials, client=None):
+ if not client:
+ client = self.default_client
+ url = '%s/ec2tokens' % self.default_client.auth_url
+ (resp, token) = client.request(
+ url=url, method='POST',
+ body={'credentials': credentials})
+ return resp, token
+
+ def _generate_default_user_ec2_credentials(self):
+ cred = self. default_client.ec2.create(
+ user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+ return self._generate_user_ec2_credentials(cred.access, cred.secret)
+
+ def _generate_user_ec2_credentials(self, access, secret):
+ signer = ec2_utils.Ec2Signer(secret)
+ credentials = {'params': {'SignatureVersion': '2'},
+ 'access': access,
+ 'verb': 'GET',
+ 'host': 'localhost',
+ 'path': '/service/cloud'}
+ signature = signer.generate(credentials)
+ return credentials, signature
+
+ def test_ec2_auth_success(self):
+ credentials, signature = self._generate_default_user_ec2_credentials()
+ credentials['signature'] = signature
+ resp, token = self._send_ec2_auth_request(credentials)
+ self.assertEqual(200, resp.status_code)
+ self.assertIn('access', token)
+
+ def test_ec2_auth_success_trust(self):
+ # Add "other" role user_foo and create trust delegating it to user_two
+ self.assignment_api.add_role_to_user_and_project(
+ self.user_foo['id'],
+ self.tenant_bar['id'],
+ self.role_other['id'])
+ trust_id = 'atrust123'
+ trust = {'trustor_user_id': self.user_foo['id'],
+ 'trustee_user_id': self.user_two['id'],
+ 'project_id': self.tenant_bar['id'],
+ 'impersonation': True}
+ roles = [self.role_other]
+ self.trust_api.create_trust(trust_id, trust, roles)
+
+ # Create a client for user_two, scoped to the trust
+ client = self.get_client(self.user_two)
+ ret = client.authenticate(trust_id=trust_id,
+ tenant_id=self.tenant_bar['id'])
+ self.assertTrue(ret)
+ self.assertTrue(client.auth_ref.trust_scoped)
+ self.assertEqual(trust_id, client.auth_ref.trust_id)
+
+ # Create an ec2 keypair using the trust client impersonating user_foo
+ cred = client.ec2.create(user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+ credentials, signature = self._generate_user_ec2_credentials(
+ cred.access, cred.secret)
+ credentials['signature'] = signature
+ resp, token = self._send_ec2_auth_request(credentials)
+ self.assertEqual(200, resp.status_code)
+ self.assertEqual(trust_id, token['access']['trust']['id'])
+ # TODO(shardy) we really want to check the roles and trustee
+ # but because of where the stubbing happens we don't seem to
+ # hit the necessary code in controllers.py _authenticate_token
+ # so although all is OK via a real request, it incorrect in
+ # this test..
+
+ def test_ec2_auth_failure(self):
+ credentials, signature = self._generate_default_user_ec2_credentials()
+ credentials['signature'] = uuid.uuid4().hex
+ self.assertRaises(client_exceptions.Unauthorized,
+ self._send_ec2_auth_request,
+ credentials)
+
+ def test_ec2_credential_crud(self):
+ creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
+ self.assertEqual([], creds)
+
+ cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+ creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
+ self.assertEqual(creds, [cred])
+ got = self.default_client.ec2.get(user_id=self.user_foo['id'],
+ access=cred.access)
+ self.assertEqual(cred, got)
+
+ self.default_client.ec2.delete(user_id=self.user_foo['id'],
+ access=cred.access)
+ creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
+ self.assertEqual([], creds)
+
+ def test_ec2_credential_crud_non_admin(self):
+ na_client = self.get_client(self.user_two)
+ creds = na_client.ec2.list(user_id=self.user_two['id'])
+ self.assertEqual([], creds)
+
+ cred = na_client.ec2.create(user_id=self.user_two['id'],
+ tenant_id=self.tenant_baz['id'])
+ creds = na_client.ec2.list(user_id=self.user_two['id'])
+ self.assertEqual(creds, [cred])
+ got = na_client.ec2.get(user_id=self.user_two['id'],
+ access=cred.access)
+ self.assertEqual(cred, got)
+
+ na_client.ec2.delete(user_id=self.user_two['id'],
+ access=cred.access)
+ creds = na_client.ec2.list(user_id=self.user_two['id'])
+ self.assertEqual([], creds)
+
+ def test_ec2_list_credentials(self):
+ cred_1 = self.default_client.ec2.create(
+ user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+ cred_2 = self.default_client.ec2.create(
+ user_id=self.user_foo['id'],
+ tenant_id=self.tenant_service['id'])
+ cred_3 = self.default_client.ec2.create(
+ user_id=self.user_foo['id'],
+ tenant_id=self.tenant_mtu['id'])
+ two = self.get_client(self.user_two)
+ cred_4 = two.ec2.create(user_id=self.user_two['id'],
+ tenant_id=self.tenant_bar['id'])
+ creds = self.default_client.ec2.list(user_id=self.user_foo['id'])
+ self.assertEqual(3, len(creds))
+ self.assertEqual(sorted([cred_1, cred_2, cred_3],
+ key=lambda x: x.access),
+ sorted(creds, key=lambda x: x.access))
+ self.assertNotIn(cred_4, creds)
+
+ def test_ec2_credentials_create_404(self):
+ self.assertRaises(client_exceptions.NotFound,
+ self.default_client.ec2.create,
+ user_id=uuid.uuid4().hex,
+ tenant_id=self.tenant_bar['id'])
+ self.assertRaises(client_exceptions.NotFound,
+ self.default_client.ec2.create,
+ user_id=self.user_foo['id'],
+ tenant_id=uuid.uuid4().hex)
+
+ def test_ec2_credentials_delete_404(self):
+ self.assertRaises(client_exceptions.NotFound,
+ self.default_client.ec2.delete,
+ user_id=uuid.uuid4().hex,
+ access=uuid.uuid4().hex)
+
+ def test_ec2_credentials_get_404(self):
+ self.assertRaises(client_exceptions.NotFound,
+ self.default_client.ec2.get,
+ user_id=uuid.uuid4().hex,
+ access=uuid.uuid4().hex)
+
+ def test_ec2_credentials_list_404(self):
+ self.assertRaises(client_exceptions.NotFound,
+ self.default_client.ec2.list,
+ user_id=uuid.uuid4().hex)
+
+ def test_ec2_credentials_list_user_forbidden(self):
+ two = self.get_client(self.user_two)
+ self.assertRaises(client_exceptions.Forbidden, two.ec2.list,
+ user_id=self.user_foo['id'])
+
+ def test_ec2_credentials_get_user_forbidden(self):
+ cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+
+ two = self.get_client(self.user_two)
+ self.assertRaises(client_exceptions.Forbidden, two.ec2.get,
+ user_id=self.user_foo['id'], access=cred.access)
+
+ self.default_client.ec2.delete(user_id=self.user_foo['id'],
+ access=cred.access)
+
+ def test_ec2_credentials_delete_user_forbidden(self):
+ cred = self.default_client.ec2.create(user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+
+ two = self.get_client(self.user_two)
+ self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,
+ user_id=self.user_foo['id'], access=cred.access)
+
+ self.default_client.ec2.delete(user_id=self.user_foo['id'],
+ access=cred.access)
+
+ def test_endpoint_create_nonexistent_service(self):
+ client = self.get_client(admin=True)
+ self.assertRaises(client_exceptions.BadRequest,
+ client.endpoints.create,
+ region=uuid.uuid4().hex,
+ service_id=uuid.uuid4().hex,
+ publicurl=uuid.uuid4().hex,
+ adminurl=uuid.uuid4().hex,
+ internalurl=uuid.uuid4().hex)
+
+ def test_policy_crud(self):
+ # FIXME(dolph): this test was written prior to the v3 implementation of
+ # the client and essentially refers to a non-existent
+ # policy manager in the v2 client. this test needs to be
+ # moved to a test suite running against the v3 api
+ self.skipTest('Written prior to v3 client; needs refactor')
+
+ client = self.get_client(admin=True)
+
+ policy_blob = uuid.uuid4().hex
+ policy_type = uuid.uuid4().hex
+ service = client.services.create(
+ name=uuid.uuid4().hex,
+ service_type=uuid.uuid4().hex,
+ description=uuid.uuid4().hex)
+ endpoint = client.endpoints.create(
+ service_id=service.id,
+ region=uuid.uuid4().hex,
+ adminurl=uuid.uuid4().hex,
+ internalurl=uuid.uuid4().hex,
+ publicurl=uuid.uuid4().hex)
+
+ # create
+ policy = client.policies.create(
+ blob=policy_blob,
+ type=policy_type,
+ endpoint=endpoint.id)
+ self.assertEqual(policy_blob, policy.policy)
+ self.assertEqual(policy_type, policy.type)
+ self.assertEqual(endpoint.id, policy.endpoint_id)
+
+ policy = client.policies.get(policy=policy.id)
+ self.assertEqual(policy_blob, policy.policy)
+ self.assertEqual(policy_type, policy.type)
+ self.assertEqual(endpoint.id, policy.endpoint_id)
+
+ endpoints = [x for x in client.endpoints.list() if x.id == endpoint.id]
+ endpoint = endpoints[0]
+ self.assertEqual(policy_blob, policy.policy)
+ self.assertEqual(policy_type, policy.type)
+ self.assertEqual(endpoint.id, policy.endpoint_id)
+
+ # update
+ policy_blob = uuid.uuid4().hex
+ policy_type = uuid.uuid4().hex
+ endpoint = client.endpoints.create(
+ service_id=service.id,
+ region=uuid.uuid4().hex,
+ adminurl=uuid.uuid4().hex,
+ internalurl=uuid.uuid4().hex,
+ publicurl=uuid.uuid4().hex)
+
+ policy = client.policies.update(
+ policy=policy.id,
+ blob=policy_blob,
+ type=policy_type,
+ endpoint=endpoint.id)
+
+ policy = client.policies.get(policy=policy.id)
+ self.assertEqual(policy_blob, policy.policy)
+ self.assertEqual(policy_type, policy.type)
+ self.assertEqual(endpoint.id, policy.endpoint_id)
+
+ # delete
+ client.policies.delete(policy=policy.id)
+ self.assertRaises(
+ client_exceptions.NotFound,
+ client.policies.get,
+ policy=policy.id)
+ policies = [x for x in client.policies.list() if x.id == policy.id]
+ self.assertEqual(0, len(policies))