diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_backend.py | 6851 |
1 files changed, 0 insertions, 6851 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend.py b/keystone-moon/keystone/tests/unit/test_backend.py deleted file mode 100644 index 302fc2c2..00000000 --- a/keystone-moon/keystone/tests/unit/test_backend.py +++ /dev/null @@ -1,6851 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import datetime -import hashlib -import uuid - -from keystoneclient.common import cms -import mock -from oslo_config import cfg -from oslo_utils import timeutils -import six -from six.moves import range -from testtools import matchers - -from keystone.catalog import core -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import filtering -from keystone.tests.unit import utils as test_utils -from keystone.token import provider - - -CONF = cfg.CONF -DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id -NULL_OBJECT = object() - - -class AssignmentTestHelperMixin(object): - """Mixin class to aid testing of assignments. - - This class supports data driven test plans that enable: - - - Creation of initial entities, such as domains, users, groups, projects - and roles - - Creation of assignments referencing the above entities - - A set of input parameters and expected outputs to list_role_assignments - based on the above test data - - A test plan is a dict of the form: - - test_plan = { - entities: details and number of entities, - group_memberships: group-user entity memberships, - assignments: list of assignments to create, - tests: list of pairs of input params and expected outputs} - - An example test plan: - - test_plan = { - # First, create the entities required. Entities are specified by - # a dict with the key being the entity type and the value an - # entity specification which can be one of: - # - # - a simple number, e.g. {'users': 3} creates 3 users - # - a dict where more information regarding the contents of the entity - # is required, e.g. {'domains' : {'users : 3}} creates a domain - # with three users - # - a list of entity specifications if multiple are required - # - # The following creates a domain that contains a single user, group and - # project, as well as creating three roles. - - 'entities': {'domains': {'users': 1, 'groups': 1, 'projects': 1}, - 'roles': 3}, - - # If it is required that an existing domain be used for the new - # entities, then the id of that domain can be included in the - # domain dict. For example, if alternatively we wanted to add 3 users - # to the default domain, add a second domain containing 3 projects as - # well as 5 additional empty domains, the entities would be defined as: - # - # 'entities': {'domains': [{'id': DEFAULT_DOMAIN, 'users': 3}, - # {'projects': 3}, 5]}, - # - # A project hierarchy can be specified within the 'projects' section by - # nesting the 'project' key, for example to create a project with three - # sub-projects you would use: - - 'projects': {'project': 3} - - # A more complex hierarchy can also be defined, for example the - # following would define three projects each containing a - # sub-project, each of which contain a further three sub-projects. - - 'projects': [{'project': {'project': 3}}, - {'project': {'project': 3}}, - {'project': {'project': 3}}] - - # A list of groups and their members. In this case make users with - # index 0 and 1 members of group with index 0. Users and Groups are - # indexed in the order they appear in the 'entities' key above. - - 'group_memberships': [{'group': 0, 'users': [0, 1]}] - - # Next, create assignments between the entities, referencing the - # entities by index, i.e. 'user': 0 refers to user[0]. Entities are - # indexed in the order they appear in the 'entities' key above within - # their entity type. - - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'group': 0, 'role': 2, 'domain': 0}, - {'user': 0, 'role': 2, 'project': 0}], - - # Finally, define an array of tests where list_role_assignment() is - # called with the given input parameters and the results are then - # confirmed to be as given in 'results'. Again, all entities are - # referenced by index. - - 'tests': [ - {'params': {}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'group': 0, 'role': 2, 'domain': 0}, - {'user': 0, 'role': 2, 'project': 0}]}, - {'params': {'role': 2}, - 'results': [{'group': 0, 'role': 2, 'domain': 0}, - {'user': 0, 'role': 2, 'project': 0}]}] - - # The 'params' key also supports the 'effective' and - # 'inherited_to_projects' options to list_role_assignments.} - - """ - def _handle_project_spec(self, test_data, domain_id, project_spec, - parent_id=None): - """Handle the creation of a project or hierarchy of projects. - - project_spec may either be a count of the number of projects to - create, or it may be a list of the form: - - [{'project': project_spec}, {'project': project_spec}, ...] - - This method is called recursively to handle the creation of a - hierarchy of projects. - - """ - def _create_project(domain_id, parent_id): - new_project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain_id, 'parent_id': parent_id} - new_project = self.resource_api.create_project(new_project['id'], - new_project) - return new_project - - if isinstance(project_spec, list): - for this_spec in project_spec: - self._handle_project_spec( - test_data, domain_id, this_spec, parent_id=parent_id) - elif isinstance(project_spec, dict): - new_proj = _create_project(domain_id, parent_id) - test_data['projects'].append(new_proj) - self._handle_project_spec( - test_data, domain_id, project_spec['project'], - parent_id=new_proj['id']) - else: - for _ in range(project_spec): - test_data['projects'].append( - _create_project(domain_id, parent_id)) - - def _handle_domain_spec(self, test_data, domain_spec): - """Handle the creation of domains and their contents. - - domain_spec may either be a count of the number of empty domains to - create, a dict describing the domain contents, or a list of - domain_specs. - - In the case when a list is provided, this method calls itself - recursively to handle the list elements. - - This method will insert any entities created into test_data - - """ - def _create_domain(domain_id=None): - if domain_id is None: - new_domain = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], - new_domain) - return new_domain - else: - # The test plan specified an existing domain to use - return self.resource_api.get_domain(domain_id) - - def _create_entity_in_domain(entity_type, domain_id): - """Create a user or group entity in the domain.""" - - new_entity = {'name': uuid.uuid4().hex, 'domain_id': domain_id} - if entity_type == 'users': - new_entity = self.identity_api.create_user(new_entity) - elif entity_type == 'groups': - new_entity = self.identity_api.create_group(new_entity) - else: - # Must be a bad test plan - raise exception.NotImplemented() - return new_entity - - if isinstance(domain_spec, list): - for x in domain_spec: - self._handle_domain_spec(test_data, x) - elif isinstance(domain_spec, dict): - # If there is a domain ID specified, then use it - the_domain = _create_domain(domain_spec.get('id')) - test_data['domains'].append(the_domain) - for entity_type, value in domain_spec.items(): - if entity_type == 'id': - # We already used this above to determine whether to - # use and existing domain - continue - if entity_type == 'projects': - # If it's projects, we need to handle the potential - # specification of a project hierarchy - self._handle_project_spec( - test_data, the_domain['id'], value) - else: - # It's a count of number of entities - for _ in range(value): - test_data[entity_type].append( - _create_entity_in_domain( - entity_type, the_domain['id'])) - else: - for _ in range(domain_spec): - test_data['domains'].append(_create_domain()) - - def create_entities(self, entity_pattern): - """Create the entities specified in the test plan. - - Process the 'entities' key in the test plan, creating the requested - entities. Each created entity will be added to the array of entities - stored in the returned test_data object, e.g.: - - test_data['users'] = [user[0], user[1]....] - - """ - def _create_role(): - new_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - return self.role_api.create_role(new_role['id'], new_role) - - test_data = {} - for entity in ['users', 'groups', 'domains', 'projects', 'roles']: - test_data[entity] = [] - - # Create any domains requested and, if specified, any entities within - # those domains - if 'domains' in entity_pattern: - self._handle_domain_spec(test_data, entity_pattern['domains']) - - # Create any roles requested - if 'roles' in entity_pattern: - for _ in range(entity_pattern['roles']): - test_data['roles'].append(_create_role()) - - return test_data - - def _convert_entity_shorthand(self, key, shorthand_data, reference_data): - """Convert a shorthand entity description into a full ID reference. - - In test plan definitions, we allow a shorthand for referencing to an - entity of the form: - - 'user': 0 - - which is actually shorthand for: - - 'user_id': reference_data['users'][0]['id'] - - This method converts the shorthand version into the full reference. - - """ - expanded_key = '%s_id' % key - reference_index = '%ss' % key - index_value = ( - reference_data[reference_index][shorthand_data[key]]['id']) - return expanded_key, index_value - - def create_group_memberships(self, group_pattern, test_data): - """Create the group memberships specified in the test plan.""" - - for group_spec in group_pattern: - # Each membership specification is a dict of the form: - # - # {'group': 0, 'users': [list of user indexes]} - # - # Add all users in the list to the specified group, first - # converting from index to full entity ID. - group_value = test_data['groups'][group_spec['group']]['id'] - for user_index in group_spec['users']: - user_value = test_data['users'][user_index]['id'] - self.identity_api.add_user_to_group(user_value, group_value) - return test_data - - def create_assignments(self, assignment_pattern, test_data): - """Create the assignments specified in the test plan.""" - - # First store how many assignments are already in the system, - # so during the tests we can check the number of new assignments - # created. - test_data['initial_assignment_count'] = ( - len(self.assignment_api.list_role_assignments())) - - # Now create the new assignments in the test plan - for assignment in assignment_pattern: - # Each assignment is a dict of the form: - # - # { 'user': 0, 'project':1, 'role': 6} - # - # where the value of each item is the index into the array of - # entities created earlier. - # - # We process the assignment dict to create the args required to - # make the create_grant() call. - args = {} - for param in assignment: - if param == 'inherited_to_projects': - args[param] = assignment[param] - else: - # Turn 'entity : 0' into 'entity_id = ac6736ba873d' - # where entity in user, group, project or domain - key, value = self._convert_entity_shorthand( - param, assignment, test_data) - args[key] = value - self.assignment_api.create_grant(**args) - return test_data - - def execute_assignment_tests(self, test_plan, test_data): - """Execute the test plan, based on the created test_data.""" - - def check_results(expected, actual, param_arg_count): - if param_arg_count == 0: - # It was an unfiltered call, so default fixture assignments - # might be polluting our answer - so we take into account - # how many assignments there were before the test. - self.assertEqual( - len(expected) + test_data['initial_assignment_count'], - len(actual)) - else: - self.assertThat(actual, matchers.HasLength(len(expected))) - - for each_expected in expected: - expected_assignment = {} - for param in each_expected: - if param == 'inherited_to_projects': - expected_assignment[param] = each_expected[param] - elif param == 'indirect': - # We're expecting the result to contain an indirect - # dict with the details how the role came to be placed - # on this entity - so convert the key/value pairs of - # that dict into real entity references. - indirect_term = {} - for indirect_param in each_expected[param]: - key, value = self._convert_entity_shorthand( - indirect_param, each_expected[param], - test_data) - indirect_term[key] = value - expected_assignment[param] = indirect_term - else: - # Convert a simple shorthand entry into a full - # entity reference - key, value = self._convert_entity_shorthand( - param, each_expected, test_data) - expected_assignment[key] = value - self.assertIn(expected_assignment, actual) - - # Go through each test in the array, processing the input params, which - # we build into an args dict, and then call list_role_assignments. Then - # check the results against those specified in the test plan. - for test in test_plan.get('tests', []): - args = {} - for param in test['params']: - if param in ['effective', 'inherited']: - # Just pass the value into the args - args[param] = test['params'][param] - else: - # Turn 'entity : 0' into 'entity_id = ac6736ba873d' - # where entity in user, group, project or domain - key, value = self._convert_entity_shorthand( - param, test['params'], test_data) - args[key] = value - results = self.assignment_api.list_role_assignments(**args) - check_results(test['results'], results, len(args)) - - def execute_assignment_test_plan(self, test_plan): - """Create entities, assignments and execute the test plan. - - The standard method to call to create entities and assignments and - execute the tests as specified in the test_plan. The test_data - dict is returned so that, if required, the caller can execute - additional manual tests with the entities and assignments created. - - """ - test_data = self.create_entities(test_plan['entities']) - if 'group_memberships' in test_plan: - self.create_group_memberships(test_plan['group_memberships'], - test_data) - if 'assignments' in test_plan: - test_data = self.create_assignments(test_plan['assignments'], - test_data) - self.execute_assignment_tests(test_plan, test_data) - return test_data - - -class IdentityTests(AssignmentTestHelperMixin): - def _get_domain_fixture(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - return domain - - def _set_domain_scope(self, domain_id): - # We only provide a domain scope if we have multiple drivers - if CONF.identity.domain_specific_drivers_enabled: - return domain_id - - def test_project_add_and_remove_user_role(self): - user_ids = self.assignment_api.list_user_ids_for_project( - self.tenant_bar['id']) - self.assertNotIn(self.user_two['id'], user_ids) - - self.assignment_api.add_role_to_user_and_project( - tenant_id=self.tenant_bar['id'], - user_id=self.user_two['id'], - role_id=self.role_other['id']) - user_ids = self.assignment_api.list_user_ids_for_project( - self.tenant_bar['id']) - self.assertIn(self.user_two['id'], user_ids) - - self.assignment_api.remove_role_from_user_and_project( - tenant_id=self.tenant_bar['id'], - user_id=self.user_two['id'], - role_id=self.role_other['id']) - - user_ids = self.assignment_api.list_user_ids_for_project( - self.tenant_bar['id']) - self.assertNotIn(self.user_two['id'], user_ids) - - def test_remove_user_role_not_assigned(self): - # Expect failure if attempt to remove a role that was never assigned to - # the user. - self.assertRaises(exception.RoleNotFound, - self.assignment_api. - remove_role_from_user_and_project, - tenant_id=self.tenant_bar['id'], - user_id=self.user_two['id'], - role_id=self.role_other['id']) - - def test_authenticate_bad_user(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=uuid.uuid4().hex, - password=self.user_foo['password']) - - def test_authenticate_bad_password(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=self.user_foo['id'], - password=uuid.uuid4().hex) - - def test_authenticate(self): - user_ref = self.identity_api.authenticate( - context={}, - user_id=self.user_sna['id'], - password=self.user_sna['password']) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_sna.pop('password') - self.user_sna['enabled'] = True - self.assertDictEqual(user_ref, self.user_sna) - - def test_authenticate_and_get_roles_no_metadata(self): - user = { - 'name': 'NO_META', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'no_meta2', - } - new_user = self.identity_api.create_user(user) - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - new_user['id']) - user_ref = self.identity_api.authenticate( - context={}, - user_id=new_user['id'], - password=user['password']) - self.assertNotIn('password', user_ref) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - user.pop('password') - self.assertDictContainsSubset(user, user_ref) - role_list = self.assignment_api.get_roles_for_user_and_project( - new_user['id'], self.tenant_baz['id']) - self.assertEqual(1, len(role_list)) - self.assertIn(CONF.member_role_id, role_list) - - def test_authenticate_if_no_password_set(self): - id_ = uuid.uuid4().hex - user = { - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - } - self.identity_api.create_user(user) - - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=id_, - password='password') - - def test_create_unicode_user_name(self): - unicode_name = u'name \u540d\u5b57' - user = {'name': unicode_name, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} - ref = self.identity_api.create_user(user) - self.assertEqual(unicode_name, ref['name']) - - def test_get_project(self): - tenant_ref = self.resource_api.get_project(self.tenant_bar['id']) - self.assertDictEqual(tenant_ref, self.tenant_bar) - - def test_get_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - uuid.uuid4().hex) - - def test_get_project_by_name(self): - tenant_ref = self.resource_api.get_project_by_name( - self.tenant_bar['name'], - DEFAULT_DOMAIN_ID) - self.assertDictEqual(tenant_ref, self.tenant_bar) - - def test_get_project_by_name_404(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - uuid.uuid4().hex, - DEFAULT_DOMAIN_ID) - - def test_list_user_ids_for_project(self): - user_ids = self.assignment_api.list_user_ids_for_project( - self.tenant_baz['id']) - self.assertEqual(2, len(user_ids)) - self.assertIn(self.user_two['id'], user_ids) - self.assertIn(self.user_badguy['id'], user_ids) - - def test_list_user_ids_for_project_no_duplicates(self): - # Create user - user_ref = { - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex, - 'enabled': True} - user_ref = self.identity_api.create_user(user_ref) - # Create project - project_ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project( - project_ref['id'], project_ref) - # Create 2 roles and give user each role in project - for i in range(2): - role_ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(role_ref['id'], role_ref) - self.assignment_api.add_role_to_user_and_project( - user_id=user_ref['id'], - tenant_id=project_ref['id'], - role_id=role_ref['id']) - # Get the list of user_ids in project - user_ids = self.assignment_api.list_user_ids_for_project( - project_ref['id']) - # Ensure the user is only returned once - self.assertEqual(1, len(user_ids)) - - def test_get_project_user_ids_404(self): - self.assertRaises(exception.ProjectNotFound, - self.assignment_api.list_user_ids_for_project, - uuid.uuid4().hex) - - def test_get_user(self): - user_ref = self.identity_api.get_user(self.user_foo['id']) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(user_ref, self.user_foo) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user(self): - user = { - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID - } - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # cache the result. - self.identity_api.get_user(ref['id']) - # delete bypassing identity api - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user(ref['id'])) - self.identity_api.get_user.invalidate(self.identity_api, ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, ref['id']) - user = { - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID - } - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - # cache the result. - self.identity_api.get_user(ref['id']) - # update using identity api and get back updated user. - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_404(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - uuid.uuid4().hex) - - def test_get_user_by_name(self): - user_ref = self.identity_api.get_user_by_name( - self.user_foo['name'], DEFAULT_DOMAIN_ID) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(user_ref, self.user_foo) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user_by_name(self): - user = { - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID - } - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # delete bypassing the identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user_by_name( - user['name'], DEFAULT_DOMAIN_ID)) - self.identity_api.get_user_by_name.invalidate( - self.identity_api, user['name'], DEFAULT_DOMAIN_ID) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - user['name'], DEFAULT_DOMAIN_ID) - user = { - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID - } - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_by_name_404(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - uuid.uuid4().hex, - DEFAULT_DOMAIN_ID) - - def test_create_duplicate_user_name_fails(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'fakepass', - 'tenants': ['bar']} - user = self.identity_api.create_user(user) - self.assertRaises(exception.Conflict, - self.identity_api.create_user, - user) - - def test_create_duplicate_user_name_in_different_domains(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - user1 = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} - user2 = {'name': user1['name'], - 'domain_id': new_domain['id'], - 'password': uuid.uuid4().hex} - self.identity_api.create_user(user1) - self.identity_api.create_user(user2) - - def test_move_user_between_domains(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - user = {'name': uuid.uuid4().hex, - 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex} - user = self.identity_api.create_user(user) - user['domain_id'] = domain2['id'] - self.identity_api.update_user(user['id'], user) - - def test_move_user_between_domains_with_clashing_names_fails(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a user in domain1 - user1 = {'name': uuid.uuid4().hex, - 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex} - user1 = self.identity_api.create_user(user1) - # Now create a user in domain2 with a potentially clashing - # name - which should work since we have domain separation - user2 = {'name': user1['name'], - 'domain_id': domain2['id'], - 'password': uuid.uuid4().hex} - user2 = self.identity_api.create_user(user2) - # Now try and move user1 into the 2nd domain - which should - # fail since the names clash - user1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user1['id'], - user1) - - def test_rename_duplicate_user_name_fails(self): - user1 = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'fakepass', - 'tenants': ['bar']} - user2 = {'name': 'fake2', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'fakepass', - 'tenants': ['bar']} - self.identity_api.create_user(user1) - user2 = self.identity_api.create_user(user2) - user2['name'] = 'fake1' - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user2['id'], - user2) - - def test_update_user_id_fails(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'fakepass', - 'tenants': ['bar']} - user = self.identity_api.create_user(user) - original_id = user['id'] - user['id'] = 'fake2' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - original_id, - user) - user_ref = self.identity_api.get_user(original_id) - self.assertEqual(original_id, user_ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - 'fake2') - - def test_create_duplicate_project_id_fails(self): - tenant = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['name'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - 'fake1', - tenant) - - def test_create_duplicate_project_name_fails(self): - tenant = {'id': 'fake1', 'name': 'fake', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['id'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - 'fake1', - tenant) - - def test_create_duplicate_project_name_in_different_domains(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - tenant2 = {'id': uuid.uuid4().hex, 'name': tenant1['name'], - 'domain_id': new_domain['id']} - self.resource_api.create_project(tenant1['id'], tenant1) - self.resource_api.create_project(tenant2['id'], tenant2) - - def test_move_project_between_domains(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project['id'], project) - project['domain_id'] = domain2['id'] - self.resource_api.update_project(project['id'], project) - - def test_move_project_between_domains_with_clashing_names_fails(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a project in domain1 - project1 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - # Now create a project in domain2 with a potentially clashing - # name - which should work since we have domain separation - project2 = {'id': uuid.uuid4().hex, - 'name': project1['name'], - 'domain_id': domain2['id']} - self.resource_api.create_project(project2['id'], project2) - # Now try and move project1 into the 2nd domain - which should - # fail since the names clash - project1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project1['id'], - project1) - - def test_rename_duplicate_project_name_fails(self): - tenant1 = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - tenant2 = {'id': 'fake2', 'name': 'fake2', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant1) - self.resource_api.create_project('fake2', tenant2) - tenant2['name'] = 'fake1' - self.assertRaises(exception.Error, - self.resource_api.update_project, - 'fake2', - tenant2) - - def test_update_project_id_does_nothing(self): - tenant = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['id'] = 'fake2' - self.resource_api.update_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual('fake1', tenant_ref['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - 'fake2') - - def test_list_role_assignments_unfiltered(self): - """Test unfiltered listing of role assignments.""" - - test_plan = { - # Create a domain, with a user, group & project - 'entities': {'domains': {'users': 1, 'groups': 1, 'projects': 1}, - 'roles': 3}, - # Create a grant of each type (user/group on project/domain) - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'group': 0, 'role': 2, 'domain': 0}, - {'group': 0, 'role': 2, 'project': 0}], - 'tests': [ - # Check that we get back the 4 assignments - {'params': {}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'group': 0, 'role': 2, 'domain': 0}, - {'group': 0, 'role': 2, 'project': 0}]} - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_role_assignments_filtered_by_role(self): - """Test listing of role assignments filtered by role ID.""" - - test_plan = { - # Create a user, group & project in the default domain - 'entities': {'domains': {'id': DEFAULT_DOMAIN_ID, - 'users': 1, 'groups': 1, 'projects': 1}, - 'roles': 3}, - # Create a grant of each type (user/group on project/domain) - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'group': 0, 'role': 2, 'domain': 0}, - {'group': 0, 'role': 2, 'project': 0}], - 'tests': [ - # Check that when filtering by role, we only get back those - # that match - {'params': {'role': 2}, - 'results': [{'group': 0, 'role': 2, 'domain': 0}, - {'group': 0, 'role': 2, 'project': 0}]} - ] - } - test_data = self.execute_assignment_test_plan(test_plan) - - # Also test that list_role_assignments_for_role() gives the same answer - assignment_list = self.assignment_api.list_role_assignments_for_role( - role_id=test_data['roles'][2]['id']) - self.assertThat(assignment_list, matchers.HasLength(2)) - - # Now check that each of our two new entries are in the list - self.assertIn( - {'group_id': test_data['groups'][0]['id'], - 'domain_id': DEFAULT_DOMAIN_ID, - 'role_id': test_data['roles'][2]['id']}, - assignment_list) - self.assertIn( - {'group_id': test_data['groups'][0]['id'], - 'project_id': test_data['projects'][0]['id'], - 'role_id': test_data['roles'][2]['id']}, - assignment_list) - - def test_list_group_role_assignment(self): - # When a group role assignment is created and the role assignments are - # listed then the group role assignment is included in the list. - - test_plan = { - 'entities': {'domains': {'id': DEFAULT_DOMAIN_ID, - 'groups': 1, 'projects': 1}, - 'roles': 1}, - 'assignments': [{'group': 0, 'role': 0, 'project': 0}], - 'tests': [ - {'params': {}, - 'results': [{'group': 0, 'role': 0, 'project': 0}]} - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_role_assignments_bad_role(self): - assignment_list = self.assignment_api.list_role_assignments_for_role( - role_id=uuid.uuid4().hex) - self.assertEqual([], assignment_list) - - def test_add_duplicate_role_grant(self): - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertNotIn(self.role_admin['id'], roles_ref) - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id']) - self.assertRaises(exception.Conflict, - self.assignment_api.add_role_to_user_and_project, - self.user_foo['id'], - self.tenant_bar['id'], - self.role_admin['id']) - - def test_get_role_by_user_and_project_with_user_in_group(self): - """Test for get role by user and project, user was added into a group. - - Test Plan: - - - Create a user, a project & a group, add this user to group - - Create roles and grant them to user and project - - Check the role list get by the user and project was as expected - - """ - user_ref = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex, - 'enabled': True} - user_ref = self.identity_api.create_user(user_ref) - - project_ref = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project(project_ref['id'], project_ref) - - group = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - group_id = self.identity_api.create_group(group)['id'] - self.identity_api.add_user_to_group(user_ref['id'], group_id) - - role_ref_list = [] - for i in range(2): - role_ref = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role_ref['id'], role_ref) - role_ref_list.append(role_ref) - - self.assignment_api.add_role_to_user_and_project( - user_id=user_ref['id'], - tenant_id=project_ref['id'], - role_id=role_ref['id']) - - role_list = self.assignment_api.get_roles_for_user_and_project( - user_id=user_ref['id'], - tenant_id=project_ref['id']) - - self.assertEqual(set(role_list), - set([r['id'] for r in role_ref_list])) - - def test_get_role_by_user_and_project(self): - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertNotIn(self.role_admin['id'], roles_ref) - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id']) - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertIn(self.role_admin['id'], roles_ref) - self.assertNotIn('member', roles_ref) - - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], 'member') - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertIn(self.role_admin['id'], roles_ref) - self.assertIn('member', roles_ref) - - def test_get_roles_for_user_and_domain(self): - """Test for getting roles for user on a domain. - - Test Plan: - - - Create a domain, with 2 users - - Check no roles yet exit - - Give user1 two roles on the domain, user2 one role - - Get roles on user1 and the domain - maybe sure we only - get back the 2 roles on user1 - - Delete both roles from user1 - - Check we get no roles back for user1 on domain - - """ - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - new_user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user1 = self.identity_api.create_user(new_user1) - new_user2 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user2 = self.identity_api.create_user(new_user2) - roles_ref = self.assignment_api.list_grants( - user_id=new_user1['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - # Now create the grants (roles are defined in default_fixtures) - self.assignment_api.create_grant(user_id=new_user1['id'], - domain_id=new_domain['id'], - role_id='member') - self.assignment_api.create_grant(user_id=new_user1['id'], - domain_id=new_domain['id'], - role_id='other') - self.assignment_api.create_grant(user_id=new_user2['id'], - domain_id=new_domain['id'], - role_id='admin') - # Read back the roles for user1 on domain - roles_ids = self.assignment_api.get_roles_for_user_and_domain( - new_user1['id'], new_domain['id']) - self.assertEqual(2, len(roles_ids)) - self.assertIn(self.role_member['id'], roles_ids) - self.assertIn(self.role_other['id'], roles_ids) - - # Now delete both grants for user1 - self.assignment_api.delete_grant(user_id=new_user1['id'], - domain_id=new_domain['id'], - role_id='member') - self.assignment_api.delete_grant(user_id=new_user1['id'], - domain_id=new_domain['id'], - role_id='other') - roles_ref = self.assignment_api.list_grants( - user_id=new_user1['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - - def test_get_roles_for_user_and_domain_404(self): - """Test errors raised when getting roles for user on a domain. - - Test Plan: - - - Check non-existing user gives UserNotFound - - Check non-existing domain gives DomainNotFound - - """ - new_domain = self._get_domain_fixture() - new_user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user1 = self.identity_api.create_user(new_user1) - - self.assertRaises(exception.UserNotFound, - self.assignment_api.get_roles_for_user_and_domain, - uuid.uuid4().hex, - new_domain['id']) - - self.assertRaises(exception.DomainNotFound, - self.assignment_api.get_roles_for_user_and_domain, - new_user1['id'], - uuid.uuid4().hex) - - def test_get_roles_for_user_and_project_404(self): - self.assertRaises(exception.UserNotFound, - self.assignment_api.get_roles_for_user_and_project, - uuid.uuid4().hex, - self.tenant_bar['id']) - - self.assertRaises(exception.ProjectNotFound, - self.assignment_api.get_roles_for_user_and_project, - self.user_foo['id'], - uuid.uuid4().hex) - - def test_add_role_to_user_and_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.assignment_api.add_role_to_user_and_project, - self.user_foo['id'], - uuid.uuid4().hex, - self.role_admin['id']) - - self.assertRaises(exception.RoleNotFound, - self.assignment_api.add_role_to_user_and_project, - self.user_foo['id'], - self.tenant_bar['id'], - uuid.uuid4().hex) - - def test_add_role_to_user_and_project_no_user(self): - # If add_role_to_user_and_project and the user doesn't exist, then - # no error. - user_id_not_exist = uuid.uuid4().hex - self.assignment_api.add_role_to_user_and_project( - user_id_not_exist, self.tenant_bar['id'], self.role_admin['id']) - - def test_remove_role_from_user_and_project(self): - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], 'member') - self.assignment_api.remove_role_from_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], 'member') - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertNotIn('member', roles_ref) - self.assertRaises(exception.NotFound, - self.assignment_api. - remove_role_from_user_and_project, - self.user_foo['id'], - self.tenant_bar['id'], - 'member') - - def test_get_role_grant_by_user_and_project(self): - roles_ref = self.assignment_api.list_grants( - user_id=self.user_foo['id'], - project_id=self.tenant_bar['id']) - self.assertEqual(1, len(roles_ref)) - self.assignment_api.create_grant(user_id=self.user_foo['id'], - project_id=self.tenant_bar['id'], - role_id=self.role_admin['id']) - roles_ref = self.assignment_api.list_grants( - user_id=self.user_foo['id'], - project_id=self.tenant_bar['id']) - self.assertIn(self.role_admin['id'], - [role_ref['id'] for role_ref in roles_ref]) - - self.assignment_api.create_grant(user_id=self.user_foo['id'], - project_id=self.tenant_bar['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - user_id=self.user_foo['id'], - project_id=self.tenant_bar['id']) - - roles_ref_ids = [] - for ref in roles_ref: - roles_ref_ids.append(ref['id']) - self.assertIn(self.role_admin['id'], roles_ref_ids) - self.assertIn('member', roles_ref_ids) - - def test_remove_role_grant_from_user_and_project(self): - self.assignment_api.create_grant(user_id=self.user_foo['id'], - project_id=self.tenant_baz['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - user_id=self.user_foo['id'], - project_id=self.tenant_baz['id']) - self.assertDictEqual(roles_ref[0], self.role_member) - - self.assignment_api.delete_grant(user_id=self.user_foo['id'], - project_id=self.tenant_baz['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - user_id=self.user_foo['id'], - project_id=self.tenant_baz['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - user_id=self.user_foo['id'], - project_id=self.tenant_baz['id'], - role_id='member') - - def test_get_role_assignment_by_project_not_found(self): - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.check_grant_role_id, - user_id=self.user_foo['id'], - project_id=self.tenant_baz['id'], - role_id='member') - - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.check_grant_role_id, - group_id=uuid.uuid4().hex, - project_id=self.tenant_baz['id'], - role_id='member') - - def test_get_role_assignment_by_domain_not_found(self): - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.check_grant_role_id, - user_id=self.user_foo['id'], - domain_id=self.domain_default['id'], - role_id='member') - - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.check_grant_role_id, - group_id=uuid.uuid4().hex, - domain_id=self.domain_default['id'], - role_id='member') - - def test_del_role_assignment_by_project_not_found(self): - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - user_id=self.user_foo['id'], - project_id=self.tenant_baz['id'], - role_id='member') - - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=uuid.uuid4().hex, - project_id=self.tenant_baz['id'], - role_id='member') - - def test_del_role_assignment_by_domain_not_found(self): - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - user_id=self.user_foo['id'], - domain_id=self.domain_default['id'], - role_id='member') - - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=uuid.uuid4().hex, - domain_id=self.domain_default['id'], - role_id='member') - - def test_get_and_remove_role_grant_by_group_and_project(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': 'secret', - 'enabled': True, 'domain_id': new_domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - project_id=self.tenant_bar['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(group_id=new_group['id'], - project_id=self.tenant_bar['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - project_id=self.tenant_bar['id']) - self.assertDictEqual(roles_ref[0], self.role_member) - - self.assignment_api.delete_grant(group_id=new_group['id'], - project_id=self.tenant_bar['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - project_id=self.tenant_bar['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=new_group['id'], - project_id=self.tenant_bar['id'], - role_id='member') - - def test_get_and_remove_role_grant_by_group_and_domain(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - - self.assignment_api.create_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertDictEqual(roles_ref[0], self.role_member) - - self.assignment_api.delete_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - - def test_get_and_remove_correct_role_grant_from_a_mix(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - new_project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': new_domain['id']} - self.resource_api.create_project(new_project['id'], new_project) - new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_group2 = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} - new_group2 = self.identity_api.create_group(new_group2) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user = self.identity_api.create_user(new_user) - new_user2 = {'name': 'new_user2', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user2 = self.identity_api.create_user(new_user2) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - # First check we have no grants - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - # Now add the grant we are going to test for, and some others as - # well just to make sure we get back the right one - self.assignment_api.create_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - - self.assignment_api.create_grant(group_id=new_group2['id'], - domain_id=new_domain['id'], - role_id=self.role_admin['id']) - self.assignment_api.create_grant(user_id=new_user2['id'], - domain_id=new_domain['id'], - role_id=self.role_admin['id']) - self.assignment_api.create_grant(group_id=new_group['id'], - project_id=new_project['id'], - role_id=self.role_admin['id']) - - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertDictEqual(roles_ref[0], self.role_member) - - self.assignment_api.delete_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - - def test_get_and_remove_role_grant_by_user_and_domain(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - new_user = {'name': 'new_user', 'password': 'secret', - 'enabled': True, 'domain_id': new_domain['id']} - new_user = self.identity_api.create_user(new_user) - roles_ref = self.assignment_api.list_grants( - user_id=new_user['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(user_id=new_user['id'], - domain_id=new_domain['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - user_id=new_user['id'], - domain_id=new_domain['id']) - self.assertDictEqual(roles_ref[0], self.role_member) - - self.assignment_api.delete_grant(user_id=new_user['id'], - domain_id=new_domain['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - user_id=new_user['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - user_id=new_user['id'], - domain_id=new_domain['id'], - role_id='member') - - def test_get_and_remove_role_grant_by_group_and_cross_domain(self): - group1_domain1_role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(group1_domain1_role['id'], - group1_domain1_role) - group1_domain2_role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(group1_domain2_role['id'], - group1_domain2_role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - group1 = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex} - group1 = self.identity_api.create_group(group1) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(0, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain2['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=group1_domain1_role['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain2['id'], - role_id=group1_domain2_role['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertDictEqual(roles_ref[0], group1_domain1_role) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain2['id']) - self.assertDictEqual(roles_ref[0], group1_domain2_role) - - self.assignment_api.delete_grant(group_id=group1['id'], - domain_id=domain2['id'], - role_id=group1_domain2_role['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain2['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - group_id=group1['id'], - domain_id=domain2['id'], - role_id=group1_domain2_role['id']) - - def test_get_and_remove_role_grant_by_user_and_cross_domain(self): - user1_domain1_role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(user1_domain1_role['id'], user1_domain1_role) - user1_domain2_role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(user1_domain2_role['id'], user1_domain2_role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(0, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain2['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=user1_domain1_role['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain2['id'], - role_id=user1_domain2_role['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertDictEqual(roles_ref[0], user1_domain1_role) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain2['id']) - self.assertDictEqual(roles_ref[0], user1_domain2_role) - - self.assignment_api.delete_grant(user_id=user1['id'], - domain_id=domain2['id'], - role_id=user1_domain2_role['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain2['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.RoleAssignmentNotFound, - self.assignment_api.delete_grant, - user_id=user1['id'], - domain_id=domain2['id'], - role_id=user1_domain2_role['id']) - - def test_role_grant_by_group_and_cross_domain_project(self): - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role2['id'], role2) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id']} - self.resource_api.create_project(project1['id'], project1) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role2['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - - roles_ref_ids = [] - for ref in roles_ref: - roles_ref_ids.append(ref['id']) - self.assertIn(role1['id'], roles_ref_ids) - self.assertIn(role2['id'], roles_ref_ids) - - self.assignment_api.delete_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - self.assertDictEqual(roles_ref[0], role2) - - def test_role_grant_by_user_and_cross_domain_project(self): - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role2['id'], role2) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id']} - self.resource_api.create_project(project1['id'], project1) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role2['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - - roles_ref_ids = [] - for ref in roles_ref: - roles_ref_ids.append(ref['id']) - self.assertIn(role1['id'], roles_ref_ids) - self.assertIn(role2['id'], roles_ref_ids) - - self.assignment_api.delete_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - self.assertDictEqual(roles_ref[0], role2) - - def test_delete_user_grant_no_user(self): - # Can delete a grant where the user doesn't exist. - role_id = uuid.uuid4().hex - role = {'id': role_id, 'name': uuid.uuid4().hex} - self.role_api.create_role(role_id, role) - - user_id = uuid.uuid4().hex - - self.assignment_api.create_grant(role_id, user_id=user_id, - project_id=self.tenant_bar['id']) - - self.assignment_api.delete_grant(role_id, user_id=user_id, - project_id=self.tenant_bar['id']) - - def test_delete_group_grant_no_group(self): - # Can delete a grant where the group doesn't exist. - role_id = uuid.uuid4().hex - role = {'id': role_id, 'name': uuid.uuid4().hex} - self.role_api.create_role(role_id, role) - - group_id = uuid.uuid4().hex - - self.assignment_api.create_grant(role_id, group_id=group_id, - project_id=self.tenant_bar['id']) - - self.assignment_api.delete_grant(role_id, group_id=group_id, - project_id=self.tenant_bar['id']) - - def test_grant_crud_throws_exception_if_invalid_role(self): - """Ensure RoleNotFound thrown if role does not exist.""" - - def assert_role_not_found_exception(f, **kwargs): - self.assertRaises(exception.RoleNotFound, f, - role_id=uuid.uuid4().hex, **kwargs) - - user = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex, 'enabled': True} - user_resp = self.identity_api.create_user(user) - group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True} - group_resp = self.identity_api.create_group(group) - project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - project_resp = self.resource_api.create_project(project['id'], project) - - for manager_call in [self.assignment_api.create_grant, - self.assignment_api.get_grant, - self.assignment_api.delete_grant]: - assert_role_not_found_exception( - manager_call, - user_id=user_resp['id'], project_id=project_resp['id']) - assert_role_not_found_exception( - manager_call, - group_id=group_resp['id'], project_id=project_resp['id']) - assert_role_not_found_exception( - manager_call, - user_id=user_resp['id'], domain_id=DEFAULT_DOMAIN_ID) - assert_role_not_found_exception( - manager_call, - group_id=group_resp['id'], domain_id=DEFAULT_DOMAIN_ID) - - def test_multi_role_grant_by_user_group_on_project_domain(self): - role_list = [] - for _ in range(10): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group2 = self.identity_api.create_group(group2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - - self.identity_api.add_user_to_group(user1['id'], - group1['id']) - self.identity_api.add_user_to_group(user1['id'], - group2['id']) - - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role_list[2]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role_list[3]['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role_list[4]['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role_list[5]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role_list[6]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role_list[7]['id']) - roles_ref = self.assignment_api.list_grants(user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(2, len(roles_ref)) - self.assertIn(role_list[0], roles_ref) - self.assertIn(role_list[1], roles_ref) - roles_ref = self.assignment_api.list_grants(group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(2, len(roles_ref)) - self.assertIn(role_list[2], roles_ref) - self.assertIn(role_list[3], roles_ref) - roles_ref = self.assignment_api.list_grants(user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(2, len(roles_ref)) - self.assertIn(role_list[4], roles_ref) - self.assertIn(role_list[5], roles_ref) - roles_ref = self.assignment_api.list_grants(group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(2, len(roles_ref)) - self.assertIn(role_list[6], roles_ref) - self.assertIn(role_list[7], roles_ref) - - # Now test the alternate way of getting back lists of grants, - # where user and group roles are combined. These should match - # the above results. - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(4, len(combined_list)) - self.assertIn(role_list[4]['id'], combined_list) - self.assertIn(role_list[5]['id'], combined_list) - self.assertIn(role_list[6]['id'], combined_list) - self.assertIn(role_list[7]['id'], combined_list) - - combined_role_list = self.assignment_api.get_roles_for_user_and_domain( - user1['id'], domain1['id']) - self.assertEqual(4, len(combined_role_list)) - self.assertIn(role_list[0]['id'], combined_role_list) - self.assertIn(role_list[1]['id'], combined_role_list) - self.assertIn(role_list[2]['id'], combined_role_list) - self.assertIn(role_list[3]['id'], combined_role_list) - - def test_multi_group_grants_on_project_domain(self): - """Test multiple group roles for user on project and domain. - - Test Plan: - - - Create 6 roles - - Create a domain, with a project, user and two groups - - Make the user a member of both groups - - Check no roles yet exit - - Assign a role to each user and both groups on both the - project and domain - - Get a list of effective roles for the user on both the - project and domain, checking we get back the correct three - roles - - """ - role_list = [] - for _ in range(6): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group2 = self.identity_api.create_group(group2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - - self.identity_api.add_user_to_group(user1['id'], - group1['id']) - self.identity_api.add_user_to_group(user1['id'], - group2['id']) - - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id']) - self.assignment_api.create_grant(group_id=group2['id'], - domain_id=domain1['id'], - role_id=role_list[2]['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role_list[3]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role_list[4]['id']) - self.assignment_api.create_grant(group_id=group2['id'], - project_id=project1['id'], - role_id=role_list[5]['id']) - - # Read by the roles, ensuring we get the correct 3 roles for - # both project and domain - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(3, len(combined_list)) - self.assertIn(role_list[3]['id'], combined_list) - self.assertIn(role_list[4]['id'], combined_list) - self.assertIn(role_list[5]['id'], combined_list) - - combined_role_list = self.assignment_api.get_roles_for_user_and_domain( - user1['id'], domain1['id']) - self.assertEqual(3, len(combined_role_list)) - self.assertIn(role_list[0]['id'], combined_role_list) - self.assertIn(role_list[1]['id'], combined_role_list) - self.assertIn(role_list[2]['id'], combined_role_list) - - def test_delete_role_with_user_and_group_grants(self): - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.role_api.delete_role(role1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(0, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(0, len(roles_ref)) - - def test_delete_user_with_group_project_domain_links(self): - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_user(user1['id']) - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - user1['id'], - group1['id']) - - def test_delete_group_with_user_project_domain_links(self): - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_group(group1['id']) - self.identity_api.get_user(user1['id']) - - def test_list_role_assignment_by_domain(self): - """Test listing of role assignment filtered by domain.""" - - test_plan = { - # A domain with 3 users, 1 group, a spoiler domain and 2 roles. - 'entities': {'domains': [{'users': 3, 'groups': 1}, 1], - 'roles': 2}, - # Users 1 & 2 are in the group - 'group_memberships': [{'group': 0, 'users': [1, 2]}], - # Assign a role for user 0 and the group - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'group': 0, 'role': 1, 'domain': 0}], - 'tests': [ - # List all effective assignments for domain[0]. - # Should get one direct user role and user roles for each of - # the users in the group. - {'params': {'domain': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 1, 'role': 1, 'domain': 0, - 'indirect': {'group': 0}}, - {'user': 2, 'role': 1, 'domain': 0, - 'indirect': {'group': 0}} - ]}, - # Using domain[1] should return nothing - {'params': {'domain': 1, 'effective': True}, - 'results': []}, - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_role_assignment_by_user_with_domain_group_roles(self): - """Test listing assignments by user, with group roles on a domain.""" - - test_plan = { - # A domain with 3 users, 3 groups, a spoiler domain - # plus 3 roles. - 'entities': {'domains': [{'users': 3, 'groups': 3}, 1], - 'roles': 3}, - # Users 1 & 2 are in the group 0, User 1 also in group 1 - 'group_memberships': [{'group': 0, 'users': [0, 1]}, - {'group': 1, 'users': [0]}], - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'group': 0, 'role': 1, 'domain': 0}, - {'group': 1, 'role': 2, 'domain': 0}, - # ...and two spoiler assignments - {'user': 1, 'role': 1, 'domain': 0}, - {'group': 2, 'role': 2, 'domain': 0}], - 'tests': [ - # List all effective assignments for user[0]. - # Should get one direct user role and a user roles for each of - # groups 0 and 1 - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'domain': 0, - 'indirect': {'group': 0}}, - {'user': 0, 'role': 2, 'domain': 0, - 'indirect': {'group': 1}} - ]}, - # Adding domain[0] as a filter should return the same data - {'params': {'user': 0, 'domain': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'domain': 0, - 'indirect': {'group': 0}}, - {'user': 0, 'role': 2, 'domain': 0, - 'indirect': {'group': 1}} - ]}, - # Using domain[1] should return nothing - {'params': {'user': 0, 'domain': 1, 'effective': True}, - 'results': []}, - # Using user[2] should return nothing - {'params': {'user': 2, 'domain': 0, 'effective': True}, - 'results': []}, - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_delete_domain_with_user_group_project_links(self): - # TODO(chungg):add test case once expected behaviour defined - pass - - def test_add_user_to_project(self): - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - self.user_foo['id']) - tenants = self.assignment_api.list_projects_for_user( - self.user_foo['id']) - self.assertIn(self.tenant_baz, tenants) - - def test_add_user_to_project_missing_default_role(self): - self.role_api.delete_role(CONF.member_role_id) - self.assertRaises(exception.RoleNotFound, - self.role_api.get_role, - CONF.member_role_id) - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - self.user_foo['id']) - tenants = ( - self.assignment_api.list_projects_for_user(self.user_foo['id'])) - self.assertIn(self.tenant_baz, tenants) - default_role = self.role_api.get_role(CONF.member_role_id) - self.assertIsNotNone(default_role) - - def test_add_user_to_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.assignment_api.add_user_to_project, - uuid.uuid4().hex, - self.user_foo['id']) - - def test_add_user_to_project_no_user(self): - # If add_user_to_project and the user doesn't exist, then - # no error. - user_id_not_exist = uuid.uuid4().hex - self.assignment_api.add_user_to_project(self.tenant_bar['id'], - user_id_not_exist) - - def test_remove_user_from_project(self): - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - self.user_foo['id']) - self.assignment_api.remove_user_from_project(self.tenant_baz['id'], - self.user_foo['id']) - tenants = self.assignment_api.list_projects_for_user( - self.user_foo['id']) - self.assertNotIn(self.tenant_baz, tenants) - - def test_remove_user_from_project_race_delete_role(self): - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - self.user_foo['id']) - self.assignment_api.add_role_to_user_and_project( - tenant_id=self.tenant_baz['id'], - user_id=self.user_foo['id'], - role_id=self.role_other['id']) - - # Mock a race condition, delete a role after - # get_roles_for_user_and_project() is called in - # remove_user_from_project(). - roles = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_baz['id']) - self.role_api.delete_role(self.role_other['id']) - self.assignment_api.get_roles_for_user_and_project = mock.Mock( - return_value=roles) - self.assignment_api.remove_user_from_project(self.tenant_baz['id'], - self.user_foo['id']) - tenants = self.assignment_api.list_projects_for_user( - self.user_foo['id']) - self.assertNotIn(self.tenant_baz, tenants) - - def test_remove_user_from_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.assignment_api.remove_user_from_project, - uuid.uuid4().hex, - self.user_foo['id']) - - self.assertRaises(exception.UserNotFound, - self.assignment_api.remove_user_from_project, - self.tenant_bar['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.NotFound, - self.assignment_api.remove_user_from_project, - self.tenant_baz['id'], - self.user_foo['id']) - - def test_list_user_project_ids_404(self): - self.assertRaises(exception.UserNotFound, - self.assignment_api.list_projects_for_user, - uuid.uuid4().hex) - - def test_update_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.update_project, - uuid.uuid4().hex, - dict()) - - def test_delete_project_404(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.delete_project, - uuid.uuid4().hex) - - def test_update_user_404(self): - user_id = uuid.uuid4().hex - self.assertRaises(exception.UserNotFound, - self.identity_api.update_user, - user_id, - {'id': user_id, - 'domain_id': DEFAULT_DOMAIN_ID}) - - def test_delete_user_with_project_association(self): - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} - user = self.identity_api.create_user(user) - self.assignment_api.add_user_to_project(self.tenant_bar['id'], - user['id']) - self.identity_api.delete_user(user['id']) - self.assertRaises(exception.UserNotFound, - self.assignment_api.list_projects_for_user, - user['id']) - - def test_delete_user_with_project_roles(self): - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} - user = self.identity_api.create_user(user) - self.assignment_api.add_role_to_user_and_project( - user['id'], - self.tenant_bar['id'], - self.role_member['id']) - self.identity_api.delete_user(user['id']) - self.assertRaises(exception.UserNotFound, - self.assignment_api.list_projects_for_user, - user['id']) - - def test_delete_user_404(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.delete_user, - uuid.uuid4().hex) - - def test_delete_role_404(self): - self.assertRaises(exception.RoleNotFound, - self.role_api.delete_role, - uuid.uuid4().hex) - - def test_create_update_delete_unicode_project(self): - unicode_project_name = u'name \u540d\u5b57' - project = {'id': uuid.uuid4().hex, - 'name': unicode_project_name, - 'description': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id} - self.resource_api.create_project(project['id'], project) - self.resource_api.update_project(project['id'], project) - self.resource_api.delete_project(project['id']) - - def test_create_project_with_no_enabled_field(self): - ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project(ref['id'], ref) - - project = self.resource_api.get_project(ref['id']) - self.assertIs(project['enabled'], True) - - def test_create_project_long_name_fails(self): - tenant = {'id': 'fake1', 'name': 'a' * 65, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - tenant['id'], - tenant) - - def test_create_project_blank_name_fails(self): - tenant = {'id': 'fake1', 'name': '', - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - tenant['id'], - tenant) - - def test_create_project_invalid_name_fails(self): - tenant = {'id': 'fake1', 'name': None, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - tenant['id'], - tenant) - tenant = {'id': 'fake1', 'name': 123, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - tenant['id'], - tenant) - - def test_update_project_blank_name_fails(self): - tenant = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['name'] = '' - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - tenant['id'], - tenant) - - def test_update_project_long_name_fails(self): - tenant = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['name'] = 'a' * 65 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - tenant['id'], - tenant) - - def test_update_project_invalid_name_fails(self): - tenant = {'id': 'fake1', 'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant['name'] = None - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - tenant['id'], - tenant) - - tenant['name'] = 123 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - tenant['id'], - tenant) - - def test_create_user_long_name_fails(self): - user = {'name': 'a' * 256, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_blank_name_fails(self): - user = {'name': '', - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_missed_password(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_none_password(self): - user = {'name': 'fake1', 'password': None, - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_invalid_name_fails(self): - user = {'name': None, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - user = {'name': 123, - 'domain_id': DEFAULT_DOMAIN_ID} - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_update_project_invalid_enabled_type_string(self): - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'enabled': True, - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(True, project_ref['enabled']) - - # Strings are not valid boolean values - project['enabled'] = "false" - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_create_project_invalid_enabled_type_string(self): - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - # invalid string value - 'enabled': "true"} - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_invalid_domain_id(self): - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': uuid.uuid4().hex, - 'enabled': True} - self.assertRaises(exception.DomainNotFound, - self.resource_api.create_project, - project['id'], - project) - - def test_create_user_invalid_enabled_type_string(self): - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex, - # invalid string value - 'enabled': "true"} - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_update_user_long_name_fails(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - user['name'] = 'a' * 256 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_blank_name_fails(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - user['name'] = '' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_invalid_name_fails(self): - user = {'name': 'fake1', - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - - user['name'] = None - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - user['name'] = 123 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_list_users(self): - users = self.identity_api.list_users( - domain_scope=self._set_domain_scope(DEFAULT_DOMAIN_ID)) - self.assertEqual(len(default_fixtures.USERS), len(users)) - user_ids = set(user['id'] for user in users) - expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id'] - for user in default_fixtures.USERS) - for user_ref in users: - self.assertNotIn('password', user_ref) - self.assertEqual(expected_user_ids, user_ids) - - def test_list_groups(self): - group1 = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex} - group2 = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex} - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - groups = self.identity_api.list_groups( - domain_scope=self._set_domain_scope(DEFAULT_DOMAIN_ID)) - self.assertEqual(2, len(groups)) - group_ids = [] - for group in groups: - group_ids.append(group.get('id')) - self.assertIn(group1['id'], group_ids) - self.assertIn(group2['id'], group_ids) - - def test_list_domains(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - self.resource_api.create_domain(domain2['id'], domain2) - domains = self.resource_api.list_domains() - self.assertEqual(3, len(domains)) - domain_ids = [] - for domain in domains: - domain_ids.append(domain.get('id')) - self.assertIn(DEFAULT_DOMAIN_ID, domain_ids) - self.assertIn(domain1['id'], domain_ids) - self.assertIn(domain2['id'], domain_ids) - - def test_list_projects(self): - projects = self.resource_api.list_projects() - self.assertEqual(4, len(projects)) - project_ids = [] - for project in projects: - project_ids.append(project.get('id')) - self.assertIn(self.tenant_bar['id'], project_ids) - self.assertIn(self.tenant_baz['id'], project_ids) - - def test_list_projects_with_multiple_filters(self): - # Create a project - project = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex, 'description': uuid.uuid4().hex, - 'enabled': True, 'parent_id': None, 'is_domain': False} - self.resource_api.create_project(project['id'], project) - - # Build driver hints with the project's name and inexistent description - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', uuid.uuid4().hex) - - # Retrieve projects based on hints and check an empty list is returned - projects = self.resource_api.list_projects(hints) - self.assertEqual([], projects) - - # Build correct driver hints - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', project['description']) - - # Retrieve projects based on hints - projects = self.resource_api.list_projects(hints) - - # Check that the returned list contains only the first project - self.assertEqual(1, len(projects)) - self.assertEqual(project, projects[0]) - - def test_list_projects_for_domain(self): - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - DEFAULT_DOMAIN_ID)]) - self.assertEqual(4, len(project_ids)) - self.assertIn(self.tenant_bar['id'], project_ids) - self.assertIn(self.tenant_baz['id'], project_ids) - self.assertIn(self.tenant_mtu['id'], project_ids) - self.assertIn(self.tenant_service['id'], project_ids) - - @unit.skip_if_no_multiple_domains_support - def test_list_projects_for_alternate_domain(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project2['id'], project2) - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - domain1['id'])]) - self.assertEqual(2, len(project_ids)) - self.assertIn(project1['id'], project_ids) - self.assertIn(project2['id'], project_ids) - - def _create_projects_hierarchy(self, hierarchy_size=2, - domain_id=DEFAULT_DOMAIN_ID, - is_domain=False): - """Creates a project hierarchy with specified size. - - :param hierarchy_size: the desired hierarchy size, default is 2 - - a project with one child. - :param domain_id: domain where the projects hierarchy will be created. - :param is_domain: if the hierarchy will have the is_domain flag active - or not. - - :returns projects: a list of the projects in the created hierarchy. - - """ - project_id = uuid.uuid4().hex - project = {'id': project_id, - 'description': '', - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'domain_id': domain_id, - 'is_domain': is_domain} - self.resource_api.create_project(project_id, project) - - projects = [project] - for i in range(1, hierarchy_size): - new_project = {'id': uuid.uuid4().hex, - 'description': '', - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project_id, - 'is_domain': is_domain} - new_project['domain_id'] = domain_id - - self.resource_api.create_project(new_project['id'], new_project) - projects.append(new_project) - project_id = new_project['id'] - - return projects - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_with_project_api(self): - project_id = uuid.uuid4().hex - project = {'id': project_id, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': True} - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - self.assertEqual(DEFAULT_DOMAIN_ID, ref['domain_id']) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for projects acting as domains implementation') - def test_is_domain_sub_project_has_parent_domain_id(self): - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': True} - self.resource_api.create_project(project['id'], project) - - sub_project_id = uuid.uuid4().hex - sub_project = {'id': sub_project_id, - 'description': '', - 'domain_id': project['id'], - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project['id'], - 'is_domain': True} - ref = self.resource_api.create_project(sub_project['id'], sub_project) - self.assertTrue(ref['is_domain']) - self.assertEqual(project['id'], ref['parent_id']) - self.assertEqual(project['id'], ref['domain_id']) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for projects acting as domains implementation') - def test_delete_domain_with_project_api(self): - project_id = uuid.uuid4().hex - project = {'id': project_id, - 'description': '', - 'domain_id': None, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': True} - self.resource_api.create_project(project['id'], project) - - # Try to delete is_domain project that is enabled - self.assertRaises(exception.ValidationError, - self.resource_api.delete_project, - project['id']) - - # Disable the project - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - # Successfuly delete the project - self.resource_api.delete_project(project['id']) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for projects acting as domains implementation') - def test_create_domain_under_regular_project_hierarchy_fails(self): - # Creating a regular project hierarchy. Projects acting as domains - # can't have a parent that is a regular project. - projects_hierarchy = self._create_projects_hierarchy() - parent = projects_hierarchy[1] - project_id = uuid.uuid4().hex - project = {'id': project_id, - 'description': '', - 'domain_id': parent['id'], - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': parent['id'], - 'is_domain': True} - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], project) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for projects acting as domains implementation') - def test_create_project_under_domain_hierarchy(self): - projects_hierarchy = self._create_projects_hierarchy(is_domain=True) - parent = projects_hierarchy[1] - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': parent['id'], - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': parent['id'], - 'is_domain': False} - - ref = self.resource_api.create_project(project['id'], project) - self.assertFalse(ref['is_domain']) - self.assertEqual(parent['id'], ref['parent_id']) - self.assertEqual(parent['id'], ref['domain_id']) - - def test_create_project_without_is_domain_flag(self): - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None} - - ref = self.resource_api.create_project(project['id'], project) - # The is_domain flag should be False by default - self.assertFalse(ref['is_domain']) - - def test_create_is_domain_project(self): - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': True} - - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - - @test_utils.wip('waiting for projects acting as domains implementation') - def test_create_project_with_parent_id_and_without_domain_id(self): - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': None, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None} - self.resource_api.create_project(project['id'], project) - - sub_project = {'id': uuid.uuid4().hex, - 'description': '', - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project['id']} - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The domain_id should be set to the parent domain_id - self.assertEqual(project['domain_id'], ref['domain_id']) - - @test_utils.wip('waiting for projects acting as domains implementation') - def test_create_project_with_domain_id_and_without_parent_id(self): - project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': None, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None} - self.resource_api.create_project(project['id'], project) - - sub_project = {'id': uuid.uuid4().hex, - 'description': '', - 'enabled': True, - 'domain_id': project['id'], - 'name': uuid.uuid4().hex} - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The parent_id should be set to the domain_id - self.assertEqual(ref['parent_id'], project['id']) - - def test_check_leaf_projects(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.assertFalse(self.resource_api.is_leaf_project( - root_project['id'])) - self.assertTrue(self.resource_api.is_leaf_project( - leaf_project['id'])) - - # Delete leaf_project - self.resource_api.delete_project(leaf_project['id']) - - # Now, root_project should be leaf - self.assertTrue(self.resource_api.is_leaf_project( - root_project['id'])) - - def test_list_projects_in_subtree(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project2['id'], - 'is_domain': False} - self.resource_api.create_project(project4['id'], project4) - - subtree = self.resource_api.list_projects_in_subtree(project1['id']) - self.assertEqual(3, len(subtree)) - self.assertIn(project2, subtree) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project2['id']) - self.assertEqual(2, len(subtree)) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project3['id']) - self.assertEqual(0, len(subtree)) - - def test_list_projects_in_subtree_with_circular_reference(self): - project1_id = uuid.uuid4().hex - project2_id = uuid.uuid4().hex - - project1 = {'id': project1_id, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex} - self.resource_api.create_project(project1['id'], project1) - - project2 = {'id': project2_id, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project1_id} - self.resource_api.create_project(project2['id'], project2) - - project1['parent_id'] = project2_id # Adds cyclic reference - - # NOTE(dstanek): The manager does not allow parent_id to be updated. - # Instead will directly use the driver to create the cyclic - # reference. - self.resource_api.driver.update_project(project1_id, project1) - - subtree = self.resource_api.list_projects_in_subtree(project1_id) - - # NOTE(dstanek): If a cyclic refence is detected the code bails - # and returns None instead of falling into the infinite - # recursion trap. - self.assertIsNone(subtree) - - def test_list_projects_in_subtree_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_projects_in_subtree, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_projects_in_subtree, - uuid.uuid4().hex) - - def test_list_project_parents(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': project2['id'], - 'is_domain': False} - self.resource_api.create_project(project4['id'], project4) - - parents1 = self.resource_api.list_project_parents(project3['id']) - self.assertEqual(2, len(parents1)) - self.assertIn(project1, parents1) - self.assertIn(project2, parents1) - - parents2 = self.resource_api.list_project_parents(project4['id']) - self.assertEqual(parents1, parents2) - - parents = self.resource_api.list_project_parents(project1['id']) - self.assertEqual(0, len(parents)) - - def test_list_project_parents_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_project_parents, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_project_parents, - uuid.uuid4().hex) - - def test_delete_project_with_role_assignments(self): - tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project(tenant['id'], tenant) - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], tenant['id'], 'member') - self.resource_api.delete_project(tenant['id']) - self.assertRaises(exception.NotFound, - self.resource_api.get_project, - tenant['id']) - - def test_delete_role_check_role_grant(self): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - alt_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - self.role_api.create_role(alt_role['id'], alt_role) - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], role['id']) - self.assignment_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], alt_role['id']) - self.role_api.delete_role(role['id']) - roles_ref = self.assignment_api.get_roles_for_user_and_project( - self.user_foo['id'], self.tenant_bar['id']) - self.assertNotIn(role['id'], roles_ref) - self.assertIn(alt_role['id'], roles_ref) - - def test_create_project_doesnt_modify_passed_in_dict(self): - new_project = {'id': 'tenant_id', 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - original_project = new_project.copy() - self.resource_api.create_project('tenant_id', new_project) - self.assertDictEqual(original_project, new_project) - - def test_create_user_doesnt_modify_passed_in_dict(self): - new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID} - original_user = new_user.copy() - self.identity_api.create_user(new_user) - self.assertDictEqual(original_user, new_user) - - def test_update_user_enable(self): - user = {'name': 'fake1', 'enabled': True, - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(True, user_ref['enabled']) - - user['enabled'] = False - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - # If not present, enabled field should not be updated - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(False, user_ref['enabled']) - - user['enabled'] = True - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(True, user_ref['enabled']) - - # Integers are valid Python's booleans. Explicitly test it. - user['enabled'] = 0 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(False, user_ref['enabled']) - - # Any integers other than 0 are interpreted as True - user['enabled'] = -42 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(True, user_ref['enabled']) - - def test_update_user_name(self): - user = {'name': uuid.uuid4().hex, - 'enabled': True, - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['name'], user_ref['name']) - - changed_name = user_ref['name'] + '_changed' - user_ref['name'] = changed_name - updated_user = self.identity_api.update_user(user_ref['id'], user_ref) - - # NOTE(dstanek): the SQL backend adds an 'extra' field containing a - # dictionary of the extra fields in addition to the - # fields in the object. For the details see: - # SqlIdentity.test_update_project_returns_extra - updated_user.pop('extra', None) - - self.assertDictEqual(user_ref, updated_user) - - user_ref = self.identity_api.get_user(user_ref['id']) - self.assertEqual(changed_name, user_ref['name']) - - def test_update_user_enable_fails(self): - user = {'name': 'fake1', 'enabled': True, - 'domain_id': DEFAULT_DOMAIN_ID} - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(True, user_ref['enabled']) - - # Strings are not valid boolean values - user['enabled'] = "false" - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_project_enable(self): - tenant = {'id': 'fake1', 'name': 'fake1', 'enabled': True, - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual(True, tenant_ref['enabled']) - - tenant['enabled'] = False - self.resource_api.update_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual(tenant['enabled'], tenant_ref['enabled']) - - # If not present, enabled field should not be updated - del tenant['enabled'] - self.resource_api.update_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual(False, tenant_ref['enabled']) - - tenant['enabled'] = True - self.resource_api.update_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual(tenant['enabled'], tenant_ref['enabled']) - - del tenant['enabled'] - self.resource_api.update_project('fake1', tenant) - tenant_ref = self.resource_api.get_project('fake1') - self.assertEqual(True, tenant_ref['enabled']) - - def test_add_user_to_group(self): - domain = self._get_domain_fixture() - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - - found = False - for x in groups: - if (x['id'] == new_group['id']): - found = True - self.assertTrue(found) - - def test_add_user_to_group_404(self): - domain = self._get_domain_fixture() - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - self.assertRaises(exception.GroupNotFound, - self.identity_api.add_user_to_group, - new_user['id'], - uuid.uuid4().hex) - - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.UserNotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_check_user_in_group(self): - domain = self._get_domain_fixture() - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - self.identity_api.check_user_in_group(new_user['id'], new_group['id']) - - def test_create_invalid_domain_fails(self): - new_group = {'domain_id': "doesnotexist", 'name': uuid.uuid4().hex} - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_group, - new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': "doesnotexist"} - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_user, - new_user) - - def test_check_user_not_in_group(self): - new_group = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': DEFAULT_DOMAIN_ID} - new_user = self.identity_api.create_user(new_user) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - new_user['id'], - new_group['id']) - - def test_check_user_in_group_404(self): - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': DEFAULT_DOMAIN_ID} - new_user = self.identity_api.create_user(new_user) - - new_group = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - - self.assertRaises(exception.UserNotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.GroupNotFound, - self.identity_api.check_user_in_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_list_users_in_group(self): - domain = self._get_domain_fixture() - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - # Make sure we get an empty list back on a new group, not an error. - user_refs = self.identity_api.list_users_in_group(new_group['id']) - self.assertEqual([], user_refs) - # Make sure we get the correct users back once they have been added - # to the group. - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - user_refs = self.identity_api.list_users_in_group(new_group['id']) - found = False - for x in user_refs: - if (x['id'] == new_user['id']): - found = True - self.assertNotIn('password', x) - self.assertTrue(found) - - def test_list_users_in_group_404(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.list_users_in_group, - uuid.uuid4().hex) - - def test_list_groups_for_user(self): - domain = self._get_domain_fixture() - test_groups = [] - test_users = [] - GROUP_COUNT = 3 - USER_COUNT = 2 - - for x in range(0, USER_COUNT): - new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - test_users.append(new_user) - positive_user = test_users[0] - negative_user = test_users[1] - - for x in range(0, USER_COUNT): - group_refs = self.identity_api.list_groups_for_user( - test_users[x]['id']) - self.assertEqual(0, len(group_refs)) - - for x in range(0, GROUP_COUNT): - before_count = x - after_count = x + 1 - new_group = {'domain_id': domain['id'], - 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - test_groups.append(new_group) - - # add the user to the group and ensure that the - # group count increases by one for each - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.add_user_to_group( - positive_user['id'], - new_group['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - - # Make sure the group count for the unrelated user did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - # remove the user from each group and ensure that - # the group count reduces by one for each - for x in range(0, 3): - before_count = GROUP_COUNT - x - after_count = GROUP_COUNT - x - 1 - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.remove_user_from_group( - positive_user['id'], - test_groups[x]['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - # Make sure the group count for the unrelated user - # did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - def test_remove_user_from_group(self): - domain = self._get_domain_fixture() - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertIn(new_group['id'], [x['id'] for x in groups]) - self.identity_api.remove_user_from_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertNotIn(new_group['id'], [x['id'] for x in groups]) - - def test_remove_user_from_group_404(self): - domain = self._get_domain_fixture() - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} - new_user = self.identity_api.create_user(new_user) - new_group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.GroupNotFound, - self.identity_api.remove_user_from_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.UserNotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_group_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - group = {'domain_id': domain['id'], 'name': uuid.uuid4().hex} - group = self.identity_api.create_group(group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - group['name'] = uuid.uuid4().hex - self.identity_api.update_group(group['id'], group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - self.identity_api.delete_group(group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - group['id']) - - def test_get_group_by_name(self): - group_name = uuid.uuid4().hex - group = {'domain_id': DEFAULT_DOMAIN_ID, 'name': group_name} - group = self.identity_api.create_group(group) - spoiler = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} - self.identity_api.create_group(spoiler) - - group_ref = self.identity_api.get_group_by_name( - group_name, DEFAULT_DOMAIN_ID) - self.assertDictEqual(group_ref, group) - - def test_get_group_by_name_404(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group_by_name, - uuid.uuid4().hex, - DEFAULT_DOMAIN_ID) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_group_crud(self): - group = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} - group = self.identity_api.create_group(group) - # cache the result - group_ref = self.identity_api.get_group(group['id']) - # delete the group bypassing identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(group['id'])) - driver.delete_group(entity_id) - - self.assertEqual(group_ref, self.identity_api.get_group(group['id'])) - self.identity_api.get_group.invalidate(self.identity_api, group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, group['id']) - - group = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} - group = self.identity_api.create_group(group) - # cache the result - self.identity_api.get_group(group['id']) - group['name'] = uuid.uuid4().hex - group_ref = self.identity_api.update_group(group['id'], group) - # after updating through identity api, get updated group - self.assertDictContainsSubset(self.identity_api.get_group(group['id']), - group_ref) - - def test_create_duplicate_group_name_fails(self): - group1 = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} - group2 = {'domain_id': DEFAULT_DOMAIN_ID, 'name': group1['name']} - group1 = self.identity_api.create_group(group1) - self.assertRaises(exception.Conflict, - self.identity_api.create_group, - group2) - - def test_create_duplicate_group_name_in_different_domains(self): - new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(new_domain['id'], new_domain) - group1 = {'domain_id': DEFAULT_DOMAIN_ID, 'name': uuid.uuid4().hex} - group2 = {'domain_id': new_domain['id'], 'name': group1['name']} - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - - def test_move_group_between_domains(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - group = {'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - group = self.identity_api.create_group(group) - group['domain_id'] = domain2['id'] - self.identity_api.update_group(group['id'], group) - - def test_move_group_between_domains_with_clashing_names_fails(self): - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a group in domain1 - group1 = {'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - group1 = self.identity_api.create_group(group1) - # Now create a group in domain2 with a potentially clashing - # name - which should work since we have domain separation - group2 = {'name': group1['name'], - 'domain_id': domain2['id']} - group2 = self.identity_api.create_group(group2) - # Now try and move group1 into the 2nd domain - which should - # fail since the names clash - group1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_group, - group1['id'], - group1) - - @unit.skip_if_no_multiple_domains_support - def test_project_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - self.resource_api.create_domain(domain['id'], domain) - project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - self.resource_api.delete_project(project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_domain_delete_hierarchy(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - self.resource_api.create_domain(domain['id'], domain) - - # Creating a root and a leaf project inside the domain - projects_hierarchy = self._create_projects_hierarchy( - domain_id=domain['id']) - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[0] - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - # Make sure the root project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - # Make sure the leaf project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - def test_hierarchical_projects_crud(self): - # create a hierarchy with just a root project (which is a leaf as well) - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1) - root_project1 = projects_hierarchy[0] - - # create a hierarchy with one root project and one leaf project - projects_hierarchy = self._create_projects_hierarchy() - root_project2 = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # update description from leaf_project - leaf_project['description'] = 'new description' - self.resource_api.update_project(leaf_project['id'], leaf_project) - proj_ref = self.resource_api.get_project(leaf_project['id']) - self.assertDictEqual(proj_ref, leaf_project) - - # update the parent_id is not allowed - leaf_project['parent_id'] = root_project1['id'] - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - # delete root_project1 - self.resource_api.delete_project(root_project1['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project1['id']) - - # delete root_project2 is not allowed since it is not a leaf project - self.assertRaises(exception.ForbiddenAction, - self.resource_api.delete_project, - root_project2['id']) - - def test_create_project_with_invalid_parent(self): - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'parent_id': 'fake', - 'is_domain': False} - self.assertRaises(exception.ProjectNotFound, - self.resource_api.create_project, - project['id'], - project) - - @unit.skip_if_no_multiple_domains_support - def test_create_leaf_project_with_different_domain(self): - root_project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(root_project['id'], root_project) - - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - self.resource_api.create_domain(domain['id'], domain) - leaf_project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': domain['id'], - 'enabled': True, - 'parent_id': root_project['id'], - 'is_domain': False} - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - leaf_project['id'], - leaf_project) - - def test_delete_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.resource_api.delete_project(leaf_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - self.resource_api.delete_project(root_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - def test_delete_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - self.assertRaises(exception.ForbiddenAction, - self.resource_api.delete_project, - root_project['id']) - - def test_update_project_parent(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - - # project2 is the parent from project3 - self.assertEqual(project3.get('parent_id'), project2['id']) - - # try to update project3 parent to parent1 - project3['parent_id'] = project1['id'] - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - project3['id'], - project3) - - def test_create_project_under_disabled_one(self): - project1 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': False, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project1['id'], project1) - - project2 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'parent_id': project1['id'], - 'is_domain': False} - - # It's not possible to create a project under a disabled one in the - # hierarchy - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project2['id'], - project2) - - def test_disable_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - leaf_project = projects_hierarchy[1] - - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - - project_ref = self.resource_api.get_project(leaf_project['id']) - self.assertEqual(project_ref['enabled'], leaf_project['enabled']) - - def test_disable_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - root_project['enabled'] = False - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - root_project['id'], - root_project) - - def test_enable_project_with_disabled_parent(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # Disable leaf and root - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - root_project['enabled'] = False - self.resource_api.update_project(root_project['id'], root_project) - - # Try to enable the leaf project, it's not possible since it has - # a disabled parent - leaf_project['enabled'] = True - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - def _get_hierarchy_depth(self, project_id): - return len(self.resource_api.list_project_parents(project_id)) + 1 - - def test_check_hierarchy_depth(self): - # First create a hierarchy with the max allowed depth - projects_hierarchy = self._create_projects_hierarchy( - CONF.max_project_tree_depth) - leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1] - - depth = self._get_hierarchy_depth(leaf_project['id']) - self.assertEqual(CONF.max_project_tree_depth, depth) - - # Creating another project in the hierarchy shouldn't be allowed - project_id = uuid.uuid4().hex - project = { - 'id': project_id, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'parent_id': leaf_project['id'], - 'is_domain': False} - self.assertRaises(exception.ForbiddenAction, - self.resource_api.create_project, - project_id, - project) - - def test_project_update_missing_attrs_with_a_value(self): - # Creating a project with no description attribute. - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) - - def test_project_update_missing_attrs_with_a_falsey_value(self): - # Creating a project with no description attribute. - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = '' - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) - - def test_domain_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - self.resource_api.create_domain(domain['id'], domain) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain_ref, domain) - - domain['name'] = uuid.uuid4().hex - self.resource_api.update_domain(domain['id'], domain) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain_ref, domain) - - # Ensure an 'enabled' domain cannot be deleted - self.assertRaises(exception.ForbiddenAction, - self.resource_api.delete_domain, - domain_id=domain['id']) - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_case_sensitivity(self): - # create a ref with a lowercase name - ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex.lower()} - self.resource_api.create_domain(ref['id'], ref) - - # assign a new ID with the same name, but this time in uppercase - ref['id'] = uuid.uuid4().hex - ref['name'] = ref['name'].upper() - self.resource_api.create_domain(ref['id'], ref) - - def test_attribute_update(self): - project = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.resource_api.create_project(project['id'], project) - - # pick a key known to be non-existent - key = 'description' - - def assert_key_equals(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertEqual(value, project_ref[key]) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(value, project_ref[key]) - - def assert_get_key_is(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertIs(project_ref.get(key), value) - project_ref = self.resource_api.get_project(project['id']) - self.assertIs(project_ref.get(key), value) - - # add an attribute that doesn't exist, set it to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set an attribute with a falsey value to null - value = None - project[key] = value - assert_get_key_is(value) - - # do it again, in case updating from this situation is handled oddly - value = None - project[key] = value - assert_get_key_is(value) - - # set a possibly-null value to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set a falsey value to a truthy value - value = uuid.uuid4().hex - project[key] = value - assert_key_equals(value) - - def test_user_crud(self): - user_dict = {'domain_id': DEFAULT_DOMAIN_ID, - 'name': uuid.uuid4().hex, 'password': 'passw0rd'} - user = self.identity_api.create_user(user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - user_dict['password'] = uuid.uuid4().hex - self.identity_api.update_user(user['id'], user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - self.identity_api.delete_user(user['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - user['id']) - - def test_list_projects_for_user(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} - user1 = self.identity_api.create_user(user1) - user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(0, len(user_projects)) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=self.tenant_bar['id'], - role_id=self.role_member['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=self.tenant_baz['id'], - role_id=self.role_member['id']) - user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(2, len(user_projects)) - - def test_list_projects_for_user_with_grants(self): - # Create two groups each with a role on a different project, and - # make user1 a member of both groups. Both these new projects - # should now be included, along with any direct user grants. - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} - group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} - group2 = self.identity_api.create_group(group2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project2['id'], project2) - self.identity_api.add_user_to_group(user1['id'], group1['id']) - self.identity_api.add_user_to_group(user1['id'], group2['id']) - - # Create 3 grants, one user grant, the other two as group grants - self.assignment_api.create_grant(user_id=user1['id'], - project_id=self.tenant_bar['id'], - role_id=self.role_member['id']) - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=self.role_admin['id']) - self.assignment_api.create_grant(group_id=group2['id'], - project_id=project2['id'], - role_id=self.role_admin['id']) - user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(3, len(user_projects)) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_domain_rename_invalidates_get_domain_by_name_cache(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - domain_id = domain['id'] - domain_name = domain['name'] - self.resource_api.create_domain(domain_id, domain) - domain_ref = self.resource_api.get_domain_by_name(domain_name) - domain_ref['name'] = uuid.uuid4().hex - self.resource_api.update_domain(domain_id, domain_ref) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - @unit.skip_if_cache_disabled('resource') - def test_cache_layer_domain_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - domain_id = domain['id'] - # Create Domain - self.resource_api.create_domain(domain_id, domain) - domain_ref = self.resource_api.get_domain(domain_id) - updated_domain_ref = copy.deepcopy(domain_ref) - updated_domain_ref['name'] = uuid.uuid4().hex - # Update domain, bypassing resource api manager - self.resource_api.driver.update_domain(domain_id, updated_domain_ref) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain returns the updated domain - self.assertDictContainsSubset( - updated_domain_ref, self.resource_api.get_domain(domain_id)) - # Update the domain back to original ref, using the assignment api - # manager - self.resource_api.update_domain(domain_id, domain_ref) - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Make sure domain is 'disabled', bypass resource api manager - domain_ref_disabled = domain_ref.copy() - domain_ref_disabled['enabled'] = False - self.resource_api.driver.update_domain(domain_id, - domain_ref_disabled) - # Delete domain, bypassing resource api manager - self.resource_api.driver.delete_domain(domain_id) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain now raises DomainNotFound - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, domain_id) - # Recreate Domain - self.resource_api.create_domain(domain_id, domain) - self.resource_api.get_domain(domain_id) - # Make sure domain is 'disabled', bypass resource api manager - domain['enabled'] = False - self.resource_api.driver.update_domain(domain_id, domain) - # Delete domain - self.resource_api.delete_domain(domain_id) - # verify DomainNotFound raised - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain_id) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_project_rename_invalidates_get_project_by_name_cache(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - project_id = project['id'] - project_name = project['name'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project_by_name(project_name, domain['id']) - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project_id, project) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - project_name, - domain['id']) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_cache_layer_project_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - project_id = project['id'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - updated_project = copy.deepcopy(project) - updated_project['name'] = uuid.uuid4().hex - # Update project, bypassing resource manager - self.resource_api.driver.update_project(project_id, - updated_project) - # Verify get_project still returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify get_project now returns the new project - self.assertDictContainsSubset( - updated_project, - self.resource_api.get_project(project_id)) - # Update project using the resource_api manager back to original - self.resource_api.update_project(project['id'], project) - # Verify get_project returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Delete project bypassing resource - self.resource_api.driver.delete_project(project_id) - # Verify get_project still returns the project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify ProjectNotFound now raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - # recreate project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - # delete project - self.resource_api.delete_project(project_id) - # Verify ProjectNotFound is raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - - def create_user_dict(self, **attributes): - user_dict = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True} - user_dict.update(attributes) - return user_dict - - def test_arbitrary_attributes_are_returned_from_create_user(self): - attr_value = uuid.uuid4().hex - user_data = self.create_user_dict(arbitrary_attr=attr_value) - - user = self.identity_api.create_user(user_data) - - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_arbitrary_attributes_are_returned_from_get_user(self): - attr_value = uuid.uuid4().hex - user_data = self.create_user_dict(arbitrary_attr=attr_value) - - user_data = self.identity_api.create_user(user_data) - - user = self.identity_api.get_user(user_data['id']) - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_new_arbitrary_attributes_are_returned_from_update_user(self): - user_data = self.create_user_dict() - - user = self.identity_api.create_user(user_data) - attr_value = uuid.uuid4().hex - user['arbitrary_attr'] = attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(attr_value, updated_user['arbitrary_attr']) - - def test_updated_arbitrary_attributes_are_returned_from_update_user(self): - attr_value = uuid.uuid4().hex - user_data = self.create_user_dict(arbitrary_attr=attr_value) - - new_attr_value = uuid.uuid4().hex - user = self.identity_api.create_user(user_data) - user['arbitrary_attr'] = new_attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(new_attr_value, updated_user['arbitrary_attr']) - - def test_create_grant_no_user(self): - # If call create_grant with a user that doesn't exist, doesn't fail. - self.assignment_api.create_grant( - self.role_other['id'], - user_id=uuid.uuid4().hex, - project_id=self.tenant_bar['id']) - - def test_create_grant_no_group(self): - # If call create_grant with a group that doesn't exist, doesn't fail. - self.assignment_api.create_grant( - self.role_other['id'], - group_id=uuid.uuid4().hex, - project_id=self.tenant_bar['id']) - - @unit.skip_if_no_multiple_domains_support - def test_get_default_domain_by_name(self): - domain_name = 'default' - - domain = {'id': uuid.uuid4().hex, 'name': domain_name, 'enabled': True} - self.resource_api.create_domain(domain['id'], domain) - - domain_ref = self.resource_api.get_domain_by_name(domain_name) - self.assertEqual(domain, domain_ref) - - def test_get_not_default_domain_by_name(self): - domain_name = 'foo' - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - def test_project_update_and_project_get_return_same_response(self): - project = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'description': uuid.uuid4().hex, - 'enabled': True} - - self.resource_api.create_project(project['id'], project) - - updated_project = {'enabled': False} - updated_project_ref = self.resource_api.update_project( - project['id'], updated_project) - - # SQL backend adds 'extra' field - updated_project_ref.pop('extra', None) - - self.assertIs(False, updated_project_ref['enabled']) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, updated_project_ref) - - def test_user_update_and_user_get_return_same_response(self): - user = { - 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'description': uuid.uuid4().hex, - 'enabled': True} - - user = self.identity_api.create_user(user) - - updated_user = {'enabled': False} - updated_user_ref = self.identity_api.update_user( - user['id'], updated_user) - - # SQL backend adds 'extra' field - updated_user_ref.pop('extra', None) - - self.assertIs(False, updated_user_ref['enabled']) - - user_ref = self.identity_api.get_user(user['id']) - self.assertDictEqual(user_ref, updated_user_ref) - - def test_delete_group_removes_role_assignments(self): - # When a group is deleted any role assignments for the group are - # removed. - - MEMBER_ROLE_ID = 'member' - - def get_member_assignments(): - assignments = self.assignment_api.list_role_assignments() - return [x for x in assignments if x['role_id'] == MEMBER_ROLE_ID] - - orig_member_assignments = get_member_assignments() - - # Create a group. - new_group = { - 'domain_id': DEFAULT_DOMAIN_ID, - 'name': self.getUniqueString(prefix='tdgrra')} - new_group = self.identity_api.create_group(new_group) - - # Create a project. - new_project = { - 'id': uuid.uuid4().hex, - 'name': self.getUniqueString(prefix='tdgrra'), - 'domain_id': DEFAULT_DOMAIN_ID} - self.resource_api.create_project(new_project['id'], new_project) - - # Assign a role to the group. - self.assignment_api.create_grant( - group_id=new_group['id'], project_id=new_project['id'], - role_id=MEMBER_ROLE_ID) - - # Delete the group. - self.identity_api.delete_group(new_group['id']) - - # Check that the role assignment for the group is gone - member_assignments = get_member_assignments() - - self.assertThat(member_assignments, - matchers.Equals(orig_member_assignments)) - - def test_get_roles_for_groups_on_domain(self): - """Test retrieving group domain roles. - - Test Plan: - - - Create a domain, three groups and three roles - - Assign one an inherited and the others a non-inherited group role - to the domain - - Ensure that only the non-inherited roles are returned on the domain - - """ - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - group_list = [] - group_id_list = [] - role_list = [] - for _ in range(3): - group = {'name': uuid.uuid4().hex, 'domain_id': domain1['id']} - group = self.identity_api.create_group(group) - group_list.append(group) - group_id_list.append(group['id']) - - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - - # Assign the roles - one is inherited - self.assignment_api.create_grant(group_id=group_list[0]['id'], - domain_id=domain1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(group_id=group_list[1]['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id']) - self.assignment_api.create_grant(group_id=group_list[2]['id'], - domain_id=domain1['id'], - role_id=role_list[2]['id'], - inherited_to_projects=True) - - # Now get the effective roles for the groups on the domain project. We - # shouldn't get back the inherited role. - - role_refs = self.assignment_api.get_roles_for_groups( - group_id_list, domain_id=domain1['id']) - - self.assertThat(role_refs, matchers.HasLength(2)) - self.assertIn(role_list[0], role_refs) - self.assertIn(role_list[1], role_refs) - - def test_get_roles_for_groups_on_project(self): - """Test retrieving group project roles. - - Test Plan: - - - Create two domains, two projects, six groups and six roles - - Project1 is in Domain1, Project2 is in Domain2 - - Domain2/Project2 are spoilers - - Assign a different direct group role to each project as well - as both an inherited and non-inherited role to each domain - - Get the group roles for Project 1 - depending on whether we have - enabled inheritance, we should either get back just the direct role - or both the direct one plus the inherited domain role from Domain 1 - - """ - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id']} - self.resource_api.create_project(project2['id'], project2) - group_list = [] - group_id_list = [] - role_list = [] - for _ in range(6): - group = {'name': uuid.uuid4().hex, 'domain_id': domain1['id']} - group = self.identity_api.create_group(group) - group_list.append(group) - group_id_list.append(group['id']) - - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - - # Assign the roles - one inherited and one non-inherited on Domain1, - # plus one on Project1 - self.assignment_api.create_grant(group_id=group_list[0]['id'], - domain_id=domain1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(group_id=group_list[1]['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group_list[2]['id'], - project_id=project1['id'], - role_id=role_list[2]['id']) - - # ...and a duplicate set of spoiler assignments to Domain2/Project2 - self.assignment_api.create_grant(group_id=group_list[3]['id'], - domain_id=domain2['id'], - role_id=role_list[3]['id']) - self.assignment_api.create_grant(group_id=group_list[4]['id'], - domain_id=domain2['id'], - role_id=role_list[4]['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group_list[5]['id'], - project_id=project2['id'], - role_id=role_list[5]['id']) - - # Now get the effective roles for all groups on the Project1. With - # inheritance off, we should only get back the direct role. - - self.config_fixture.config(group='os_inherit', enabled=False) - role_refs = self.assignment_api.get_roles_for_groups( - group_id_list, project_id=project1['id']) - - self.assertThat(role_refs, matchers.HasLength(1)) - self.assertIn(role_list[2], role_refs) - - # With inheritance on, we should also get back the inherited role from - # its owning domain. - - self.config_fixture.config(group='os_inherit', enabled=True) - role_refs = self.assignment_api.get_roles_for_groups( - group_id_list, project_id=project1['id']) - - self.assertThat(role_refs, matchers.HasLength(2)) - self.assertIn(role_list[1], role_refs) - self.assertIn(role_list[2], role_refs) - - def test_list_domains_for_groups(self): - """Test retrieving domains for a list of groups. - - Test Plan: - - - Create three domains, three groups and one role - - Assign a non-inherited group role to two domains, and an inherited - group role to the third - - Ensure only the domains with non-inherited roles are returned - - """ - domain_list = [] - group_list = [] - group_id_list = [] - for _ in range(3): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - domain_list.append(domain) - - group = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} - group = self.identity_api.create_group(group) - group_list.append(group) - group_id_list.append(group['id']) - - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role1['id'], role1) - - # Assign the roles - one is inherited - self.assignment_api.create_grant(group_id=group_list[0]['id'], - domain_id=domain_list[0]['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group_list[1]['id'], - domain_id=domain_list[1]['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group_list[2]['id'], - domain_id=domain_list[2]['id'], - role_id=role1['id'], - inherited_to_projects=True) - - # Now list the domains that have roles for any of the 3 groups - # We shouldn't get back domain[2] since that had an inherited role. - - domain_refs = ( - self.assignment_api.list_domains_for_groups(group_id_list)) - - self.assertThat(domain_refs, matchers.HasLength(2)) - self.assertIn(domain_list[0], domain_refs) - self.assertIn(domain_list[1], domain_refs) - - def test_list_projects_for_groups(self): - """Test retrieving projects for a list of groups. - - Test Plan: - - - Create two domains, four projects, seven groups and seven roles - - Project1-3 are in Domain1, Project4 is in Domain2 - - Domain2/Project4 are spoilers - - Project1 and 2 have direct group roles, Project3 has no direct - roles but should inherit a group role from Domain1 - - Get the projects for the group roles that are assigned to Project1 - Project2 and the inherited one on Domain1. Depending on whether we - have enabled inheritance, we should either get back just the projects - with direct roles (Project 1 and 2) or also Project3 due to its - inherited role from Domain1. - - """ - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id'], 'is_domain': False} - project1 = self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id'], 'is_domain': False} - project2 = self.resource_api.create_project(project2['id'], project2) - project3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id'], 'is_domain': False} - project3 = self.resource_api.create_project(project3['id'], project3) - project4 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id'], 'is_domain': False} - project4 = self.resource_api.create_project(project4['id'], project4) - group_list = [] - role_list = [] - for _ in range(7): - group = {'name': uuid.uuid4().hex, 'domain_id': domain1['id']} - group = self.identity_api.create_group(group) - group_list.append(group) - - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - - # Assign the roles - one inherited and one non-inherited on Domain1, - # plus one on Project1 and Project2 - self.assignment_api.create_grant(group_id=group_list[0]['id'], - domain_id=domain1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(group_id=group_list[1]['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group_list[2]['id'], - project_id=project1['id'], - role_id=role_list[2]['id']) - self.assignment_api.create_grant(group_id=group_list[3]['id'], - project_id=project2['id'], - role_id=role_list[3]['id']) - - # ...and a few of spoiler assignments to Domain2/Project4 - self.assignment_api.create_grant(group_id=group_list[4]['id'], - domain_id=domain2['id'], - role_id=role_list[4]['id']) - self.assignment_api.create_grant(group_id=group_list[5]['id'], - domain_id=domain2['id'], - role_id=role_list[5]['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group_list[6]['id'], - project_id=project4['id'], - role_id=role_list[6]['id']) - - # Now get the projects for the groups that have roles on Project1, - # Project2 and the inherited role on Domain!. With inheritance off, - # we should only get back the projects with direct role. - - self.config_fixture.config(group='os_inherit', enabled=False) - group_id_list = [group_list[1]['id'], group_list[2]['id'], - group_list[3]['id']] - project_refs = ( - self.assignment_api.list_projects_for_groups(group_id_list)) - - self.assertThat(project_refs, matchers.HasLength(2)) - self.assertIn(project1, project_refs) - self.assertIn(project2, project_refs) - - # With inheritance on, we should also get back the Project3 due to the - # inherited role from its owning domain. - - self.config_fixture.config(group='os_inherit', enabled=True) - project_refs = ( - self.assignment_api.list_projects_for_groups(group_id_list)) - - self.assertThat(project_refs, matchers.HasLength(3)) - self.assertIn(project1, project_refs) - self.assertIn(project2, project_refs) - self.assertIn(project3, project_refs) - - def test_update_role_no_name(self): - # A user can update a role and not include the name. - - # description is picked just because it's not name. - self.role_api.update_role(self.role_member['id'], - {'description': uuid.uuid4().hex}) - # If the previous line didn't raise an exception then the test passes. - - def test_update_role_same_name(self): - # A user can update a role and set the name to be the same as it was. - - self.role_api.update_role(self.role_member['id'], - {'name': self.role_member['name']}) - # If the previous line didn't raise an exception then the test passes. - - -class TokenTests(object): - def _create_token_id(self): - # Use a token signed by the cms module - token_id = "" - for i in range(1, 20): - token_id += uuid.uuid4().hex - return cms.cms_sign_token(token_id, - CONF.signing.certfile, - CONF.signing.keyfile) - - def _assert_revoked_token_list_matches_token_persistence( - self, revoked_token_id_list): - # Assert that the list passed in matches the list returned by the - # token persistence service - persistence_list = [ - x['id'] - for x in self.token_provider_api.list_revoked_tokens() - ] - self.assertEqual(persistence_list, revoked_token_id_list) - - def test_token_crud(self): - token_id = self._create_token_id() - data = {'id': token_id, 'a': 'b', - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - expires = data_ref.pop('expires') - data_ref.pop('user_id') - self.assertIsInstance(expires, datetime.datetime) - data_ref.pop('id') - data.pop('id') - self.assertDictEqual(data_ref, data) - - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - expires = new_data_ref.pop('expires') - self.assertIsInstance(expires, datetime.datetime) - new_data_ref.pop('user_id') - new_data_ref.pop('id') - - self.assertEqual(data, new_data_ref) - - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, token_id) - - def create_token_sample_data(self, token_id=None, tenant_id=None, - trust_id=None, user_id=None, expires=None): - if token_id is None: - token_id = self._create_token_id() - if user_id is None: - user_id = 'testuserid' - # FIXME(morganfainberg): These tokens look nothing like "Real" tokens. - # This should be fixed when token issuance is cleaned up. - data = {'id': token_id, 'a': 'b', - 'user': {'id': user_id}} - if tenant_id is not None: - data['tenant'] = {'id': tenant_id, 'name': tenant_id} - if tenant_id is NULL_OBJECT: - data['tenant'] = None - if expires is not None: - data['expires'] = expires - if trust_id is not None: - data['trust_id'] = trust_id - data.setdefault('access', {}).setdefault('trust', {}) - # Testuserid2 is used here since a trustee will be different in - # the cases of impersonation and therefore should not match the - # token's user_id. - data['access']['trust']['trustee_user_id'] = 'testuserid2' - data['token_version'] = provider.V2 - # Issue token stores a copy of all token data at token['token_data']. - # This emulates that assumption as part of the test. - data['token_data'] = copy.deepcopy(data) - new_token = self.token_provider_api._persistence.create_token(token_id, - data) - return new_token['id'], data - - def test_delete_tokens(self): - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id3, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id2) - - self.token_provider_api._persistence.get_token(token_id3) - - def test_delete_tokens_trust(self): - tokens = self.token_provider_api._persistence._list_tokens( - user_id='testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid', - trust_id='testtrustid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1', - trust_id='testtrustid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid', - trust_id='testtrustid') - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.token_provider_api._persistence.get_token(token_id2) - - def _test_token_list(self, token_list_fn): - tokens = token_list_fn('testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - token_id2, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id1) - tokens = token_list_fn('testuserid') - self.assertIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id2) - tokens = token_list_fn('testuserid') - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - - # tenant-specific tokens - tenant1 = uuid.uuid4().hex - tenant2 = uuid.uuid4().hex - token_id3, data = self.create_token_sample_data(tenant_id=tenant1) - token_id4, data = self.create_token_sample_data(tenant_id=tenant2) - # test for existing but empty tenant (LP:1078497) - token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT) - tokens = token_list_fn('testuserid') - self.assertEqual(3, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - self.assertIn(token_id5, tokens) - tokens = token_list_fn('testuserid', tenant2) - self.assertEqual(1, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - - def test_token_list(self): - self._test_token_list( - self.token_provider_api._persistence._list_tokens) - - def test_token_list_trust(self): - trust_id = uuid.uuid4().hex - token_id5, data = self.create_token_sample_data(trust_id=trust_id) - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid', trust_id=trust_id) - self.assertEqual(1, len(tokens)) - self.assertIn(token_id5, tokens) - - def test_get_token_404(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - uuid.uuid4().hex) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - None) - - def test_delete_token_404(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - uuid.uuid4().hex) - - def test_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data_ref, data) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id) - - def test_null_expires_token(self): - token_id = uuid.uuid4().hex - data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.assertIsNotNone(data_ref['expires']) - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - - # MySQL doesn't store microseconds, so discard them before testing - data_ref['expires'] = data_ref['expires'].replace(microsecond=0) - new_data_ref['expires'] = new_data_ref['expires'].replace( - microsecond=0) - - self.assertEqual(data_ref, new_data_ref) - - def check_list_revoked_tokens(self, token_ids): - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - for token_id in token_ids: - self.assertIn(token_id, revoked_ids) - - def delete_token(self): - token_id = uuid.uuid4().hex - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_ref['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - data_ref['id']) - return token_id - - def test_list_revoked_tokens_returns_empty_list(self): - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertEqual([], revoked_ids) - - def test_list_revoked_tokens_for_single_token(self): - self.check_list_revoked_tokens([self.delete_token()]) - - def test_list_revoked_tokens_for_multiple_tokens(self): - self.check_list_revoked_tokens([self.delete_token() - for x in six.moves.range(2)]) - - def test_flush_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data_ref, data) - - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data_ref, data) - - self.token_provider_api._persistence.flush_expired_tokens() - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id, tokens) - - @unit.skip_if_cache_disabled('token') - def test_revocation_list_cache(self): - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10) - token_id = uuid.uuid4().hex - token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - token2_id = uuid.uuid4().hex - token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - # Create 2 Tokens. - self.token_provider_api._persistence.create_token(token_id, - token_data) - self.token_provider_api._persistence.create_token(token2_id, - token2_data) - # Verify the revocation list is empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Delete a token directly, bypassing the manager. - self.token_provider_api._persistence.driver.delete_token(token_id) - # Verify the revocation list is still empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Invalidate the revocation list. - self.token_provider_api._persistence.invalidate_revocation_list() - # Verify the deleted token is in the revocation list. - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - # Delete the second token, through the manager - self.token_provider_api._persistence.delete_token(token2_id) - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - # Verify both tokens are in the revocation list. - self.assertIn(token_id, revoked_ids) - self.assertIn(token2_id, revoked_ids) - - def _test_predictable_revoked_pki_token_id(self, hash_fn): - token_id = self._create_token_id() - token_id_hash = hash_fn(token_id).hexdigest() - token = {'user': {'id': uuid.uuid4().hex}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id_hash, revoked_ids) - self.assertNotIn(token_id, revoked_ids) - for t in self.token_provider_api._persistence.list_revoked_tokens(): - self.assertIn('expires', t) - - def test_predictable_revoked_pki_token_id_default(self): - self._test_predictable_revoked_pki_token_id(hashlib.md5) - - def test_predictable_revoked_pki_token_id_sha256(self): - self.config_fixture.config(group='token', hash_algorithm='sha256') - self._test_predictable_revoked_pki_token_id(hashlib.sha256) - - def test_predictable_revoked_uuid_token_id(self): - token_id = uuid.uuid4().hex - token = {'user': {'id': uuid.uuid4().hex}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_tokens = self.token_provider_api.list_revoked_tokens() - revoked_ids = [x['id'] for x in revoked_tokens] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - for t in revoked_tokens: - self.assertIn('expires', t) - - def test_create_unicode_token_id(self): - token_id = six.text_type(self._create_token_id()) - self.create_token_sample_data(token_id=token_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_create_unicode_user_id(self): - user_id = six.text_type(uuid.uuid4().hex) - token_id, data = self.create_token_sample_data(user_id=user_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_token_expire_timezone(self): - - @test_utils.timezone - def _create_token(expire_time): - token_id = uuid.uuid4().hex - user_id = six.text_type(uuid.uuid4().hex) - return self.create_token_sample_data(token_id=token_id, - user_id=user_id, - expires=expire_time) - - for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: - test_utils.TZ = 'UTC' + d - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - token_id, data_in = _create_token(expire_time) - data_get = self.token_provider_api._persistence.get_token(token_id) - - self.assertEqual(data_in['id'], data_get['id'], - 'TZ=%s' % test_utils.TZ) - - expire_time_expired = ( - timeutils.utcnow() + datetime.timedelta(minutes=-1)) - token_id, data_in = _create_token(expire_time_expired) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_in['id']) - - -class TokenCacheInvalidation(object): - def _create_test_data(self): - self.user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'password': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True} - self.tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True} - - # Create an equivalent of a scoped token - token_dict = {'user': self.user, 'tenant': self.tenant, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.scoped_token_id = token_id - - # ..and an un-scoped one - token_dict = {'user': self.user, 'tenant': None, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.unscoped_token_id = token_id - - # Validate them, in the various ways possible - this will load the - # responses into the token cache. - self._check_scoped_tokens_are_valid() - self._check_unscoped_tokens_are_valid() - - def _check_unscoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.unscoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.unscoped_token_id) - - def _check_scoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id, - self.tenant['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id, - self.tenant['id']) - - def _check_scoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.scoped_token_id) - self.token_provider_api.validate_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - self.token_provider_api.validate_v2_token(self.scoped_token_id) - self.token_provider_api.validate_v2_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - - def _check_unscoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.unscoped_token_id) - self.token_provider_api.validate_v2_token(self.unscoped_token_id) - - def test_delete_unscoped_token(self): - self.token_provider_api._persistence.delete_token( - self.unscoped_token_id) - self._check_unscoped_tokens_are_invalid() - self._check_scoped_tokens_are_valid() - - def test_delete_scoped_token_by_id(self): - self.token_provider_api._persistence.delete_token(self.scoped_token_id) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() - - def test_delete_scoped_token_by_user(self): - self.token_provider_api._persistence.delete_tokens(self.user['id']) - # Since we are deleting all tokens for this user, they should all - # now be invalid. - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_invalid() - - def test_delete_scoped_token_by_user_and_tenant(self): - self.token_provider_api._persistence.delete_tokens( - self.user['id'], - tenant_id=self.tenant['id']) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() - - -class TrustTests(object): - def create_sample_trust(self, new_id, remaining_uses=None): - self.trustor = self.user_foo - self.trustee = self.user_two - trust_data = (self.trust_api.create_trust - (new_id, - {'trustor_user_id': self.trustor['id'], - 'trustee_user_id': self.user_two['id'], - 'project_id': self.tenant_bar['id'], - 'expires_at': timeutils. - parse_isotime('2031-02-18T18:10:00Z'), - 'impersonation': True, - 'remaining_uses': remaining_uses}, - roles=[{"id": "member"}, - {"id": "other"}, - {"id": "browser"}])) - return trust_data - - def test_delete_trust(self): - new_id = uuid.uuid4().hex - trust_data = self.create_sample_trust(new_id) - trust_id = trust_data['id'] - self.assertIsNotNone(trust_data) - trust_data = self.trust_api.get_trust(trust_id) - self.assertEqual(new_id, trust_data['id']) - self.trust_api.delete_trust(trust_id) - self.assertRaises(exception.TrustNotFound, - self.trust_api.get_trust, - trust_id) - - def test_delete_trust_not_found(self): - trust_id = uuid.uuid4().hex - self.assertRaises(exception.TrustNotFound, - self.trust_api.delete_trust, - trust_id) - - def test_get_trust(self): - new_id = uuid.uuid4().hex - trust_data = self.create_sample_trust(new_id) - trust_id = trust_data['id'] - self.assertIsNotNone(trust_data) - trust_data = self.trust_api.get_trust(trust_id) - self.assertEqual(new_id, trust_data['id']) - self.trust_api.delete_trust(trust_data['id']) - - def test_get_deleted_trust(self): - new_id = uuid.uuid4().hex - trust_data = self.create_sample_trust(new_id) - self.assertIsNotNone(trust_data) - self.assertIsNone(trust_data['deleted_at']) - self.trust_api.delete_trust(new_id) - self.assertRaises(exception.TrustNotFound, - self.trust_api.get_trust, - new_id) - deleted_trust = self.trust_api.get_trust(trust_data['id'], - deleted=True) - self.assertEqual(trust_data['id'], deleted_trust['id']) - self.assertIsNotNone(deleted_trust.get('deleted_at')) - - def test_create_trust(self): - new_id = uuid.uuid4().hex - trust_data = self.create_sample_trust(new_id) - - self.assertEqual(new_id, trust_data['id']) - self.assertEqual(self.trustee['id'], trust_data['trustee_user_id']) - self.assertEqual(self.trustor['id'], trust_data['trustor_user_id']) - self.assertTrue(timeutils.normalize_time(trust_data['expires_at']) > - timeutils.utcnow()) - - self.assertEqual([{'id': 'member'}, - {'id': 'other'}, - {'id': 'browser'}], trust_data['roles']) - - def test_list_trust_by_trustee(self): - for i in range(3): - self.create_sample_trust(uuid.uuid4().hex) - trusts = self.trust_api.list_trusts_for_trustee(self.trustee['id']) - self.assertEqual(3, len(trusts)) - self.assertEqual(trusts[0]["trustee_user_id"], self.trustee['id']) - trusts = self.trust_api.list_trusts_for_trustee(self.trustor['id']) - self.assertEqual(0, len(trusts)) - - def test_list_trust_by_trustor(self): - for i in range(3): - self.create_sample_trust(uuid.uuid4().hex) - trusts = self.trust_api.list_trusts_for_trustor(self.trustor['id']) - self.assertEqual(3, len(trusts)) - self.assertEqual(trusts[0]["trustor_user_id"], self.trustor['id']) - trusts = self.trust_api.list_trusts_for_trustor(self.trustee['id']) - self.assertEqual(0, len(trusts)) - - def test_list_trusts(self): - for i in range(3): - self.create_sample_trust(uuid.uuid4().hex) - trusts = self.trust_api.list_trusts() - self.assertEqual(3, len(trusts)) - - def test_trust_has_remaining_uses_positive(self): - # create a trust with limited uses, check that we have uses left - trust_data = self.create_sample_trust(uuid.uuid4().hex, - remaining_uses=5) - self.assertEqual(5, trust_data['remaining_uses']) - # create a trust with unlimited uses, check that we have uses left - trust_data = self.create_sample_trust(uuid.uuid4().hex) - self.assertIsNone(trust_data['remaining_uses']) - - def test_trust_has_remaining_uses_negative(self): - # try to create a trust with no remaining uses, check that it fails - self.assertRaises(exception.ValidationError, - self.create_sample_trust, - uuid.uuid4().hex, - remaining_uses=0) - # try to create a trust with negative remaining uses, - # check that it fails - self.assertRaises(exception.ValidationError, - self.create_sample_trust, - uuid.uuid4().hex, - remaining_uses=-12) - - def test_consume_use(self): - # consume a trust repeatedly until it has no uses anymore - trust_data = self.create_sample_trust(uuid.uuid4().hex, - remaining_uses=2) - self.trust_api.consume_use(trust_data['id']) - t = self.trust_api.get_trust(trust_data['id']) - self.assertEqual(1, t['remaining_uses']) - self.trust_api.consume_use(trust_data['id']) - # This was the last use, the trust isn't available anymore - self.assertRaises(exception.TrustNotFound, - self.trust_api.get_trust, - trust_data['id']) - - -class CatalogTests(object): - - _legacy_endpoint_id_in_endpoint = False - _enabled_default_to_true_when_creating_endpoint = False - - def test_region_crud(self): - # create - region_id = '0' * 255 - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex, - } - res = self.catalog_api.create_region( - new_region.copy()) - # Ensure that we don't need to have a - # parent_region_id in the original supplied - # ref dict, but that it will be returned from - # the endpoint, with None value. - expected_region = new_region.copy() - expected_region['parent_region_id'] = None - self.assertDictEqual(res, expected_region) - - # Test adding another region with the one above - # as its parent. We will check below whether deleting - # the parent successfully deletes any child regions. - parent_region_id = region_id - region_id = uuid.uuid4().hex - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex, - 'parent_region_id': parent_region_id, - } - res = self.catalog_api.create_region( - new_region.copy()) - self.assertDictEqual(new_region, res) - - # list - regions = self.catalog_api.list_regions() - self.assertThat(regions, matchers.HasLength(2)) - region_ids = [x['id'] for x in regions] - self.assertIn(parent_region_id, region_ids) - self.assertIn(region_id, region_ids) - - # update - region_desc_update = {'description': uuid.uuid4().hex} - res = self.catalog_api.update_region(region_id, region_desc_update) - expected_region = new_region.copy() - expected_region['description'] = region_desc_update['description'] - self.assertDictEqual(expected_region, res) - - # delete - self.catalog_api.delete_region(parent_region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.delete_region, - parent_region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - parent_region_id) - # Ensure the child is also gone... - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_id) - - def _create_region_with_parent_id(self, parent_id=None): - new_region = { - 'id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'parent_region_id': parent_id - } - self.catalog_api.create_region( - new_region) - return new_region - - def test_list_regions_filtered_by_parent_region_id(self): - new_region = self._create_region_with_parent_id() - parent_id = new_region['id'] - new_region = self._create_region_with_parent_id(parent_id) - new_region = self._create_region_with_parent_id(parent_id) - - # filter by parent_region_id - hints = driver_hints.Hints() - hints.add_filter('parent_region_id', parent_id) - regions = self.catalog_api.list_regions(hints) - for region in regions: - self.assertEqual(parent_id, region['parent_region_id']) - - @unit.skip_if_cache_disabled('catalog') - def test_cache_layer_region_crud(self): - region_id = uuid.uuid4().hex - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_region(new_region.copy()) - updated_region = copy.deepcopy(new_region) - updated_region['description'] = uuid.uuid4().hex - # cache the result - self.catalog_api.get_region(region_id) - # update the region bypassing catalog_api - self.catalog_api.driver.update_region(region_id, updated_region) - self.assertDictContainsSubset(new_region, - self.catalog_api.get_region(region_id)) - self.catalog_api.get_region.invalidate(self.catalog_api, region_id) - self.assertDictContainsSubset(updated_region, - self.catalog_api.get_region(region_id)) - # delete the region - self.catalog_api.driver.delete_region(region_id) - # still get the old region - self.assertDictContainsSubset(updated_region, - self.catalog_api.get_region(region_id)) - self.catalog_api.get_region.invalidate(self.catalog_api, region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, region_id) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_region(self): - region_id = uuid.uuid4().hex - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex - } - self.catalog_api.create_region(new_region) - - # cache the region - self.catalog_api.get_region(region_id) - - # update the region via catalog_api - new_description = {'description': uuid.uuid4().hex} - self.catalog_api.update_region(region_id, new_description) - - # assert that we can get the new region - current_region = self.catalog_api.get_region(region_id) - self.assertEqual(new_description['description'], - current_region['description']) - - def test_create_region_with_duplicate_id(self): - region_id = uuid.uuid4().hex - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex - } - self.catalog_api.create_region(new_region) - # Create region again with duplicate id - self.assertRaises(exception.Conflict, - self.catalog_api.create_region, - new_region) - - def test_get_region_404(self): - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - uuid.uuid4().hex) - - def test_delete_region_404(self): - self.assertRaises(exception.RegionNotFound, - self.catalog_api.delete_region, - uuid.uuid4().hex) - - def test_create_region_invalid_parent_region_404(self): - region_id = uuid.uuid4().hex - new_region = { - 'id': region_id, - 'description': uuid.uuid4().hex, - 'parent_region_id': 'nonexisting' - } - self.assertRaises(exception.RegionNotFound, - self.catalog_api.create_region, - new_region) - - def test_avoid_creating_circular_references_in_regions_update(self): - region_one = self._create_region_with_parent_id() - - # self circle: region_one->region_one - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_one['id'], - {'parent_region_id': region_one['id']}) - - # region_one->region_two->region_one - region_two = self._create_region_with_parent_id(region_one['id']) - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_one['id'], - {'parent_region_id': region_two['id']}) - - # region_one region_two->region_three->region_four->region_two - region_three = self._create_region_with_parent_id(region_two['id']) - region_four = self._create_region_with_parent_id(region_three['id']) - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_two['id'], - {'parent_region_id': region_four['id']}) - - @mock.patch.object(core.CatalogDriverV8, - "_ensure_no_circle_in_hierarchical_regions") - def test_circular_regions_can_be_deleted(self, mock_ensure_on_circle): - # turn off the enforcement so that cycles can be created for the test - mock_ensure_on_circle.return_value = None - - region_one = self._create_region_with_parent_id() - - # self circle: region_one->region_one - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_one['id']}) - self.catalog_api.delete_region(region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - - # region_one->region_two->region_one - region_one = self._create_region_with_parent_id() - region_two = self._create_region_with_parent_id(region_one['id']) - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_two['id']}) - self.catalog_api.delete_region(region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_two['id']) - - # region_one->region_two->region_three->region_one - region_one = self._create_region_with_parent_id() - region_two = self._create_region_with_parent_id(region_one['id']) - region_three = self._create_region_with_parent_id(region_two['id']) - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_three['id']}) - self.catalog_api.delete_region(region_two['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_two['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_three['id']) - - def test_service_crud(self): - # create - service_id = uuid.uuid4().hex - new_service = { - 'id': service_id, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - res = self.catalog_api.create_service( - service_id, - new_service.copy()) - new_service['enabled'] = True - self.assertDictEqual(new_service, res) - - # list - services = self.catalog_api.list_services() - self.assertIn(service_id, [x['id'] for x in services]) - - # update - service_name_update = {'name': uuid.uuid4().hex} - res = self.catalog_api.update_service(service_id, service_name_update) - expected_service = new_service.copy() - expected_service['name'] = service_name_update['name'] - self.assertDictEqual(expected_service, res) - - # delete - self.catalog_api.delete_service(service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - service_id) - - def _create_random_service(self): - service_id = uuid.uuid4().hex - new_service = { - 'id': service_id, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - return self.catalog_api.create_service(service_id, new_service.copy()) - - def test_service_filtering(self): - target_service = self._create_random_service() - unrelated_service1 = self._create_random_service() - unrelated_service2 = self._create_random_service() - - # filter by type - hint_for_type = driver_hints.Hints() - hint_for_type.add_filter(name="type", value=target_service['type']) - services = self.catalog_api.list_services(hint_for_type) - - self.assertEqual(1, len(services)) - filtered_service = services[0] - self.assertEqual(target_service['type'], filtered_service['type']) - self.assertEqual(target_service['id'], filtered_service['id']) - - # filter should have been removed, since it was already used by the - # backend - self.assertEqual(0, len(hint_for_type.filters)) - - # the backend shouldn't filter by name, since this is handled by the - # front end - hint_for_name = driver_hints.Hints() - hint_for_name.add_filter(name="name", value=target_service['name']) - services = self.catalog_api.list_services(hint_for_name) - - self.assertEqual(3, len(services)) - - # filter should still be there, since it wasn't used by the backend - self.assertEqual(1, len(hint_for_name.filters)) - - self.catalog_api.delete_service(target_service['id']) - self.catalog_api.delete_service(unrelated_service1['id']) - self.catalog_api.delete_service(unrelated_service2['id']) - - @unit.skip_if_cache_disabled('catalog') - def test_cache_layer_service_crud(self): - service_id = uuid.uuid4().hex - new_service = { - 'id': service_id, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - res = self.catalog_api.create_service( - service_id, - new_service.copy()) - new_service['enabled'] = True - self.assertDictEqual(new_service, res) - self.catalog_api.get_service(service_id) - updated_service = copy.deepcopy(new_service) - updated_service['description'] = uuid.uuid4().hex - # update bypassing catalog api - self.catalog_api.driver.update_service(service_id, updated_service) - self.assertDictContainsSubset(new_service, - self.catalog_api.get_service(service_id)) - self.catalog_api.get_service.invalidate(self.catalog_api, service_id) - self.assertDictContainsSubset(updated_service, - self.catalog_api.get_service(service_id)) - - # delete bypassing catalog api - self.catalog_api.driver.delete_service(service_id) - self.assertDictContainsSubset(updated_service, - self.catalog_api.get_service(service_id)) - self.catalog_api.get_service.invalidate(self.catalog_api, service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - service_id) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_service(self): - service_id = uuid.uuid4().hex - new_service = { - 'id': service_id, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service( - service_id, - new_service.copy()) - - # cache the service - self.catalog_api.get_service(service_id) - - # update the service via catalog api - new_type = {'type': uuid.uuid4().hex} - self.catalog_api.update_service(service_id, new_type) - - # assert that we can get the new service - current_service = self.catalog_api.get_service(service_id) - self.assertEqual(new_type['type'], current_service['type']) - - def test_delete_service_with_endpoint(self): - # create a service - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint = { - 'id': uuid.uuid4().hex, - 'region': uuid.uuid4().hex, - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - - # deleting the service should also delete the endpoint - self.catalog_api.delete_service(service['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - - def test_cache_layer_delete_service_with_endpoint(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': None, - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - # cache the result - self.catalog_api.get_service(service['id']) - self.catalog_api.get_endpoint(endpoint['id']) - # delete the service bypassing catalog api - self.catalog_api.driver.delete_service(service['id']) - self.assertDictContainsSubset(endpoint, - self.catalog_api. - get_endpoint(endpoint['id'])) - self.assertDictContainsSubset(service, - self.catalog_api. - get_service(service['id'])) - self.catalog_api.get_endpoint.invalidate(self.catalog_api, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - # multiple endpoints associated with a service - second_endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': None, - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } - self.catalog_api.create_service(service['id'], service) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - self.catalog_api.create_endpoint(second_endpoint['id'], - second_endpoint) - self.catalog_api.delete_service(service['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - second_endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - second_endpoint['id']) - - def test_get_service_404(self): - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - uuid.uuid4().hex) - - def test_delete_service_404(self): - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - uuid.uuid4().hex) - - def test_create_endpoint_nonexistent_service(self): - endpoint = { - 'id': uuid.uuid4().hex, - 'service_id': uuid.uuid4().hex, - } - self.assertRaises(exception.ValidationError, - self.catalog_api.create_endpoint, - endpoint['id'], - endpoint) - - def test_update_endpoint_nonexistent_service(self): - dummy_service, enabled_endpoint, dummy_disabled_endpoint = ( - self._create_endpoints()) - new_endpoint = { - 'service_id': uuid.uuid4().hex, - } - self.assertRaises(exception.ValidationError, - self.catalog_api.update_endpoint, - enabled_endpoint['id'], - new_endpoint) - - def test_create_endpoint_nonexistent_region(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) - - endpoint = { - 'id': uuid.uuid4().hex, - 'service_id': service['id'], - 'interface': 'public', - 'url': uuid.uuid4().hex, - 'region_id': uuid.uuid4().hex, - } - self.assertRaises(exception.ValidationError, - self.catalog_api.create_endpoint, - endpoint['id'], - endpoint) - - def test_update_endpoint_nonexistent_region(self): - dummy_service, enabled_endpoint, dummy_disabled_endpoint = ( - self._create_endpoints()) - new_endpoint = { - 'region_id': uuid.uuid4().hex, - } - self.assertRaises(exception.ValidationError, - self.catalog_api.update_endpoint, - enabled_endpoint['id'], - new_endpoint) - - def test_get_endpoint_404(self): - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - uuid.uuid4().hex) - - def test_delete_endpoint_404(self): - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - uuid.uuid4().hex) - - def test_create_endpoint(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) - - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': None, - 'service_id': service['id'], - 'interface': 'public', - 'url': uuid.uuid4().hex, - } - self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) - - def test_update_endpoint(self): - dummy_service_ref, endpoint_ref, dummy_disabled_endpoint_ref = ( - self._create_endpoints()) - res = self.catalog_api.update_endpoint(endpoint_ref['id'], - {'interface': 'private'}) - expected_endpoint = endpoint_ref.copy() - expected_endpoint['interface'] = 'private' - if self._legacy_endpoint_id_in_endpoint: - expected_endpoint['legacy_endpoint_id'] = None - if self._enabled_default_to_true_when_creating_endpoint: - expected_endpoint['enabled'] = True - self.assertDictEqual(expected_endpoint, res) - - def _create_endpoints(self): - # Creates a service and 2 endpoints for the service in the same region. - # The 'public' interface is enabled and the 'internal' interface is - # disabled. - - def create_endpoint(service_id, region, **kwargs): - id_ = uuid.uuid4().hex - ref = { - 'id': id_, - 'interface': 'public', - 'region_id': region, - 'service_id': service_id, - 'url': 'http://localhost/%s' % uuid.uuid4().hex, - } - ref.update(kwargs) - self.catalog_api.create_endpoint(id_, ref) - return ref - - # Create a service for use with the endpoints. - service_id = uuid.uuid4().hex - service_ref = { - 'id': service_id, - 'name': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - } - self.catalog_api.create_service(service_id, service_ref) - - region = {'id': uuid.uuid4().hex} - self.catalog_api.create_region(region) - - # Create endpoints - enabled_endpoint_ref = create_endpoint(service_id, region['id']) - disabled_endpoint_ref = create_endpoint( - service_id, region['id'], enabled=False, interface='internal') - - return service_ref, enabled_endpoint_ref, disabled_endpoint_ref - - def test_list_endpoints(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) - - expected_ids = set([uuid.uuid4().hex for _ in range(3)]) - for endpoint_id in expected_ids: - endpoint = { - 'id': endpoint_id, - 'region_id': None, - 'service_id': service['id'], - 'interface': 'public', - 'url': uuid.uuid4().hex, - } - self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) - - endpoints = self.catalog_api.list_endpoints() - self.assertEqual(expected_ids, set(e['id'] for e in endpoints)) - - def test_get_catalog_endpoint_disabled(self): - """Get back only enabled endpoints when get the v2 catalog.""" - - service_ref, enabled_endpoint_ref, dummy_disabled_endpoint_ref = ( - self._create_endpoints()) - - user_id = uuid.uuid4().hex - project_id = uuid.uuid4().hex - catalog = self.catalog_api.get_catalog(user_id, project_id) - - exp_entry = { - 'id': enabled_endpoint_ref['id'], - 'name': service_ref['name'], - 'publicURL': enabled_endpoint_ref['url'], - } - - region = enabled_endpoint_ref['region_id'] - self.assertEqual(exp_entry, catalog[region][service_ref['type']]) - - def test_get_v3_catalog_endpoint_disabled(self): - """Get back only enabled endpoints when get the v3 catalog.""" - - enabled_endpoint_ref = self._create_endpoints()[1] - - user_id = uuid.uuid4().hex - project_id = uuid.uuid4().hex - catalog = self.catalog_api.get_v3_catalog(user_id, project_id) - - endpoint_ids = [x['id'] for x in catalog[0]['endpoints']] - self.assertEqual([enabled_endpoint_ref['id']], endpoint_ids) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_endpoint(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint_id = uuid.uuid4().hex - endpoint = { - 'id': endpoint_id, - 'region_id': None, - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } - self.catalog_api.create_endpoint(endpoint_id, endpoint) - - # cache the endpoint - self.catalog_api.get_endpoint(endpoint_id) - - # update the endpoint via catalog api - new_url = {'url': uuid.uuid4().hex} - self.catalog_api.update_endpoint(endpoint_id, new_url) - - # assert that we can get the new endpoint - current_endpoint = self.catalog_api.get_endpoint(endpoint_id) - self.assertEqual(new_url['url'], current_endpoint['url']) - - -class PolicyTests(object): - def _new_policy_ref(self): - return { - 'id': uuid.uuid4().hex, - 'policy': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'endpoint_id': uuid.uuid4().hex, - } - - def assertEqualPolicies(self, a, b): - self.assertEqual(a['id'], b['id']) - self.assertEqual(a['endpoint_id'], b['endpoint_id']) - self.assertEqual(a['policy'], b['policy']) - self.assertEqual(a['type'], b['type']) - - def test_create(self): - ref = self._new_policy_ref() - res = self.policy_api.create_policy(ref['id'], ref) - self.assertEqualPolicies(ref, res) - - def test_get(self): - ref = self._new_policy_ref() - res = self.policy_api.create_policy(ref['id'], ref) - - res = self.policy_api.get_policy(ref['id']) - self.assertEqualPolicies(ref, res) - - def test_list(self): - ref = self._new_policy_ref() - self.policy_api.create_policy(ref['id'], ref) - - res = self.policy_api.list_policies() - res = [x for x in res if x['id'] == ref['id']][0] - self.assertEqualPolicies(ref, res) - - def test_update(self): - ref = self._new_policy_ref() - self.policy_api.create_policy(ref['id'], ref) - orig = ref - - ref = self._new_policy_ref() - - # (cannot change policy ID) - self.assertRaises(exception.ValidationError, - self.policy_api.update_policy, - orig['id'], - ref) - - ref['id'] = orig['id'] - res = self.policy_api.update_policy(orig['id'], ref) - self.assertEqualPolicies(ref, res) - - def test_delete(self): - ref = self._new_policy_ref() - self.policy_api.create_policy(ref['id'], ref) - - self.policy_api.delete_policy(ref['id']) - self.assertRaises(exception.PolicyNotFound, - self.policy_api.delete_policy, - ref['id']) - self.assertRaises(exception.PolicyNotFound, - self.policy_api.get_policy, - ref['id']) - res = self.policy_api.list_policies() - self.assertFalse(len([x for x in res if x['id'] == ref['id']])) - - def test_get_policy_404(self): - self.assertRaises(exception.PolicyNotFound, - self.policy_api.get_policy, - uuid.uuid4().hex) - - def test_update_policy_404(self): - ref = self._new_policy_ref() - self.assertRaises(exception.PolicyNotFound, - self.policy_api.update_policy, - ref['id'], - ref) - - def test_delete_policy_404(self): - self.assertRaises(exception.PolicyNotFound, - self.policy_api.delete_policy, - uuid.uuid4().hex) - - -class InheritanceTests(AssignmentTestHelperMixin): - - def test_role_assignments_user_domain_to_project_inheritance(self): - test_plan = { - 'entities': {'domains': {'users': 2, 'projects': 1}, - 'roles': 3}, - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 2, 'domain': 0, - 'inherited_to_projects': True}, - {'user': 1, 'role': 1, 'project': 0}], - 'tests': [ - # List all direct assignments for user[0] - {'params': {'user': 0}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 2, 'domain': 0, - 'inherited_to_projects': 'projects'}]}, - # Now the effective ones - so the domain role should turn into - # a project role - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 2, 'project': 0, - 'indirect': {'domain': 0}}]}, - # Narrow down to effective roles for user[0] and project[0] - {'params': {'user': 0, 'project': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 2, 'project': 0, - 'indirect': {'domain': 0}}]} - ] - } - self.config_fixture.config(group='os_inherit', enabled=True) - self.execute_assignment_test_plan(test_plan) - - def test_inherited_role_assignments_excluded_if_os_inherit_false(self): - test_plan = { - 'entities': {'domains': {'users': 2, 'groups': 1, 'projects': 1}, - 'roles': 4}, - 'group_memberships': [{'group': 0, 'users': [0]}], - 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 2, 'domain': 0, - 'inherited_to_projects': True}, - {'user': 1, 'role': 1, 'project': 0}, - {'group': 0, 'role': 3, 'project': 0}], - 'tests': [ - # List all direct assignments for user[0], since os-inherit is - # disabled, we should not see the inherited role - {'params': {'user': 0}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}]}, - # Same in effective mode - inherited roles should not be - # included or expanded...but the group role should now - # turn up as a user role, since group expansion is not - # part of os-inherit. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'domain': 0}, - {'user': 0, 'role': 1, 'project': 0}, - {'user': 0, 'role': 3, 'project': 0, - 'indirect': {'group': 0}}]}, - ] - } - self.config_fixture.config(group='os_inherit', enabled=False) - self.execute_assignment_test_plan(test_plan) - - def _test_crud_inherited_and_direct_assignment(self, **kwargs): - """Tests inherited and direct assignments for the actor and target - - Ensure it is possible to create both inherited and direct role - assignments for the same actor on the same target. The actor and the - target are specified in the kwargs as ('user_id' or 'group_id') and - ('project_id' or 'domain_id'), respectively. - - """ - - # Create a new role to avoid assignments loaded from default fixtures - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - role = self.role_api.create_role(role['id'], role) - - # Define the common assigment entity - assignment_entity = {'role_id': role['id']} - assignment_entity.update(kwargs) - - # Define assignments under test - direct_assignment_entity = assignment_entity.copy() - inherited_assignment_entity = assignment_entity.copy() - inherited_assignment_entity['inherited_to_projects'] = 'projects' - - # Create direct assignment and check grants - self.assignment_api.create_grant(inherited_to_projects=False, - **assignment_entity) - - grants = self.assignment_api.list_role_assignments_for_role(role['id']) - self.assertThat(grants, matchers.HasLength(1)) - self.assertIn(direct_assignment_entity, grants) - - # Now add inherited assignment and check grants - self.assignment_api.create_grant(inherited_to_projects=True, - **assignment_entity) - - grants = self.assignment_api.list_role_assignments_for_role(role['id']) - self.assertThat(grants, matchers.HasLength(2)) - self.assertIn(direct_assignment_entity, grants) - self.assertIn(inherited_assignment_entity, grants) - - # Delete both and check grants - self.assignment_api.delete_grant(inherited_to_projects=False, - **assignment_entity) - self.assignment_api.delete_grant(inherited_to_projects=True, - **assignment_entity) - - grants = self.assignment_api.list_role_assignments_for_role(role['id']) - self.assertEqual([], grants) - - def test_crud_inherited_and_direct_assignment_for_user_on_domain(self): - self._test_crud_inherited_and_direct_assignment( - user_id=self.user_foo['id'], domain_id=DEFAULT_DOMAIN_ID) - - def test_crud_inherited_and_direct_assignment_for_group_on_domain(self): - group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} - group = self.identity_api.create_group(group) - - self._test_crud_inherited_and_direct_assignment( - group_id=group['id'], domain_id=DEFAULT_DOMAIN_ID) - - def test_crud_inherited_and_direct_assignment_for_user_on_project(self): - self._test_crud_inherited_and_direct_assignment( - user_id=self.user_foo['id'], project_id=self.tenant_baz['id']) - - def test_crud_inherited_and_direct_assignment_for_group_on_project(self): - group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} - group = self.identity_api.create_group(group) - - self._test_crud_inherited_and_direct_assignment( - group_id=group['id'], project_id=self.tenant_baz['id']) - - def test_inherited_role_grants_for_user(self): - """Test inherited user roles. - - Test Plan: - - - Enable OS-INHERIT extension - - Create 3 roles - - Create a domain, with a project and a user - - Check no roles yet exit - - Assign a direct user role to the project and a (non-inherited) - user role to the domain - - Get a list of effective roles - should only get the one direct role - - Now add an inherited user role to the domain - - Get a list of effective roles - should have two roles, one - direct and one by virtue of the inherited user role - - Also get effective roles for the domain - the role marked as - inherited should not show up - - """ - self.config_fixture.config(group='os_inherit', enabled=True) - role_list = [] - for _ in range(3): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - - # Create the first two roles - the domain one is not inherited - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id']) - - # Now get the effective roles for the user and project, this - # should only include the direct role assignment on the project - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(1, len(combined_list)) - self.assertIn(role_list[0]['id'], combined_list) - - # Now add an inherited role on the domain - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role_list[2]['id'], - inherited_to_projects=True) - - # Now get the effective roles for the user and project again, this - # should now include the inherited role on the domain - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(2, len(combined_list)) - self.assertIn(role_list[0]['id'], combined_list) - self.assertIn(role_list[2]['id'], combined_list) - - # Finally, check that the inherited role does not appear as a valid - # directly assigned role on the domain itself - combined_role_list = self.assignment_api.get_roles_for_user_and_domain( - user1['id'], domain1['id']) - self.assertEqual(1, len(combined_role_list)) - self.assertIn(role_list[1]['id'], combined_role_list) - - # TODO(henry-nash): The test above uses get_roles_for_user_and_project - # and get_roles_for_user_and_domain, which will, in a subsequent patch, - # be re-implemeted to simply call list_role_assignments (see blueprint - # remove-role-metadata). - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once get_roles_for_user_and - # project/domain have been re-implemented then the manual tests above - # can be refactored to simply ensure it gives the same answers. - test_plan = { - # A domain with a user & project, plus 3 roles. - 'entities': {'domains': {'users': 1, 'projects': 1}, - 'roles': 3}, - 'assignments': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'domain': 0}, - {'user': 0, 'role': 2, 'domain': 0, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] on project[0]. - # Should get one direct role and one inherited role. - {'params': {'user': 0, 'project': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 2, 'project': 0, - 'indirect': {'domain': 0}}]}, - # Ensure effective mode on the domain does not list the - # inherited role on that domain - {'params': {'user': 0, 'domain': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 1, 'domain': 0}]}, - # Ensure non-inherited mode also only returns the non-inherited - # role on the domain - {'params': {'user': 0, 'domain': 0, 'inherited': False}, - 'results': [{'user': 0, 'role': 1, 'domain': 0}]}, - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_inherited_role_grants_for_group(self): - """Test inherited group roles. - - Test Plan: - - - Enable OS-INHERIT extension - - Create 4 roles - - Create a domain, with a project, user and two groups - - Make the user a member of both groups - - Check no roles yet exit - - Assign a direct user role to the project and a (non-inherited) - group role on the domain - - Get a list of effective roles - should only get the one direct role - - Now add two inherited group roles to the domain - - Get a list of effective roles - should have three roles, one - direct and two by virtue of inherited group roles - - """ - self.config_fixture.config(group='os_inherit', enabled=True) - role_list = [] - for _ in range(4): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - role_list.append(role) - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain1['id'], domain1) - user1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'password': uuid.uuid4().hex, 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain1['id'], - 'enabled': True} - group2 = self.identity_api.create_group(group2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} - self.resource_api.create_project(project1['id'], project1) - - self.identity_api.add_user_to_group(user1['id'], - group1['id']) - self.identity_api.add_user_to_group(user1['id'], - group2['id']) - - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(0, len(roles_ref)) - - # Create two roles - the domain one is not inherited - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role_list[0]['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role_list[1]['id']) - - # Now get the effective roles for the user and project, this - # should only include the direct role assignment on the project - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(1, len(combined_list)) - self.assertIn(role_list[0]['id'], combined_list) - - # Now add to more group roles, both inherited, to the domain - self.assignment_api.create_grant(group_id=group2['id'], - domain_id=domain1['id'], - role_id=role_list[2]['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group2['id'], - domain_id=domain1['id'], - role_id=role_list[3]['id'], - inherited_to_projects=True) - - # Now get the effective roles for the user and project again, this - # should now include the inherited roles on the domain - combined_list = self.assignment_api.get_roles_for_user_and_project( - user1['id'], project1['id']) - self.assertEqual(3, len(combined_list)) - self.assertIn(role_list[0]['id'], combined_list) - self.assertIn(role_list[2]['id'], combined_list) - self.assertIn(role_list[3]['id'], combined_list) - - # TODO(henry-nash): The test above uses get_roles_for_user_and_project - # which will, in a subsequent patch, be re-implemeted to simply call - # list_role_assignments (see blueprint remove-role-metadata). - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once - # get_roles_for_user_and_project has been re-implemented then the - # manual tests above can be refactored to simply ensure it gives - # the same answers. - test_plan = { - # A domain with a user and project, 2 groups, plus 4 roles. - 'entities': {'domains': {'users': 1, 'projects': 1, 'groups': 2}, - 'roles': 4}, - 'group_memberships': [{'group': 0, 'users': [0]}, - {'group': 1, 'users': [0]}], - 'assignments': [{'user': 0, 'role': 0, 'project': 0}, - {'group': 0, 'role': 1, 'domain': 0}, - {'group': 1, 'role': 2, 'domain': 0, - 'inherited_to_projects': True}, - {'group': 1, 'role': 3, 'domain': 0, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] on project[0]. - # Should get one direct role and both inherited roles, but - # not the direct one on domain[0], even though user[0] is - # in group[0]. - {'params': {'user': 0, 'project': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 2, 'project': 0, - 'indirect': {'domain': 0, 'group': 1}}, - {'user': 0, 'role': 3, 'project': 0, - 'indirect': {'domain': 0, 'group': 1}}]} - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_projects_for_user_with_inherited_grants(self): - """Test inherited user roles. - - Test Plan: - - - Enable OS-INHERIT extension - - Create a domain, with two projects and a user - - Assign an inherited user role on the domain, as well as a direct - user role to a separate project in a different domain - - Get a list of projects for user, should return all three projects - - """ - self.config_fixture.config(group='os_inherit', enabled=True) - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} - user1 = self.identity_api.create_user(user1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project2['id'], project2) - - # Create 2 grants, one on a project and one inherited grant - # on the domain - self.assignment_api.create_grant(user_id=user1['id'], - project_id=self.tenant_bar['id'], - role_id=self.role_member['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain['id'], - role_id=self.role_admin['id'], - inherited_to_projects=True) - # Should get back all three projects, one by virtue of the direct - # grant, plus both projects in the domain - user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(3, len(user_projects)) - - # TODO(henry-nash): The test above uses list_projects_for_user - # which may, in a subsequent patch, be re-implemeted to call - # list_role_assignments and then report only the distinct projects. - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once list_projects_for_user - # has been re-implemented then the manual tests above can be - # refactored. - test_plan = { - # A domain with 1 project, plus a second domain with 2 projects, - # as well as a user. Also, create 2 roles. - 'entities': {'domains': [{'projects': 1}, - {'users': 1, 'projects': 2}], - 'roles': 2}, - 'assignments': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'domain': 1, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] - # Should get one direct role plus one inherited role for each - # project in domain - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'project': 1, - 'indirect': {'domain': 1}}, - {'user': 0, 'role': 1, 'project': 2, - 'indirect': {'domain': 1}}]} - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_projects_for_user_with_inherited_user_project_grants(self): - """Test inherited role assignments for users on nested projects. - - Test Plan: - - - Enable OS-INHERIT extension - - Create a hierarchy of projects with one root and one leaf project - - Assign an inherited user role on root project - - Assign a non-inherited user role on root project - - Get a list of projects for user, should return both projects - - Disable OS-INHERIT extension - - Get a list of projects for user, should return only root project - - """ - # Enable OS-INHERIT extension - self.config_fixture.config(group='os_inherit', enabled=True) - root_project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(root_project['id'], root_project) - leaf_project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': root_project['id'], - 'is_domain': False} - self.resource_api.create_project(leaf_project['id'], leaf_project) - - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True} - user = self.identity_api.create_user(user) - - # Grant inherited user role - self.assignment_api.create_grant(user_id=user['id'], - project_id=root_project['id'], - role_id=self.role_admin['id'], - inherited_to_projects=True) - # Grant non-inherited user role - self.assignment_api.create_grant(user_id=user['id'], - project_id=root_project['id'], - role_id=self.role_member['id']) - # Should get back both projects: because the direct role assignment for - # the root project and inherited role assignment for leaf project - user_projects = self.assignment_api.list_projects_for_user(user['id']) - self.assertEqual(2, len(user_projects)) - self.assertIn(root_project, user_projects) - self.assertIn(leaf_project, user_projects) - - # Disable OS-INHERIT extension - self.config_fixture.config(group='os_inherit', enabled=False) - # Should get back just root project - due the direct role assignment - user_projects = self.assignment_api.list_projects_for_user(user['id']) - self.assertEqual(1, len(user_projects)) - self.assertIn(root_project, user_projects) - - # TODO(henry-nash): The test above uses list_projects_for_user - # which may, in a subsequent patch, be re-implemeted to call - # list_role_assignments and then report only the distinct projects. - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once list_projects_for_user - # has been re-implemented then the manual tests above can be - # refactored. - test_plan = { - # A domain with a project and sub-project, plus a user. - # Also, create 2 roles. - 'entities': { - 'domains': {'id': DEFAULT_DOMAIN_ID, 'users': 1, - 'projects': {'project': 1}}, - 'roles': 2}, - # A direct role and an inherited role on the parent - 'assignments': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'project': 0, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] - should get back - # one direct role plus one inherited role. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'project': 1, - 'indirect': {'project': 0}}]} - ] - } - - test_plan_with_os_inherit_disabled = { - 'tests': [ - # List all effective assignments for user[0] - should only get - # back the one direct role. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0}]} - ] - } - self.config_fixture.config(group='os_inherit', enabled=True) - test_data = self.execute_assignment_test_plan(test_plan) - self.config_fixture.config(group='os_inherit', enabled=False) - # Pass the existing test data in to allow execution of 2nd test plan - self.execute_assignment_tests( - test_plan_with_os_inherit_disabled, test_data) - - def test_list_projects_for_user_with_inherited_group_grants(self): - """Test inherited group roles. - - Test Plan: - - - Enable OS-INHERIT extension - - Create two domains, each with two projects - - Create a user and group - - Make the user a member of the group - - Assign a user role two projects, an inherited - group role to one domain and an inherited regular role on - the other domain - - Get a list of projects for user, should return both pairs of projects - from the domain, plus the one separate project - - """ - self.config_fixture.config(group='os_inherit', enabled=True) - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain['id'], domain) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} - self.resource_api.create_project(project2['id'], project2) - project3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id']} - self.resource_api.create_project(project3['id'], project3) - project4 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain2['id']} - self.resource_api.create_project(project4['id'], project4) - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} - user1 = self.identity_api.create_user(user1) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} - group1 = self.identity_api.create_group(group1) - self.identity_api.add_user_to_group(user1['id'], group1['id']) - - # Create 4 grants: - # - one user grant on a project in domain2 - # - one user grant on a project in the default domain - # - one inherited user grant on domain - # - one inherited group grant on domain2 - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project3['id'], - role_id=self.role_member['id']) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=self.tenant_bar['id'], - role_id=self.role_member['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain['id'], - role_id=self.role_admin['id'], - inherited_to_projects=True) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain2['id'], - role_id=self.role_admin['id'], - inherited_to_projects=True) - # Should get back all five projects, but without a duplicate for - # project3 (since it has both a direct user role and an inherited role) - user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(5, len(user_projects)) - - # TODO(henry-nash): The test above uses list_projects_for_user - # which may, in a subsequent patch, be re-implemeted to call - # list_role_assignments and then report only the distinct projects. - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once list_projects_for_user - # has been re-implemented then the manual tests above can be - # refactored. - test_plan = { - # A domain with a 1 project, plus a second domain with 2 projects, - # as well as a user & group and a 3rd domain with 2 projects. - # Also, created 2 roles. - 'entities': {'domains': [{'projects': 1}, - {'users': 1, 'groups': 1, 'projects': 2}, - {'projects': 2}], - 'roles': 2}, - 'group_memberships': [{'group': 0, 'users': [0]}], - 'assignments': [{'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 0, 'project': 3}, - {'user': 0, 'role': 1, 'domain': 1, - 'inherited_to_projects': True}, - {'user': 0, 'role': 1, 'domain': 2, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] - # Should get back both direct roles plus roles on both projects - # from each domain. Duplicates should not be fitered out. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 3}, - {'user': 0, 'role': 0, 'project': 0}, - {'user': 0, 'role': 1, 'project': 1, - 'indirect': {'domain': 1}}, - {'user': 0, 'role': 1, 'project': 2, - 'indirect': {'domain': 1}}, - {'user': 0, 'role': 1, 'project': 3, - 'indirect': {'domain': 2}}, - {'user': 0, 'role': 1, 'project': 4, - 'indirect': {'domain': 2}}]} - ] - } - self.execute_assignment_test_plan(test_plan) - - def test_list_projects_for_user_with_inherited_group_project_grants(self): - """Test inherited role assignments for groups on nested projects. - - Test Plan: - - - Enable OS-INHERIT extension - - Create a hierarchy of projects with one root and one leaf project - - Assign an inherited group role on root project - - Assign a non-inherited group role on root project - - Get a list of projects for user, should return both projects - - Disable OS-INHERIT extension - - Get a list of projects for user, should return only root project - - """ - self.config_fixture.config(group='os_inherit', enabled=True) - root_project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(root_project['id'], root_project) - leaf_project = {'id': uuid.uuid4().hex, - 'description': '', - 'domain_id': DEFAULT_DOMAIN_ID, - 'enabled': True, - 'name': uuid.uuid4().hex, - 'parent_id': root_project['id'], - 'is_domain': False} - self.resource_api.create_project(leaf_project['id'], leaf_project) - - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True} - user = self.identity_api.create_user(user) - - group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} - group = self.identity_api.create_group(group) - self.identity_api.add_user_to_group(user['id'], group['id']) - - # Grant inherited group role - self.assignment_api.create_grant(group_id=group['id'], - project_id=root_project['id'], - role_id=self.role_admin['id'], - inherited_to_projects=True) - # Grant non-inherited group role - self.assignment_api.create_grant(group_id=group['id'], - project_id=root_project['id'], - role_id=self.role_member['id']) - # Should get back both projects: because the direct role assignment for - # the root project and inherited role assignment for leaf project - user_projects = self.assignment_api.list_projects_for_user(user['id']) - self.assertEqual(2, len(user_projects)) - self.assertIn(root_project, user_projects) - self.assertIn(leaf_project, user_projects) - - # Disable OS-INHERIT extension - self.config_fixture.config(group='os_inherit', enabled=False) - # Should get back just root project - due the direct role assignment - user_projects = self.assignment_api.list_projects_for_user(user['id']) - self.assertEqual(1, len(user_projects)) - self.assertIn(root_project, user_projects) - - # TODO(henry-nash): The test above uses list_projects_for_user - # which may, in a subsequent patch, be re-implemeted to call - # list_role_assignments and then report only the distinct projects. - # - # The test plan below therefore mirrors this test, to ensure that - # list_role_assignments works the same. Once list_projects_for_user - # has been re-implemented then the manual tests above can be - # refactored. - test_plan = { - # A domain with a project ans sub-project, plus a user. - # Also, create 2 roles. - 'entities': { - 'domains': {'id': DEFAULT_DOMAIN_ID, 'users': 1, 'groups': 1, - 'projects': {'project': 1}}, - 'roles': 2}, - 'group_memberships': [{'group': 0, 'users': [0]}], - # A direct role and an inherited role on the parent - 'assignments': [{'group': 0, 'role': 0, 'project': 0}, - {'group': 0, 'role': 1, 'project': 0, - 'inherited_to_projects': True}], - 'tests': [ - # List all effective assignments for user[0] - should get back - # one direct role plus one inherited role. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0, - 'indirect': {'group': 0}}, - {'user': 0, 'role': 1, 'project': 1, - 'indirect': {'group': 0, 'project': 0}}]} - ] - } - - test_plan_with_os_inherit_disabled = { - 'tests': [ - # List all effective assignments for user[0] - should only get - # back the one direct role. - {'params': {'user': 0, 'effective': True}, - 'results': [{'user': 0, 'role': 0, 'project': 0, - 'indirect': {'group': 0}}]} - ] - } - self.config_fixture.config(group='os_inherit', enabled=True) - test_data = self.execute_assignment_test_plan(test_plan) - self.config_fixture.config(group='os_inherit', enabled=False) - # Pass the existing test data in to allow execution of 2nd test plan - self.execute_assignment_tests( - test_plan_with_os_inherit_disabled, test_data) - - -class FilterTests(filtering.FilterTests): - def test_list_entities_filtered(self): - for entity in ['user', 'group', 'project']: - # Create 20 entities - entity_list = self._create_test_data(entity, 20) - - # Try filtering to get one an exact item out of the list - hints = driver_hints.Hints() - hints.add_filter('name', entity_list[10]['name']) - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(1, len(entities)) - self.assertEqual(entities[0]['id'], entity_list[10]['id']) - # Check the driver has removed the filter from the list hints - self.assertFalse(hints.get_exact_filter_by_name('name')) - self._delete_test_data(entity, entity_list) - - def test_list_users_inexact_filtered(self): - # Create 20 users, some with specific names. We set the names at create - # time (rather than updating them), since the LDAP driver does not - # support name updates. - user_name_data = { - # user index: name for user - 5: 'The', - 6: 'The Ministry', - 7: 'The Ministry of', - 8: 'The Ministry of Silly', - 9: 'The Ministry of Silly Walks', - # ...and one for useful case insensitivity testing - 10: 'The ministry of silly walks OF' - } - user_list = self._create_test_data( - 'user', 20, domain_id=DEFAULT_DOMAIN_ID, name_dict=user_name_data) - - hints = driver_hints.Hints() - hints.add_filter('name', 'ministry', comparator='contains') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(5, len(users)) - self._match_with_list(users, user_list, - list_start=6, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(6, len(users)) - self._match_with_list(users, user_list, - list_start=5, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'of', comparator='endswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(2, len(users)) - # We can't assume we will get back the users in any particular order - self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']]) - # TODO(henry-nash) Check inexact filter has been removed. - - # TODO(henry-nash): Add some case sensitive tests. However, - # these would be hard to validate currently, since: - # - # For SQL, the issue is that MySQL 0.7, by default, is installed in - # case insensitive mode (which is what is run by default for our - # SQL backend tests). For production deployments. OpenStack - # assumes a case sensitive database. For these tests, therefore, we - # need to be able to check the sensitivity of the database so as to - # know whether to run case sensitive tests here. - # - # For LDAP/AD, although dependent on the schema being used, attributes - # are typically configured to be case aware, but not case sensitive. - - self._delete_test_data('user', user_list) - - def test_groups_for_user_filtered(self): - """Test use of filtering doesn't break groups_for_user listing. - - Some backends may use filtering to achieve the list of groups for a - user, so test that it can combine a second filter. - - Test Plan: - - - Create 10 groups, some with names we can filter on - - Create 2 users - - Assign 1 of those users to most of the groups, including some of the - well known named ones - - Assign the other user to other groups as spoilers - - Ensure that when we list groups for users with a filter on the group - name, both restrictions have been enforced on what is returned. - - """ - - number_of_groups = 10 - group_name_data = { - # entity index: name for entity - 5: 'The', - 6: 'The Ministry', - 9: 'The Ministry of Silly Walks', - } - group_list = self._create_test_data( - 'group', number_of_groups, - domain_id=DEFAULT_DOMAIN_ID, name_dict=group_name_data) - user_list = self._create_test_data('user', 2) - - for group in range(7): - # Create membership, including with two out of the three groups - # with well know names - self.identity_api.add_user_to_group(user_list[0]['id'], - group_list[group]['id']) - # ...and some spoiler memberships - for group in range(7, number_of_groups): - self.identity_api.add_user_to_group(user_list[1]['id'], - group_list[group]['id']) - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back 2 out of the 3 groups that start with 'The' - # hence showing that both "filters" have been applied - self.assertThat(len(groups), matchers.Equals(2)) - self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']]) - self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']]) - self._delete_test_data('user', user_list) - self._delete_test_data('group', group_list) - - def _get_user_name_field_size(self): - """Return the size of the user name field for the backend. - - Subclasses can override this method to indicate that the user name - field is limited in length. The user name is the field used in the test - that validates that a filter value works even if it's longer than a - field. - - If the backend doesn't limit the value length then return None. - - """ - return None - - def test_filter_value_wider_than_field(self): - # If a filter value is given that's larger than the field in the - # backend then no values are returned. - - user_name_field_size = self._get_user_name_field_size() - - if user_name_field_size is None: - # The backend doesn't limit the size of the user name, so pass this - # test. - return - - # Create some users just to make sure would return something if the - # filter was ignored. - self._create_test_data('user', 2) - - hints = driver_hints.Hints() - value = 'A' * (user_name_field_size + 1) - hints.add_filter('name', value) - users = self.identity_api.list_users(hints=hints) - self.assertEqual([], users) - - def test_list_users_in_group_filtered(self): - number_of_users = 10 - user_name_data = { - 1: 'Arthur Conan Doyle', - 3: 'Arthur Rimbaud', - 9: 'Arthur Schopenhauer', - } - user_list = self._create_test_data( - 'user', number_of_users, - domain_id=DEFAULT_DOMAIN_ID, name_dict=user_name_data) - group = self._create_one_entity('group', - DEFAULT_DOMAIN_ID, 'Great Writers') - for i in range(7): - self.identity_api.add_user_to_group(user_list[i]['id'], - group['id']) - - hints = driver_hints.Hints() - hints.add_filter('name', 'Arthur', comparator='startswith') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(2)) - self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']]) - self._delete_test_data('user', user_list) - self._delete_entity('group')(group['id']) - - -class LimitTests(filtering.FilterTests): - ENTITIES = ['user', 'group', 'project'] - - def setUp(self): - """Setup for Limit Test Cases.""" - - self.domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(self.domain1['id'], self.domain1) - self.addCleanup(self.clean_up_domain) - - self.entity_lists = {} - self.domain1_entity_lists = {} - - for entity in self.ENTITIES: - # Create 20 entities, 14 of which are in domain1 - self.entity_lists[entity] = self._create_test_data(entity, 6) - self.domain1_entity_lists[entity] = self._create_test_data( - entity, 14, self.domain1['id']) - self.addCleanup(self.clean_up_entities) - - def clean_up_domain(self): - """Clean up domain test data from Limit Test Cases.""" - - self.domain1['enabled'] = False - self.resource_api.update_domain(self.domain1['id'], self.domain1) - self.resource_api.delete_domain(self.domain1['id']) - del self.domain1 - - def clean_up_entities(self): - """Clean up entity test data from Limit Test Cases.""" - for entity in self.ENTITIES: - self._delete_test_data(entity, self.entity_lists[entity]) - self._delete_test_data(entity, self.domain1_entity_lists[entity]) - del self.entity_lists - del self.domain1_entity_lists - - def _test_list_entity_filtered_and_limited(self, entity): - self.config_fixture.config(list_limit=10) - # Should get back just 10 entities in domain1 - hints = driver_hints.Hints() - hints.add_filter('domain_id', self.domain1['id']) - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - self.assertTrue(hints.limit['truncated']) - self._match_with_list(entities, self.domain1_entity_lists[entity]) - - # Override with driver specific limit - if entity == 'project': - self.config_fixture.config(group='resource', list_limit=5) - else: - self.config_fixture.config(group='identity', list_limit=5) - - # Should get back just 5 users in domain1 - hints = driver_hints.Hints() - hints.add_filter('domain_id', self.domain1['id']) - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - self._match_with_list(entities, self.domain1_entity_lists[entity]) - - # Finally, let's pretend we want to get the full list of entities, - # even with the limits set, as part of some internal calculation. - # Calling the API without a hints list should achieve this, and - # return at least the 20 entries we created (there may be other - # entities lying around created by other tests/setup). - entities = self._list_entities(entity)() - self.assertTrue(len(entities) >= 20) - - def test_list_users_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('user') - - def test_list_groups_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('group') - - def test_list_projects_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('project') |