diff options
author | asteroide <thomas.duval@orange.com> | 2015-09-24 16:27:16 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-09-24 16:27:16 +0200 |
commit | 92d11d139e9f76d4fd76859aea78643fc32ef36b (patch) | |
tree | bd5a2e7b50853498074ab55bdaee4452c460010b /keystone-moon/keystone/tests/unit/test_v3.py | |
parent | 49325d99acfadaadfad99c596c4ada6b5ec849de (diff) |
Update Keystone code from repository.
Change-Id: Ib3d0a06b10902fcc6d520f58e85aa617bc326d00
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3.py | 177 |
1 files changed, 48 insertions, 129 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py index 9bbfa103..7afe6ad8 100644 --- a/keystone-moon/keystone/tests/unit/test_v3.py +++ b/keystone-moon/keystone/tests/unit/test_v3.py @@ -18,7 +18,7 @@ import uuid from oslo_config import cfg from oslo_serialization import jsonutils from oslo_utils import timeutils -import six +from six.moves import http_client from testtools import matchers from keystone import auth @@ -27,14 +27,14 @@ from keystone.common import cache from keystone import exception from keystone import middleware from keystone.policy.backends import rules -from keystone.tests import unit as tests +from keystone.tests import unit from keystone.tests.unit import rest CONF = cfg.CONF DEFAULT_DOMAIN_ID = 'default' -TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' +TIME_FORMAT = unit.TIME_FORMAT class AuthTestMixin(object): @@ -116,11 +116,11 @@ class AuthTestMixin(object): return {'auth': auth_data} -class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, +class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, AuthTestMixin): def config_files(self): config_files = super(RestfulTestCase, self).config_files() - config_files.append(tests.dirs.tests_conf('backend_sql.conf')) + config_files.append(unit.dirs.tests_conf('backend_sql.conf')) return config_files def get_extensions(self): @@ -132,7 +132,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, def generate_paste_config(self): new_paste_file = None try: - new_paste_file = tests.generate_paste_config(self.EXTENSION_TO_ADD) + new_paste_file = unit.generate_paste_config(self.EXTENSION_TO_ADD) except AttributeError: # no need to report this error here, as most tests will not have # EXTENSION_TO_ADD defined. @@ -142,7 +142,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, def remove_generated_paste_config(self): try: - tests.remove_generated_paste_config(self.EXTENSION_TO_ADD) + unit.remove_generated_paste_config(self.EXTENSION_TO_ADD) except AttributeError: pass @@ -176,7 +176,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, self.load_sample_data() def _populate_default_domain(self): - if CONF.database.connection == tests.IN_MEM_DB_CONN_STRING: + if CONF.database.connection == unit.IN_MEM_DB_CONN_STRING: # NOTE(morganfainberg): If an in-memory db is being used, be sure # to populate the default domain, this is typically done by # a migration, but the in-mem db uses model definitions to create @@ -265,122 +265,52 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, self.endpoint['enabled'] = True def new_ref(self): - """Populates a ref with attributes common to all API entities.""" - return { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True} + """Populates a ref with attributes common to some API entities.""" + return unit.new_ref() def new_region_ref(self): - ref = self.new_ref() - # Region doesn't have name or enabled. - del ref['name'] - del ref['enabled'] - ref['parent_region_id'] = None - return ref + return unit.new_region_ref() def new_service_ref(self): - ref = self.new_ref() - ref['type'] = uuid.uuid4().hex - return ref + return unit.new_service_ref() def new_endpoint_ref(self, service_id, interface='public', **kwargs): - ref = self.new_ref() - del ref['enabled'] # enabled is optional - ref['interface'] = interface - ref['service_id'] = service_id - ref['url'] = 'https://' + uuid.uuid4().hex + '.com' - ref['region_id'] = self.region_id - ref.update(kwargs) - return ref + return unit.new_endpoint_ref( + service_id, interface=interface, default_region_id=self.region_id, + **kwargs) def new_domain_ref(self): - ref = self.new_ref() - return ref + return unit.new_domain_ref() def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False): - ref = self.new_ref() - ref['domain_id'] = domain_id - ref['parent_id'] = parent_id - ref['is_domain'] = is_domain - return ref + return unit.new_project_ref(domain_id=domain_id, parent_id=parent_id, + is_domain=is_domain) def new_user_ref(self, domain_id, project_id=None): - ref = self.new_ref() - ref['domain_id'] = domain_id - ref['email'] = uuid.uuid4().hex - ref['password'] = uuid.uuid4().hex - if project_id: - ref['default_project_id'] = project_id - return ref + return unit.new_user_ref(domain_id, project_id=project_id) def new_group_ref(self, domain_id): - ref = self.new_ref() - ref['domain_id'] = domain_id - return ref + return unit.new_group_ref(domain_id) def new_credential_ref(self, user_id, project_id=None, cred_type=None): - ref = dict() - ref['id'] = uuid.uuid4().hex - ref['user_id'] = user_id - if cred_type == 'ec2': - ref['type'] = 'ec2' - ref['blob'] = {'blah': 'test'} - else: - ref['type'] = 'cert' - ref['blob'] = uuid.uuid4().hex - if project_id: - ref['project_id'] = project_id - return ref + return unit.new_credential_ref(user_id, project_id=project_id, + cred_type=cred_type) def new_role_ref(self): - ref = self.new_ref() - # Roles don't have a description or the enabled flag - del ref['description'] - del ref['enabled'] - return ref + return unit.new_role_ref() def new_policy_ref(self): - ref = self.new_ref() - ref['blob'] = uuid.uuid4().hex - ref['type'] = uuid.uuid4().hex - return ref + return unit.new_policy_ref() def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None, impersonation=None, expires=None, role_ids=None, role_names=None, remaining_uses=None, allow_redelegation=False): - ref = dict() - ref['id'] = uuid.uuid4().hex - ref['trustor_user_id'] = trustor_user_id - ref['trustee_user_id'] = trustee_user_id - ref['impersonation'] = impersonation or False - ref['project_id'] = project_id - ref['remaining_uses'] = remaining_uses - ref['allow_redelegation'] = allow_redelegation - - if isinstance(expires, six.string_types): - ref['expires_at'] = expires - elif isinstance(expires, dict): - ref['expires_at'] = ( - timeutils.utcnow() + datetime.timedelta(**expires) - ).strftime(TIME_FORMAT) - elif expires is None: - pass - else: - raise NotImplementedError('Unexpected value for "expires"') - - role_ids = role_ids or [] - role_names = role_names or [] - if role_ids or role_names: - ref['roles'] = [] - for role_id in role_ids: - ref['roles'].append({'id': role_id}) - for role_name in role_names: - ref['roles'].append({'name': role_name}) - - return ref + return unit.new_trust_ref( + trustor_user_id, trustee_user_id, project_id=project_id, + impersonation=impersonation, expires=expires, role_ids=role_ids, + role_names=role_names, remaining_uses=remaining_uses, + allow_redelegation=allow_redelegation) def create_new_default_project_for_user(self, user_id, domain_id, enable_project=True): @@ -482,7 +412,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, r = self.v3_authenticate_token(auth) return r.headers.get('X-Subject-Token') - def v3_authenticate_token(self, auth, expected_status=201): + def v3_authenticate_token(self, auth, expected_status=http_client.CREATED): return self.admin_request(method='POST', path='/v3/auth/tokens', body=auth, @@ -511,42 +441,31 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, return self.admin_request(path=path, token=token, **kwargs) - def get(self, path, **kwargs): - r = self.v3_request(method='GET', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 200) - return r + def get(self, path, expected_status=http_client.OK, **kwargs): + return self.v3_request(path, method='GET', + expected_status=expected_status, **kwargs) - def head(self, path, **kwargs): - r = self.v3_request(method='HEAD', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) + def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + r = self.v3_request(path, method='HEAD', + expected_status=expected_status, **kwargs) self.assertEqual('', r.body) return r - def post(self, path, **kwargs): - r = self.v3_request(method='POST', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 201) - return r + def post(self, path, expected_status=http_client.CREATED, **kwargs): + return self.v3_request(path, method='POST', + expected_status=expected_status, **kwargs) - def put(self, path, **kwargs): - r = self.v3_request(method='PUT', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) - return r + def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + return self.v3_request(path, method='PUT', + expected_status=expected_status, **kwargs) - def patch(self, path, **kwargs): - r = self.v3_request(method='PATCH', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 200) - return r + def patch(self, path, expected_status=http_client.OK, **kwargs): + return self.v3_request(path, method='PATCH', + expected_status=expected_status, **kwargs) - def delete(self, path, **kwargs): - r = self.v3_request(method='DELETE', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) - return r + def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + return self.v3_request(path, method='DELETE', + expected_status=expected_status, **kwargs) def assertValidErrorResponse(self, r): resp = r.result |