aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3.py177
1 files changed, 48 insertions, 129 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py
index 9bbfa103..7afe6ad8 100644
--- a/keystone-moon/keystone/tests/unit/test_v3.py
+++ b/keystone-moon/keystone/tests/unit/test_v3.py
@@ -18,7 +18,7 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
-import six
+from six.moves import http_client
from testtools import matchers
from keystone import auth
@@ -27,14 +27,14 @@ from keystone.common import cache
from keystone import exception
from keystone import middleware
from keystone.policy.backends import rules
-from keystone.tests import unit as tests
+from keystone.tests import unit
from keystone.tests.unit import rest
CONF = cfg.CONF
DEFAULT_DOMAIN_ID = 'default'
-TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
+TIME_FORMAT = unit.TIME_FORMAT
class AuthTestMixin(object):
@@ -116,11 +116,11 @@ class AuthTestMixin(object):
return {'auth': auth_data}
-class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
+class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
AuthTestMixin):
def config_files(self):
config_files = super(RestfulTestCase, self).config_files()
- config_files.append(tests.dirs.tests_conf('backend_sql.conf'))
+ config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
return config_files
def get_extensions(self):
@@ -132,7 +132,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
def generate_paste_config(self):
new_paste_file = None
try:
- new_paste_file = tests.generate_paste_config(self.EXTENSION_TO_ADD)
+ new_paste_file = unit.generate_paste_config(self.EXTENSION_TO_ADD)
except AttributeError:
# no need to report this error here, as most tests will not have
# EXTENSION_TO_ADD defined.
@@ -142,7 +142,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
def remove_generated_paste_config(self):
try:
- tests.remove_generated_paste_config(self.EXTENSION_TO_ADD)
+ unit.remove_generated_paste_config(self.EXTENSION_TO_ADD)
except AttributeError:
pass
@@ -176,7 +176,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
self.load_sample_data()
def _populate_default_domain(self):
- if CONF.database.connection == tests.IN_MEM_DB_CONN_STRING:
+ if CONF.database.connection == unit.IN_MEM_DB_CONN_STRING:
# NOTE(morganfainberg): If an in-memory db is being used, be sure
# to populate the default domain, this is typically done by
# a migration, but the in-mem db uses model definitions to create
@@ -265,122 +265,52 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
self.endpoint['enabled'] = True
def new_ref(self):
- """Populates a ref with attributes common to all API entities."""
- return {
- 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': True}
+ """Populates a ref with attributes common to some API entities."""
+ return unit.new_ref()
def new_region_ref(self):
- ref = self.new_ref()
- # Region doesn't have name or enabled.
- del ref['name']
- del ref['enabled']
- ref['parent_region_id'] = None
- return ref
+ return unit.new_region_ref()
def new_service_ref(self):
- ref = self.new_ref()
- ref['type'] = uuid.uuid4().hex
- return ref
+ return unit.new_service_ref()
def new_endpoint_ref(self, service_id, interface='public', **kwargs):
- ref = self.new_ref()
- del ref['enabled'] # enabled is optional
- ref['interface'] = interface
- ref['service_id'] = service_id
- ref['url'] = 'https://' + uuid.uuid4().hex + '.com'
- ref['region_id'] = self.region_id
- ref.update(kwargs)
- return ref
+ return unit.new_endpoint_ref(
+ service_id, interface=interface, default_region_id=self.region_id,
+ **kwargs)
def new_domain_ref(self):
- ref = self.new_ref()
- return ref
+ return unit.new_domain_ref()
def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False):
- ref = self.new_ref()
- ref['domain_id'] = domain_id
- ref['parent_id'] = parent_id
- ref['is_domain'] = is_domain
- return ref
+ return unit.new_project_ref(domain_id=domain_id, parent_id=parent_id,
+ is_domain=is_domain)
def new_user_ref(self, domain_id, project_id=None):
- ref = self.new_ref()
- ref['domain_id'] = domain_id
- ref['email'] = uuid.uuid4().hex
- ref['password'] = uuid.uuid4().hex
- if project_id:
- ref['default_project_id'] = project_id
- return ref
+ return unit.new_user_ref(domain_id, project_id=project_id)
def new_group_ref(self, domain_id):
- ref = self.new_ref()
- ref['domain_id'] = domain_id
- return ref
+ return unit.new_group_ref(domain_id)
def new_credential_ref(self, user_id, project_id=None, cred_type=None):
- ref = dict()
- ref['id'] = uuid.uuid4().hex
- ref['user_id'] = user_id
- if cred_type == 'ec2':
- ref['type'] = 'ec2'
- ref['blob'] = {'blah': 'test'}
- else:
- ref['type'] = 'cert'
- ref['blob'] = uuid.uuid4().hex
- if project_id:
- ref['project_id'] = project_id
- return ref
+ return unit.new_credential_ref(user_id, project_id=project_id,
+ cred_type=cred_type)
def new_role_ref(self):
- ref = self.new_ref()
- # Roles don't have a description or the enabled flag
- del ref['description']
- del ref['enabled']
- return ref
+ return unit.new_role_ref()
def new_policy_ref(self):
- ref = self.new_ref()
- ref['blob'] = uuid.uuid4().hex
- ref['type'] = uuid.uuid4().hex
- return ref
+ return unit.new_policy_ref()
def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None, remaining_uses=None,
allow_redelegation=False):
- ref = dict()
- ref['id'] = uuid.uuid4().hex
- ref['trustor_user_id'] = trustor_user_id
- ref['trustee_user_id'] = trustee_user_id
- ref['impersonation'] = impersonation or False
- ref['project_id'] = project_id
- ref['remaining_uses'] = remaining_uses
- ref['allow_redelegation'] = allow_redelegation
-
- if isinstance(expires, six.string_types):
- ref['expires_at'] = expires
- elif isinstance(expires, dict):
- ref['expires_at'] = (
- timeutils.utcnow() + datetime.timedelta(**expires)
- ).strftime(TIME_FORMAT)
- elif expires is None:
- pass
- else:
- raise NotImplementedError('Unexpected value for "expires"')
-
- role_ids = role_ids or []
- role_names = role_names or []
- if role_ids or role_names:
- ref['roles'] = []
- for role_id in role_ids:
- ref['roles'].append({'id': role_id})
- for role_name in role_names:
- ref['roles'].append({'name': role_name})
-
- return ref
+ return unit.new_trust_ref(
+ trustor_user_id, trustee_user_id, project_id=project_id,
+ impersonation=impersonation, expires=expires, role_ids=role_ids,
+ role_names=role_names, remaining_uses=remaining_uses,
+ allow_redelegation=allow_redelegation)
def create_new_default_project_for_user(self, user_id, domain_id,
enable_project=True):
@@ -482,7 +412,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
r = self.v3_authenticate_token(auth)
return r.headers.get('X-Subject-Token')
- def v3_authenticate_token(self, auth, expected_status=201):
+ def v3_authenticate_token(self, auth, expected_status=http_client.CREATED):
return self.admin_request(method='POST',
path='/v3/auth/tokens',
body=auth,
@@ -511,42 +441,31 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
return self.admin_request(path=path, token=token, **kwargs)
- def get(self, path, **kwargs):
- r = self.v3_request(method='GET', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 200)
- return r
+ def get(self, path, expected_status=http_client.OK, **kwargs):
+ return self.v3_request(path, method='GET',
+ expected_status=expected_status, **kwargs)
- def head(self, path, **kwargs):
- r = self.v3_request(method='HEAD', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
+ def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ r = self.v3_request(path, method='HEAD',
+ expected_status=expected_status, **kwargs)
self.assertEqual('', r.body)
return r
- def post(self, path, **kwargs):
- r = self.v3_request(method='POST', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 201)
- return r
+ def post(self, path, expected_status=http_client.CREATED, **kwargs):
+ return self.v3_request(path, method='POST',
+ expected_status=expected_status, **kwargs)
- def put(self, path, **kwargs):
- r = self.v3_request(method='PUT', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
- return r
+ def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ return self.v3_request(path, method='PUT',
+ expected_status=expected_status, **kwargs)
- def patch(self, path, **kwargs):
- r = self.v3_request(method='PATCH', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 200)
- return r
+ def patch(self, path, expected_status=http_client.OK, **kwargs):
+ return self.v3_request(path, method='PATCH',
+ expected_status=expected_status, **kwargs)
- def delete(self, path, **kwargs):
- r = self.v3_request(method='DELETE', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
- return r
+ def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ return self.v3_request(path, method='DELETE',
+ expected_status=expected_status, **kwargs)
def assertValidErrorResponse(self, r):
resp = r.result