diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/assignment')
5 files changed, 3990 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/assignment/__init__.py b/keystone-moon/keystone/tests/unit/assignment/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/assignment/__init__.py diff --git a/keystone-moon/keystone/tests/unit/assignment/role_backends/__init__.py b/keystone-moon/keystone/tests/unit/assignment/role_backends/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/assignment/role_backends/__init__.py diff --git a/keystone-moon/keystone/tests/unit/assignment/role_backends/test_sql.py b/keystone-moon/keystone/tests/unit/assignment/role_backends/test_sql.py new file mode 100644 index 00000000..37e2d924 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/assignment/role_backends/test_sql.py @@ -0,0 +1,112 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystone.common import sql +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit.assignment import test_core +from keystone.tests.unit.backend import core_sql + + +class SqlRoleModels(core_sql.BaseBackendSqlModels): + + def test_role_model(self): + cols = (('id', sql.String, 64), + ('name', sql.String, 255), + ('domain_id', sql.String, 64)) + self.assertExpectedSchema('role', cols) + + +class SqlRole(core_sql.BaseBackendSqlTests, test_core.RoleTests): + + def test_create_null_role_name(self): + role = unit.new_role_ref(name=None) + self.assertRaises(exception.UnexpectedError, + self.role_api.create_role, + role['id'], + role) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role['id']) + + def test_create_duplicate_role_domain_specific_name_fails(self): + domain = unit.new_domain_ref() + role1 = unit.new_role_ref(domain_id=domain['id']) + self.role_api.create_role(role1['id'], role1) + role2 = unit.new_role_ref(name=role1['name'], + domain_id=domain['id']) + self.assertRaises(exception.Conflict, + self.role_api.create_role, + role2['id'], + role2) + + def test_update_domain_id_of_role_fails(self): + # Create a global role + role1 = unit.new_role_ref() + role1 = self.role_api.create_role(role1['id'], role1) + # Try and update it to be domain specific + domainA = unit.new_domain_ref() + role1['domain_id'] = domainA['id'] + self.assertRaises(exception.ValidationError, + self.role_api.update_role, + role1['id'], + role1) + + # Create a domain specific role from scratch + role2 = unit.new_role_ref(domain_id=domainA['id']) + self.role_api.create_role(role2['id'], role2) + # Try to "move" it to another domain + domainB = unit.new_domain_ref() + role2['domain_id'] = domainB['id'] + self.assertRaises(exception.ValidationError, + self.role_api.update_role, + role2['id'], + role2) + # Now try to make it global + role2['domain_id'] = None + self.assertRaises(exception.ValidationError, + self.role_api.update_role, + role2['id'], + role2) + + def test_domain_specific_separation(self): + domain1 = unit.new_domain_ref() + role1 = unit.new_role_ref(domain_id=domain1['id']) + role_ref1 = self.role_api.create_role(role1['id'], role1) + self.assertDictEqual(role1, role_ref1) + # Check we can have the same named role in a different domain + domain2 = unit.new_domain_ref() + role2 = unit.new_role_ref(name=role1['name'], domain_id=domain2['id']) + role_ref2 = self.role_api.create_role(role2['id'], role2) + self.assertDictEqual(role2, role_ref2) + # ...and in fact that you can have the same named role as a global role + role3 = unit.new_role_ref(name=role1['name']) + role_ref3 = self.role_api.create_role(role3['id'], role3) + self.assertDictEqual(role3, role_ref3) + # Check that updating one doesn't change the others + role1['name'] = uuid.uuid4().hex + self.role_api.update_role(role1['id'], role1) + role_ref1 = self.role_api.get_role(role1['id']) + self.assertDictEqual(role1, role_ref1) + role_ref2 = self.role_api.get_role(role2['id']) + self.assertDictEqual(role2, role_ref2) + role_ref3 = self.role_api.get_role(role3['id']) + self.assertDictEqual(role3, role_ref3) + # Check that deleting one of these, doesn't affect the others + self.role_api.delete_role(role1['id']) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role1['id']) + self.role_api.get_role(role2['id']) + self.role_api.get_role(role3['id']) diff --git a/keystone-moon/keystone/tests/unit/assignment/test_backends.py b/keystone-moon/keystone/tests/unit/assignment/test_backends.py new file mode 100644 index 00000000..eb40e569 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/assignment/test_backends.py @@ -0,0 +1,3755 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import mock +from oslo_config import cfg +from six.moves import range +from testtools import matchers + +from keystone import exception +from keystone.tests import unit + + +CONF = cfg.CONF + + +class AssignmentTestHelperMixin(object): + """Mixin class to aid testing of assignments. + + This class supports data driven test plans that enable: + + - Creation of initial entities, such as domains, users, groups, projects + and roles + - Creation of assignments referencing the above entities + - A set of input parameters and expected outputs to list_role_assignments + based on the above test data + + A test plan is a dict of the form: + + test_plan = { + entities: details and number of entities, + group_memberships: group-user entity memberships, + assignments: list of assignments to create, + tests: list of pairs of input params and expected outputs} + + An example test plan: + + test_plan = { + # First, create the entities required. Entities are specified by + # a dict with the key being the entity type and the value an + # entity specification which can be one of: + # + # - a simple number, e.g. {'users': 3} creates 3 users + # - a dict where more information regarding the contents of the entity + # is required, e.g. {'domains' : {'users : 3}} creates a domain + # with three users + # - a list of entity specifications if multiple are required + # + # The following creates a domain that contains a single user, group and + # project, as well as creating three roles. + + 'entities': {'domains': {'users': 1, 'groups': 1, 'projects': 1}, + 'roles': 3}, + + # If it is required that an existing domain be used for the new + # entities, then the id of that domain can be included in the + # domain dict. For example, if alternatively we wanted to add 3 users + # to the default domain, add a second domain containing 3 projects as + # well as 5 additional empty domains, the entities would be defined as: + # + # 'entities': {'domains': [{'id': DEFAULT_DOMAIN, 'users': 3}, + # {'projects': 3}, 5]}, + # + # A project hierarchy can be specified within the 'projects' section by + # nesting the 'project' key, for example to create a project with three + # sub-projects you would use: + + 'projects': {'project': 3} + + # A more complex hierarchy can also be defined, for example the + # following would define three projects each containing a + # sub-project, each of which contain a further three sub-projects. + + 'projects': [{'project': {'project': 3}}, + {'project': {'project': 3}}, + {'project': {'project': 3}}] + + # If the 'roles' entity count is defined as top level key in 'entities' + # dict then these are global roles. If it is placed within the + # 'domain' dict, then they will be domain specific roles. A mix of + # domain specific and global roles are allowed, with the role index + # being calculated in the order they are defined in the 'entities' + # dict. + + # A set of implied role specifications. In this case, prior role + # index 0 implies role index 1, and role 1 implies roles 2 and 3. + + 'roles': [{'role': 0, 'implied_roles': [1]}, + {'role': 1, 'implied_roles': [2, 3]}] + + # A list of groups and their members. In this case make users with + # index 0 and 1 members of group with index 0. Users and Groups are + # indexed in the order they appear in the 'entities' key above. + + 'group_memberships': [{'group': 0, 'users': [0, 1]}] + + # Next, create assignments between the entities, referencing the + # entities by index, i.e. 'user': 0 refers to user[0]. Entities are + # indexed in the order they appear in the 'entities' key above within + # their entity type. + + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'group': 0, 'role': 2, 'domain': 0}, + {'user': 0, 'role': 2, 'project': 0}], + + # Finally, define an array of tests where list_role_assignment() is + # called with the given input parameters and the results are then + # confirmed to be as given in 'results'. Again, all entities are + # referenced by index. + + 'tests': [ + {'params': {}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'group': 0, 'role': 2, 'domain': 0}, + {'user': 0, 'role': 2, 'project': 0}]}, + {'params': {'role': 2}, + 'results': [{'group': 0, 'role': 2, 'domain': 0}, + {'user': 0, 'role': 2, 'project': 0}]}] + + # The 'params' key also supports the 'effective', + # 'inherited_to_projects' and 'source_from_group_ids' options to + # list_role_assignments.} + + """ + + def _handle_project_spec(self, test_data, domain_id, project_spec, + parent_id=None): + """Handle the creation of a project or hierarchy of projects. + + project_spec may either be a count of the number of projects to + create, or it may be a list of the form: + + [{'project': project_spec}, {'project': project_spec}, ...] + + This method is called recursively to handle the creation of a + hierarchy of projects. + + """ + def _create_project(domain_id, parent_id): + new_project = unit.new_project_ref(domain_id=domain_id, + parent_id=parent_id) + new_project = self.resource_api.create_project(new_project['id'], + new_project) + return new_project + + if isinstance(project_spec, list): + for this_spec in project_spec: + self._handle_project_spec( + test_data, domain_id, this_spec, parent_id=parent_id) + elif isinstance(project_spec, dict): + new_proj = _create_project(domain_id, parent_id) + test_data['projects'].append(new_proj) + self._handle_project_spec( + test_data, domain_id, project_spec['project'], + parent_id=new_proj['id']) + else: + for _ in range(project_spec): + test_data['projects'].append( + _create_project(domain_id, parent_id)) + + def _create_role(self, domain_id=None): + new_role = unit.new_role_ref(domain_id=domain_id) + return self.role_api.create_role(new_role['id'], new_role) + + def _handle_domain_spec(self, test_data, domain_spec): + """Handle the creation of domains and their contents. + + domain_spec may either be a count of the number of empty domains to + create, a dict describing the domain contents, or a list of + domain_specs. + + In the case when a list is provided, this method calls itself + recursively to handle the list elements. + + This method will insert any entities created into test_data + + """ + def _create_domain(domain_id=None): + if domain_id is None: + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], + new_domain) + return new_domain + else: + # The test plan specified an existing domain to use + return self.resource_api.get_domain(domain_id) + + def _create_entity_in_domain(entity_type, domain_id): + """Create a user or group entity in the domain.""" + if entity_type == 'users': + new_entity = unit.new_user_ref(domain_id=domain_id) + new_entity = self.identity_api.create_user(new_entity) + elif entity_type == 'groups': + new_entity = unit.new_group_ref(domain_id=domain_id) + new_entity = self.identity_api.create_group(new_entity) + elif entity_type == 'roles': + new_entity = self._create_role(domain_id=domain_id) + else: + # Must be a bad test plan + raise exception.NotImplemented() + return new_entity + + if isinstance(domain_spec, list): + for x in domain_spec: + self._handle_domain_spec(test_data, x) + elif isinstance(domain_spec, dict): + # If there is a domain ID specified, then use it + the_domain = _create_domain(domain_spec.get('id')) + test_data['domains'].append(the_domain) + for entity_type, value in domain_spec.items(): + if entity_type == 'id': + # We already used this above to determine whether to + # use and existing domain + continue + if entity_type == 'projects': + # If it's projects, we need to handle the potential + # specification of a project hierarchy + self._handle_project_spec( + test_data, the_domain['id'], value) + else: + # It's a count of number of entities + for _ in range(value): + test_data[entity_type].append( + _create_entity_in_domain( + entity_type, the_domain['id'])) + else: + for _ in range(domain_spec): + test_data['domains'].append(_create_domain()) + + def create_entities(self, entity_pattern): + """Create the entities specified in the test plan. + + Process the 'entities' key in the test plan, creating the requested + entities. Each created entity will be added to the array of entities + stored in the returned test_data object, e.g.: + + test_data['users'] = [user[0], user[1]....] + + """ + test_data = {} + for entity in ['users', 'groups', 'domains', 'projects', 'roles']: + test_data[entity] = [] + + # Create any domains requested and, if specified, any entities within + # those domains + if 'domains' in entity_pattern: + self._handle_domain_spec(test_data, entity_pattern['domains']) + + # Create any roles requested + if 'roles' in entity_pattern: + for _ in range(entity_pattern['roles']): + test_data['roles'].append(self._create_role()) + + return test_data + + def _convert_entity_shorthand(self, key, shorthand_data, reference_data): + """Convert a shorthand entity description into a full ID reference. + + In test plan definitions, we allow a shorthand for referencing to an + entity of the form: + + 'user': 0 + + which is actually shorthand for: + + 'user_id': reference_data['users'][0]['id'] + + This method converts the shorthand version into the full reference. + + """ + expanded_key = '%s_id' % key + reference_index = '%ss' % key + index_value = ( + reference_data[reference_index][shorthand_data[key]]['id']) + return expanded_key, index_value + + def create_implied_roles(self, implied_pattern, test_data): + """Create the implied roles specified in the test plan.""" + for implied_spec in implied_pattern: + # Each implied role specification is a dict of the form: + # + # {'role': 0, 'implied_roles': list of roles} + + prior_role = test_data['roles'][implied_spec['role']]['id'] + if isinstance(implied_spec['implied_roles'], list): + for this_role in implied_spec['implied_roles']: + implied_role = test_data['roles'][this_role]['id'] + self.role_api.create_implied_role(prior_role, implied_role) + else: + implied_role = ( + test_data['roles'][implied_spec['implied_roles']]['id']) + self.role_api.create_implied_role(prior_role, implied_role) + + def create_group_memberships(self, group_pattern, test_data): + """Create the group memberships specified in the test plan.""" + for group_spec in group_pattern: + # Each membership specification is a dict of the form: + # + # {'group': 0, 'users': [list of user indexes]} + # + # Add all users in the list to the specified group, first + # converting from index to full entity ID. + group_value = test_data['groups'][group_spec['group']]['id'] + for user_index in group_spec['users']: + user_value = test_data['users'][user_index]['id'] + self.identity_api.add_user_to_group(user_value, group_value) + return test_data + + def create_assignments(self, assignment_pattern, test_data): + """Create the assignments specified in the test plan.""" + # First store how many assignments are already in the system, + # so during the tests we can check the number of new assignments + # created. + test_data['initial_assignment_count'] = ( + len(self.assignment_api.list_role_assignments())) + + # Now create the new assignments in the test plan + for assignment in assignment_pattern: + # Each assignment is a dict of the form: + # + # { 'user': 0, 'project':1, 'role': 6} + # + # where the value of each item is the index into the array of + # entities created earlier. + # + # We process the assignment dict to create the args required to + # make the create_grant() call. + args = {} + for param in assignment: + if param == 'inherited_to_projects': + args[param] = assignment[param] + else: + # Turn 'entity : 0' into 'entity_id = ac6736ba873d' + # where entity in user, group, project or domain + key, value = self._convert_entity_shorthand( + param, assignment, test_data) + args[key] = value + self.assignment_api.create_grant(**args) + return test_data + + def execute_assignment_cases(self, test_plan, test_data): + """Execute the test plan, based on the created test_data.""" + def check_results(expected, actual, param_arg_count): + if param_arg_count == 0: + # It was an unfiltered call, so default fixture assignments + # might be polluting our answer - so we take into account + # how many assignments there were before the test. + self.assertEqual( + len(expected) + test_data['initial_assignment_count'], + len(actual)) + else: + self.assertThat(actual, matchers.HasLength(len(expected))) + + for each_expected in expected: + expected_assignment = {} + for param in each_expected: + if param == 'inherited_to_projects': + expected_assignment[param] = each_expected[param] + elif param == 'indirect': + # We're expecting the result to contain an indirect + # dict with the details how the role came to be placed + # on this entity - so convert the key/value pairs of + # that dict into real entity references. + indirect_term = {} + for indirect_param in each_expected[param]: + key, value = self._convert_entity_shorthand( + indirect_param, each_expected[param], + test_data) + indirect_term[key] = value + expected_assignment[param] = indirect_term + else: + # Convert a simple shorthand entry into a full + # entity reference + key, value = self._convert_entity_shorthand( + param, each_expected, test_data) + expected_assignment[key] = value + self.assertIn(expected_assignment, actual) + + def convert_group_ids_sourced_from_list(index_list, reference_data): + value_list = [] + for group_index in index_list: + value_list.append( + reference_data['groups'][group_index]['id']) + return value_list + + # Go through each test in the array, processing the input params, which + # we build into an args dict, and then call list_role_assignments. Then + # check the results against those specified in the test plan. + for test in test_plan.get('tests', []): + args = {} + for param in test['params']: + if param in ['effective', 'inherited', 'include_subtree']: + # Just pass the value into the args + args[param] = test['params'][param] + elif param == 'source_from_group_ids': + # Convert the list of indexes into a list of IDs + args[param] = convert_group_ids_sourced_from_list( + test['params']['source_from_group_ids'], test_data) + else: + # Turn 'entity : 0' into 'entity_id = ac6736ba873d' + # where entity in user, group, project or domain + key, value = self._convert_entity_shorthand( + param, test['params'], test_data) + args[key] = value + results = self.assignment_api.list_role_assignments(**args) + check_results(test['results'], results, len(args)) + + def execute_assignment_plan(self, test_plan): + """Create entities, assignments and execute the test plan. + + The standard method to call to create entities and assignments and + execute the tests as specified in the test_plan. The test_data + dict is returned so that, if required, the caller can execute + additional manual tests with the entities and assignments created. + + """ + test_data = self.create_entities(test_plan['entities']) + if 'implied_roles' in test_plan: + self.create_implied_roles(test_plan['implied_roles'], test_data) + if 'group_memberships' in test_plan: + self.create_group_memberships(test_plan['group_memberships'], + test_data) + if 'assignments' in test_plan: + test_data = self.create_assignments(test_plan['assignments'], + test_data) + self.execute_assignment_cases(test_plan, test_data) + return test_data + + +class AssignmentTests(AssignmentTestHelperMixin): + + def _get_domain_fixture(self): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + return domain + + def test_project_add_and_remove_user_role(self): + user_ids = self.assignment_api.list_user_ids_for_project( + self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], user_ids) + + self.assignment_api.add_role_to_user_and_project( + tenant_id=self.tenant_bar['id'], + user_id=self.user_two['id'], + role_id=self.role_other['id']) + user_ids = self.assignment_api.list_user_ids_for_project( + self.tenant_bar['id']) + self.assertIn(self.user_two['id'], user_ids) + + self.assignment_api.remove_role_from_user_and_project( + tenant_id=self.tenant_bar['id'], + user_id=self.user_two['id'], + role_id=self.role_other['id']) + + user_ids = self.assignment_api.list_user_ids_for_project( + self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], user_ids) + + def test_remove_user_role_not_assigned(self): + # Expect failure if attempt to remove a role that was never assigned to + # the user. + self.assertRaises(exception.RoleNotFound, + self.assignment_api. + remove_role_from_user_and_project, + tenant_id=self.tenant_bar['id'], + user_id=self.user_two['id'], + role_id=self.role_other['id']) + + def test_list_user_ids_for_project(self): + user_ids = self.assignment_api.list_user_ids_for_project( + self.tenant_baz['id']) + self.assertEqual(2, len(user_ids)) + self.assertIn(self.user_two['id'], user_ids) + self.assertIn(self.user_badguy['id'], user_ids) + + def test_list_user_ids_for_project_no_duplicates(self): + # Create user + user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_ref = self.identity_api.create_user(user_ref) + # Create project + project_ref = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project( + project_ref['id'], project_ref) + # Create 2 roles and give user each role in project + for i in range(2): + role_ref = unit.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self.assignment_api.add_role_to_user_and_project( + user_id=user_ref['id'], + tenant_id=project_ref['id'], + role_id=role_ref['id']) + # Get the list of user_ids in project + user_ids = self.assignment_api.list_user_ids_for_project( + project_ref['id']) + # Ensure the user is only returned once + self.assertEqual(1, len(user_ids)) + + def test_get_project_user_ids_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.list_user_ids_for_project, + uuid.uuid4().hex) + + def test_list_role_assignments_unfiltered(self): + """Test unfiltered listing of role assignments.""" + test_plan = { + # Create a domain, with a user, group & project + 'entities': {'domains': {'users': 1, 'groups': 1, 'projects': 1}, + 'roles': 3}, + # Create a grant of each type (user/group on project/domain) + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'group': 0, 'role': 2, 'domain': 0}, + {'group': 0, 'role': 2, 'project': 0}], + 'tests': [ + # Check that we get back the 4 assignments + {'params': {}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'group': 0, 'role': 2, 'domain': 0}, + {'group': 0, 'role': 2, 'project': 0}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignments_filtered_by_role(self): + """Test listing of role assignments filtered by role ID.""" + test_plan = { + # Create a user, group & project in the default domain + 'entities': {'domains': {'id': CONF.identity.default_domain_id, + 'users': 1, 'groups': 1, 'projects': 1}, + 'roles': 3}, + # Create a grant of each type (user/group on project/domain) + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'group': 0, 'role': 2, 'domain': 0}, + {'group': 0, 'role': 2, 'project': 0}], + 'tests': [ + # Check that when filtering by role, we only get back those + # that match + {'params': {'role': 2}, + 'results': [{'group': 0, 'role': 2, 'domain': 0}, + {'group': 0, 'role': 2, 'project': 0}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_group_role_assignment(self): + # When a group role assignment is created and the role assignments are + # listed then the group role assignment is included in the list. + + test_plan = { + 'entities': {'domains': {'id': CONF.identity.default_domain_id, + 'groups': 1, 'projects': 1}, + 'roles': 1}, + 'assignments': [{'group': 0, 'role': 0, 'project': 0}], + 'tests': [ + {'params': {}, + 'results': [{'group': 0, 'role': 0, 'project': 0}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignments_bad_role(self): + assignment_list = self.assignment_api.list_role_assignments( + role_id=uuid.uuid4().hex) + self.assertEqual([], assignment_list) + + def test_add_duplicate_role_grant(self): + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertNotIn(self.role_admin['id'], roles_ref) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id']) + self.assertRaises(exception.Conflict, + self.assignment_api.add_role_to_user_and_project, + self.user_foo['id'], + self.tenant_bar['id'], + self.role_admin['id']) + + def test_get_role_by_user_and_project_with_user_in_group(self): + """Test for get role by user and project, user was added into a group. + + Test Plan: + + - Create a user, a project & a group, add this user to group + - Create roles and grant them to user and project + - Check the role list get by the user and project was as expected + + """ + user_ref = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_ref = self.identity_api.create_user(user_ref) + + project_ref = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project_ref['id'], project_ref) + + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group_id = self.identity_api.create_group(group)['id'] + self.identity_api.add_user_to_group(user_ref['id'], group_id) + + role_ref_list = [] + for i in range(2): + role_ref = unit.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + role_ref_list.append(role_ref) + + self.assignment_api.add_role_to_user_and_project( + user_id=user_ref['id'], + tenant_id=project_ref['id'], + role_id=role_ref['id']) + + role_list = self.assignment_api.get_roles_for_user_and_project( + user_ref['id'], + project_ref['id']) + + self.assertEqual(set([r['id'] for r in role_ref_list]), + set(role_list)) + + def test_get_role_by_user_and_project(self): + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertNotIn(self.role_admin['id'], roles_ref) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id']) + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertIn(self.role_admin['id'], roles_ref) + self.assertNotIn('member', roles_ref) + + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], 'member') + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertIn(self.role_admin['id'], roles_ref) + self.assertIn('member', roles_ref) + + def test_get_roles_for_user_and_domain(self): + """Test for getting roles for user on a domain. + + Test Plan: + + - Create a domain, with 2 users + - Check no roles yet exit + - Give user1 two roles on the domain, user2 one role + - Get roles on user1 and the domain - maybe sure we only + get back the 2 roles on user1 + - Delete both roles from user1 + - Check we get no roles back for user1 on domain + + """ + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + new_user1 = unit.new_user_ref(domain_id=new_domain['id']) + new_user1 = self.identity_api.create_user(new_user1) + new_user2 = unit.new_user_ref(domain_id=new_domain['id']) + new_user2 = self.identity_api.create_user(new_user2) + roles_ref = self.assignment_api.list_grants( + user_id=new_user1['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + # Now create the grants (roles are defined in default_fixtures) + self.assignment_api.create_grant(user_id=new_user1['id'], + domain_id=new_domain['id'], + role_id='member') + self.assignment_api.create_grant(user_id=new_user1['id'], + domain_id=new_domain['id'], + role_id='other') + self.assignment_api.create_grant(user_id=new_user2['id'], + domain_id=new_domain['id'], + role_id='admin') + # Read back the roles for user1 on domain + roles_ids = self.assignment_api.get_roles_for_user_and_domain( + new_user1['id'], new_domain['id']) + self.assertEqual(2, len(roles_ids)) + self.assertIn(self.role_member['id'], roles_ids) + self.assertIn(self.role_other['id'], roles_ids) + + # Now delete both grants for user1 + self.assignment_api.delete_grant(user_id=new_user1['id'], + domain_id=new_domain['id'], + role_id='member') + self.assignment_api.delete_grant(user_id=new_user1['id'], + domain_id=new_domain['id'], + role_id='other') + roles_ref = self.assignment_api.list_grants( + user_id=new_user1['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + + def test_get_roles_for_user_and_domain_returns_not_found(self): + """Test errors raised when getting roles for user on a domain. + + Test Plan: + + - Check non-existing user gives UserNotFound + - Check non-existing domain gives DomainNotFound + + """ + new_domain = self._get_domain_fixture() + new_user1 = unit.new_user_ref(domain_id=new_domain['id']) + new_user1 = self.identity_api.create_user(new_user1) + + self.assertRaises(exception.UserNotFound, + self.assignment_api.get_roles_for_user_and_domain, + uuid.uuid4().hex, + new_domain['id']) + + self.assertRaises(exception.DomainNotFound, + self.assignment_api.get_roles_for_user_and_domain, + new_user1['id'], + uuid.uuid4().hex) + + def test_get_roles_for_user_and_project_returns_not_found(self): + self.assertRaises(exception.UserNotFound, + self.assignment_api.get_roles_for_user_and_project, + uuid.uuid4().hex, + self.tenant_bar['id']) + + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.get_roles_for_user_and_project, + self.user_foo['id'], + uuid.uuid4().hex) + + def test_add_role_to_user_and_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.add_role_to_user_and_project, + self.user_foo['id'], + uuid.uuid4().hex, + self.role_admin['id']) + + self.assertRaises(exception.RoleNotFound, + self.assignment_api.add_role_to_user_and_project, + self.user_foo['id'], + self.tenant_bar['id'], + uuid.uuid4().hex) + + def test_add_role_to_user_and_project_no_user(self): + # If add_role_to_user_and_project and the user doesn't exist, then + # no error. + user_id_not_exist = uuid.uuid4().hex + self.assignment_api.add_role_to_user_and_project( + user_id_not_exist, self.tenant_bar['id'], self.role_admin['id']) + + def test_remove_role_from_user_and_project(self): + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], 'member') + self.assignment_api.remove_role_from_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], 'member') + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertNotIn('member', roles_ref) + self.assertRaises(exception.NotFound, + self.assignment_api. + remove_role_from_user_and_project, + self.user_foo['id'], + self.tenant_bar['id'], + 'member') + + def test_get_role_grant_by_user_and_project(self): + roles_ref = self.assignment_api.list_grants( + user_id=self.user_foo['id'], + project_id=self.tenant_bar['id']) + self.assertEqual(1, len(roles_ref)) + self.assignment_api.create_grant(user_id=self.user_foo['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_admin['id']) + roles_ref = self.assignment_api.list_grants( + user_id=self.user_foo['id'], + project_id=self.tenant_bar['id']) + self.assertIn(self.role_admin['id'], + [role_ref['id'] for role_ref in roles_ref]) + + self.assignment_api.create_grant(user_id=self.user_foo['id'], + project_id=self.tenant_bar['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + user_id=self.user_foo['id'], + project_id=self.tenant_bar['id']) + + roles_ref_ids = [] + for ref in roles_ref: + roles_ref_ids.append(ref['id']) + self.assertIn(self.role_admin['id'], roles_ref_ids) + self.assertIn('member', roles_ref_ids) + + def test_remove_role_grant_from_user_and_project(self): + self.assignment_api.create_grant(user_id=self.user_foo['id'], + project_id=self.tenant_baz['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + user_id=self.user_foo['id'], + project_id=self.tenant_baz['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(user_id=self.user_foo['id'], + project_id=self.tenant_baz['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + user_id=self.user_foo['id'], + project_id=self.tenant_baz['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + user_id=self.user_foo['id'], + project_id=self.tenant_baz['id'], + role_id='member') + + def test_get_role_assignment_by_project_not_found(self): + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.check_grant_role_id, + user_id=self.user_foo['id'], + project_id=self.tenant_baz['id'], + role_id='member') + + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.check_grant_role_id, + group_id=uuid.uuid4().hex, + project_id=self.tenant_baz['id'], + role_id='member') + + def test_get_role_assignment_by_domain_not_found(self): + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.check_grant_role_id, + user_id=self.user_foo['id'], + domain_id=self.domain_default['id'], + role_id='member') + + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.check_grant_role_id, + group_id=uuid.uuid4().hex, + domain_id=self.domain_default['id'], + role_id='member') + + def test_del_role_assignment_by_project_not_found(self): + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + user_id=self.user_foo['id'], + project_id=self.tenant_baz['id'], + role_id='member') + + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=uuid.uuid4().hex, + project_id=self.tenant_baz['id'], + role_id='member') + + def test_del_role_assignment_by_domain_not_found(self): + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + user_id=self.user_foo['id'], + domain_id=self.domain_default['id'], + role_id='member') + + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=uuid.uuid4().hex, + domain_id=self.domain_default['id'], + role_id='member') + + def test_get_and_remove_role_grant_by_group_and_project(self): + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + new_group = unit.new_group_ref(domain_id=new_domain['id']) + new_group = self.identity_api.create_group(new_group) + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_user = self.identity_api.create_user(new_user) + self.identity_api.add_user_to_group(new_user['id'], + new_group['id']) + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + project_id=self.tenant_bar['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(group_id=new_group['id'], + project_id=self.tenant_bar['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + project_id=self.tenant_bar['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(group_id=new_group['id'], + project_id=self.tenant_bar['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + project_id=self.tenant_bar['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=new_group['id'], + project_id=self.tenant_bar['id'], + role_id='member') + + def test_get_and_remove_role_grant_by_group_and_domain(self): + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + new_group = unit.new_group_ref(domain_id=new_domain['id']) + new_group = self.identity_api.create_group(new_group) + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_user = self.identity_api.create_user(new_user) + self.identity_api.add_user_to_group(new_user['id'], + new_group['id']) + + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + + self.assignment_api.create_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + + def test_get_and_remove_correct_role_grant_from_a_mix(self): + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + new_project = unit.new_project_ref(domain_id=new_domain['id']) + self.resource_api.create_project(new_project['id'], new_project) + new_group = unit.new_group_ref(domain_id=new_domain['id']) + new_group = self.identity_api.create_group(new_group) + new_group2 = unit.new_group_ref(domain_id=new_domain['id']) + new_group2 = self.identity_api.create_group(new_group2) + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_user = self.identity_api.create_user(new_user) + new_user2 = unit.new_user_ref(domain_id=new_domain['id']) + new_user2 = self.identity_api.create_user(new_user2) + self.identity_api.add_user_to_group(new_user['id'], + new_group['id']) + # First check we have no grants + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + # Now add the grant we are going to test for, and some others as + # well just to make sure we get back the right one + self.assignment_api.create_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + + self.assignment_api.create_grant(group_id=new_group2['id'], + domain_id=new_domain['id'], + role_id=self.role_admin['id']) + self.assignment_api.create_grant(user_id=new_user2['id'], + domain_id=new_domain['id'], + role_id=self.role_admin['id']) + self.assignment_api.create_grant(group_id=new_group['id'], + project_id=new_project['id'], + role_id=self.role_admin['id']) + + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + + def test_get_and_remove_role_grant_by_user_and_domain(self): + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_user = self.identity_api.create_user(new_user) + roles_ref = self.assignment_api.list_grants( + user_id=new_user['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(user_id=new_user['id'], + domain_id=new_domain['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + user_id=new_user['id'], + domain_id=new_domain['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(user_id=new_user['id'], + domain_id=new_domain['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + user_id=new_user['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + user_id=new_user['id'], + domain_id=new_domain['id'], + role_id='member') + + def test_get_and_remove_role_grant_by_group_and_cross_domain(self): + group1_domain1_role = unit.new_role_ref() + self.role_api.create_role(group1_domain1_role['id'], + group1_domain1_role) + group1_domain2_role = unit.new_role_ref() + self.role_api.create_role(group1_domain2_role['id'], + group1_domain2_role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain1['id']) + self.assertEqual(0, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain2['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=group1_domain1_role['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain2['id'], + role_id=group1_domain2_role['id']) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain1['id']) + self.assertDictEqual(group1_domain1_role, roles_ref[0]) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain2['id']) + self.assertDictEqual(group1_domain2_role, roles_ref[0]) + + self.assignment_api.delete_grant(group_id=group1['id'], + domain_id=domain2['id'], + role_id=group1_domain2_role['id']) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain2['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + group_id=group1['id'], + domain_id=domain2['id'], + role_id=group1_domain2_role['id']) + + def test_get_and_remove_role_grant_by_user_and_cross_domain(self): + user1_domain1_role = unit.new_role_ref() + self.role_api.create_role(user1_domain1_role['id'], user1_domain1_role) + user1_domain2_role = unit.new_role_ref() + self.role_api.create_role(user1_domain2_role['id'], user1_domain2_role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain1['id']) + self.assertEqual(0, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain2['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=user1_domain1_role['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain2['id'], + role_id=user1_domain2_role['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain1['id']) + self.assertDictEqual(user1_domain1_role, roles_ref[0]) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain2['id']) + self.assertDictEqual(user1_domain2_role, roles_ref[0]) + + self.assignment_api.delete_grant(user_id=user1['id'], + domain_id=domain2['id'], + role_id=user1_domain2_role['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain2['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.RoleAssignmentNotFound, + self.assignment_api.delete_grant, + user_id=user1['id'], + domain_id=domain2['id'], + role_id=user1_domain2_role['id']) + + def test_role_grant_by_group_and_cross_domain_project(self): + role1 = unit.new_role_ref() + self.role_api.create_role(role1['id'], role1) + role2 = unit.new_role_ref() + self.role_api.create_role(role2['id'], role2) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + project1 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project1['id'], project1) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role1['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role2['id']) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + project_id=project1['id']) + + roles_ref_ids = [] + for ref in roles_ref: + roles_ref_ids.append(ref['id']) + self.assertIn(role1['id'], roles_ref_ids) + self.assertIn(role2['id'], roles_ref_ids) + + self.assignment_api.delete_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role1['id']) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + project_id=project1['id']) + self.assertEqual(1, len(roles_ref)) + self.assertDictEqual(role2, roles_ref[0]) + + def test_role_grant_by_user_and_cross_domain_project(self): + role1 = unit.new_role_ref() + self.role_api.create_role(role1['id'], role1) + role2 = unit.new_role_ref() + self.role_api.create_role(role2['id'], role2) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + project1 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project1['id'], project1) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role1['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role2['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + + roles_ref_ids = [] + for ref in roles_ref: + roles_ref_ids.append(ref['id']) + self.assertIn(role1['id'], roles_ref_ids) + self.assertIn(role2['id'], roles_ref_ids) + + self.assignment_api.delete_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role1['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(1, len(roles_ref)) + self.assertDictEqual(role2, roles_ref[0]) + + def test_delete_user_grant_no_user(self): + # Can delete a grant where the user doesn't exist. + role = unit.new_role_ref() + role_id = role['id'] + self.role_api.create_role(role_id, role) + + user_id = uuid.uuid4().hex + + self.assignment_api.create_grant(role_id, user_id=user_id, + project_id=self.tenant_bar['id']) + + self.assignment_api.delete_grant(role_id, user_id=user_id, + project_id=self.tenant_bar['id']) + + def test_delete_group_grant_no_group(self): + # Can delete a grant where the group doesn't exist. + role = unit.new_role_ref() + role_id = role['id'] + self.role_api.create_role(role_id, role) + + group_id = uuid.uuid4().hex + + self.assignment_api.create_grant(role_id, group_id=group_id, + project_id=self.tenant_bar['id']) + + self.assignment_api.delete_grant(role_id, group_id=group_id, + project_id=self.tenant_bar['id']) + + def test_grant_crud_throws_exception_if_invalid_role(self): + """Ensure RoleNotFound thrown if role does not exist.""" + def assert_role_not_found_exception(f, **kwargs): + self.assertRaises(exception.RoleNotFound, f, + role_id=uuid.uuid4().hex, **kwargs) + + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_resp = self.identity_api.create_user(user) + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group_resp = self.identity_api.create_group(group) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project_resp = self.resource_api.create_project(project['id'], project) + + for manager_call in [self.assignment_api.create_grant, + self.assignment_api.get_grant, + self.assignment_api.delete_grant]: + assert_role_not_found_exception( + manager_call, + user_id=user_resp['id'], project_id=project_resp['id']) + assert_role_not_found_exception( + manager_call, + group_id=group_resp['id'], project_id=project_resp['id']) + assert_role_not_found_exception( + manager_call, + user_id=user_resp['id'], + domain_id=CONF.identity.default_domain_id) + assert_role_not_found_exception( + manager_call, + group_id=group_resp['id'], + domain_id=CONF.identity.default_domain_id) + + def test_multi_role_grant_by_user_group_on_project_domain(self): + role_list = [] + for _ in range(10): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + group2 = unit.new_group_ref(domain_id=domain1['id']) + group2 = self.identity_api.create_group(group2) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[3]['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[4]['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[5]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role_list[6]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role_list[7]['id']) + roles_ref = self.assignment_api.list_grants(user_id=user1['id'], + domain_id=domain1['id']) + self.assertEqual(2, len(roles_ref)) + self.assertIn(role_list[0], roles_ref) + self.assertIn(role_list[1], roles_ref) + roles_ref = self.assignment_api.list_grants(group_id=group1['id'], + domain_id=domain1['id']) + self.assertEqual(2, len(roles_ref)) + self.assertIn(role_list[2], roles_ref) + self.assertIn(role_list[3], roles_ref) + roles_ref = self.assignment_api.list_grants(user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(2, len(roles_ref)) + self.assertIn(role_list[4], roles_ref) + self.assertIn(role_list[5], roles_ref) + roles_ref = self.assignment_api.list_grants(group_id=group1['id'], + project_id=project1['id']) + self.assertEqual(2, len(roles_ref)) + self.assertIn(role_list[6], roles_ref) + self.assertIn(role_list[7], roles_ref) + + # Now test the alternate way of getting back lists of grants, + # where user and group roles are combined. These should match + # the above results. + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(4, len(combined_list)) + self.assertIn(role_list[4]['id'], combined_list) + self.assertIn(role_list[5]['id'], combined_list) + self.assertIn(role_list[6]['id'], combined_list) + self.assertIn(role_list[7]['id'], combined_list) + + combined_role_list = self.assignment_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEqual(4, len(combined_role_list)) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[1]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + self.assertIn(role_list[3]['id'], combined_role_list) + + def test_multi_group_grants_on_project_domain(self): + """Test multiple group roles for user on project and domain. + + Test Plan: + + - Create 6 roles + - Create a domain, with a project, user and two groups + - Make the user a member of both groups + - Check no roles yet exit + - Assign a role to each user and both groups on both the + project and domain + - Get a list of effective roles for the user on both the + project and domain, checking we get back the correct three + roles + + """ + role_list = [] + for _ in range(6): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + group2 = unit.new_group_ref(domain_id=domain1['id']) + group2 = self.identity_api.create_group(group2) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + self.assignment_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[3]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role_list[4]['id']) + self.assignment_api.create_grant(group_id=group2['id'], + project_id=project1['id'], + role_id=role_list[5]['id']) + + # Read by the roles, ensuring we get the correct 3 roles for + # both project and domain + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(3, len(combined_list)) + self.assertIn(role_list[3]['id'], combined_list) + self.assertIn(role_list[4]['id'], combined_list) + self.assertIn(role_list[5]['id'], combined_list) + + combined_role_list = self.assignment_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEqual(3, len(combined_role_list)) + self.assertIn(role_list[0]['id'], combined_role_list) + self.assertIn(role_list[1]['id'], combined_role_list) + self.assertIn(role_list[2]['id'], combined_role_list) + + def test_delete_role_with_user_and_group_grants(self): + role1 = unit.new_role_ref() + self.role_api.create_role(role1['id'], role1) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role1['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role1['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=role1['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role1['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(1, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + project_id=project1['id']) + self.assertEqual(1, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain1['id']) + self.assertEqual(1, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain1['id']) + self.assertEqual(1, len(roles_ref)) + self.role_api.delete_role(role1['id']) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + domain_id=domain1['id']) + self.assertEqual(0, len(roles_ref)) + roles_ref = self.assignment_api.list_grants( + group_id=group1['id'], + domain_id=domain1['id']) + self.assertEqual(0, len(roles_ref)) + + def test_list_role_assignment_by_domain(self): + """Test listing of role assignment filtered by domain.""" + test_plan = { + # A domain with 3 users, 1 group, a spoiler domain and 2 roles. + 'entities': {'domains': [{'users': 3, 'groups': 1}, 1], + 'roles': 2}, + # Users 1 & 2 are in the group + 'group_memberships': [{'group': 0, 'users': [1, 2]}], + # Assign a role for user 0 and the group + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'group': 0, 'role': 1, 'domain': 0}], + 'tests': [ + # List all effective assignments for domain[0]. + # Should get one direct user role and user roles for each of + # the users in the group. + {'params': {'domain': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 1, 'role': 1, 'domain': 0, + 'indirect': {'group': 0}}, + {'user': 2, 'role': 1, 'domain': 0, + 'indirect': {'group': 0}} + ]}, + # Using domain[1] should return nothing + {'params': {'domain': 1, 'effective': True}, + 'results': []}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignment_by_user_with_domain_group_roles(self): + """Test listing assignments by user, with group roles on a domain.""" + test_plan = { + # A domain with 3 users, 3 groups, a spoiler domain + # plus 3 roles. + 'entities': {'domains': [{'users': 3, 'groups': 3}, 1], + 'roles': 3}, + # Users 1 & 2 are in the group 0, User 1 also in group 1 + 'group_memberships': [{'group': 0, 'users': [0, 1]}, + {'group': 1, 'users': [0]}], + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'group': 0, 'role': 1, 'domain': 0}, + {'group': 1, 'role': 2, 'domain': 0}, + # ...and two spoiler assignments + {'user': 1, 'role': 1, 'domain': 0}, + {'group': 2, 'role': 2, 'domain': 0}], + 'tests': [ + # List all effective assignments for user[0]. + # Should get one direct user role and a user roles for each of + # groups 0 and 1 + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'domain': 0, + 'indirect': {'group': 0}}, + {'user': 0, 'role': 2, 'domain': 0, + 'indirect': {'group': 1}} + ]}, + # Adding domain[0] as a filter should return the same data + {'params': {'user': 0, 'domain': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'domain': 0, + 'indirect': {'group': 0}}, + {'user': 0, 'role': 2, 'domain': 0, + 'indirect': {'group': 1}} + ]}, + # Using domain[1] should return nothing + {'params': {'user': 0, 'domain': 1, 'effective': True}, + 'results': []}, + # Using user[2] should return nothing + {'params': {'user': 2, 'domain': 0, 'effective': True}, + 'results': []}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignment_using_sourced_groups(self): + """Test listing assignments when restricted by source groups.""" + test_plan = { + # The default domain with 3 users, 3 groups, 3 projects, + # plus 3 roles. + 'entities': {'domains': {'id': CONF.identity.default_domain_id, + 'users': 3, 'groups': 3, 'projects': 3}, + 'roles': 3}, + # Users 0 & 1 are in the group 0, User 0 also in group 1 + 'group_memberships': [{'group': 0, 'users': [0, 1]}, + {'group': 1, 'users': [0]}], + # Spread the assignments around - we want to be able to show that + # if sourced by group, assignments from other sources are excluded + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'group': 0, 'role': 1, 'project': 1}, + {'group': 1, 'role': 2, 'project': 0}, + {'group': 1, 'role': 2, 'project': 1}, + {'user': 2, 'role': 1, 'project': 1}, + {'group': 2, 'role': 2, 'project': 2} + ], + 'tests': [ + # List all effective assignments sourced from groups 0 and 1 + {'params': {'source_from_group_ids': [0, 1], + 'effective': True}, + 'results': [{'group': 0, 'role': 1, 'project': 1}, + {'group': 1, 'role': 2, 'project': 0}, + {'group': 1, 'role': 2, 'project': 1} + ]}, + # Adding a role a filter should further restrict the entries + {'params': {'source_from_group_ids': [0, 1], 'role': 2, + 'effective': True}, + 'results': [{'group': 1, 'role': 2, 'project': 0}, + {'group': 1, 'role': 2, 'project': 1} + ]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignment_using_sourced_groups_with_domains(self): + """Test listing domain assignments when restricted by source groups.""" + test_plan = { + # A domain with 3 users, 3 groups, 3 projects, a second domain, + # plus 3 roles. + 'entities': {'domains': [{'users': 3, 'groups': 3, 'projects': 3}, + 1], + 'roles': 3}, + # Users 0 & 1 are in the group 0, User 0 also in group 1 + 'group_memberships': [{'group': 0, 'users': [0, 1]}, + {'group': 1, 'users': [0]}], + # Spread the assignments around - we want to be able to show that + # if sourced by group, assignments from other sources are excluded + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'group': 0, 'role': 1, 'domain': 1}, + {'group': 1, 'role': 2, 'project': 0}, + {'group': 1, 'role': 2, 'project': 1}, + {'user': 2, 'role': 1, 'project': 1}, + {'group': 2, 'role': 2, 'project': 2} + ], + 'tests': [ + # List all effective assignments sourced from groups 0 and 1 + {'params': {'source_from_group_ids': [0, 1], + 'effective': True}, + 'results': [{'group': 0, 'role': 1, 'domain': 1}, + {'group': 1, 'role': 2, 'project': 0}, + {'group': 1, 'role': 2, 'project': 1} + ]}, + # Adding a role a filter should further restrict the entries + {'params': {'source_from_group_ids': [0, 1], 'role': 1, + 'effective': True}, + 'results': [{'group': 0, 'role': 1, 'domain': 1}, + ]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_role_assignment_fails_with_userid_and_source_groups(self): + """Show we trap this unsupported internal combination of params.""" + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + self.assertRaises(exception.UnexpectedError, + self.assignment_api.list_role_assignments, + effective=True, + user_id=self.user_foo['id'], + source_from_group_ids=[group['id']]) + + def test_add_user_to_project(self): + self.assignment_api.add_user_to_project(self.tenant_baz['id'], + self.user_foo['id']) + tenants = self.assignment_api.list_projects_for_user( + self.user_foo['id']) + self.assertIn(self.tenant_baz, tenants) + + def test_add_user_to_project_missing_default_role(self): + self.role_api.delete_role(CONF.member_role_id) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + CONF.member_role_id) + self.assignment_api.add_user_to_project(self.tenant_baz['id'], + self.user_foo['id']) + tenants = ( + self.assignment_api.list_projects_for_user(self.user_foo['id'])) + self.assertIn(self.tenant_baz, tenants) + default_role = self.role_api.get_role(CONF.member_role_id) + self.assertIsNotNone(default_role) + + def test_add_user_to_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.add_user_to_project, + uuid.uuid4().hex, + self.user_foo['id']) + + def test_add_user_to_project_no_user(self): + # If add_user_to_project and the user doesn't exist, then + # no error. + user_id_not_exist = uuid.uuid4().hex + self.assignment_api.add_user_to_project(self.tenant_bar['id'], + user_id_not_exist) + + def test_remove_user_from_project(self): + self.assignment_api.add_user_to_project(self.tenant_baz['id'], + self.user_foo['id']) + self.assignment_api.remove_user_from_project(self.tenant_baz['id'], + self.user_foo['id']) + tenants = self.assignment_api.list_projects_for_user( + self.user_foo['id']) + self.assertNotIn(self.tenant_baz, tenants) + + def test_remove_user_from_project_race_delete_role(self): + self.assignment_api.add_user_to_project(self.tenant_baz['id'], + self.user_foo['id']) + self.assignment_api.add_role_to_user_and_project( + tenant_id=self.tenant_baz['id'], + user_id=self.user_foo['id'], + role_id=self.role_other['id']) + + # Mock a race condition, delete a role after + # get_roles_for_user_and_project() is called in + # remove_user_from_project(). + roles = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_baz['id']) + self.role_api.delete_role(self.role_other['id']) + self.assignment_api.get_roles_for_user_and_project = mock.Mock( + return_value=roles) + self.assignment_api.remove_user_from_project(self.tenant_baz['id'], + self.user_foo['id']) + tenants = self.assignment_api.list_projects_for_user( + self.user_foo['id']) + self.assertNotIn(self.tenant_baz, tenants) + + def test_remove_user_from_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.remove_user_from_project, + uuid.uuid4().hex, + self.user_foo['id']) + + self.assertRaises(exception.UserNotFound, + self.assignment_api.remove_user_from_project, + self.tenant_bar['id'], + uuid.uuid4().hex) + + self.assertRaises(exception.NotFound, + self.assignment_api.remove_user_from_project, + self.tenant_baz['id'], + self.user_foo['id']) + + def test_list_user_project_ids_returns_not_found(self): + self.assertRaises(exception.UserNotFound, + self.assignment_api.list_projects_for_user, + uuid.uuid4().hex) + + def test_delete_user_with_project_association(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = self.identity_api.create_user(user) + self.assignment_api.add_user_to_project(self.tenant_bar['id'], + user['id']) + self.identity_api.delete_user(user['id']) + self.assertRaises(exception.UserNotFound, + self.assignment_api.list_projects_for_user, + user['id']) + + def test_delete_user_with_project_roles(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = self.identity_api.create_user(user) + self.assignment_api.add_role_to_user_and_project( + user['id'], + self.tenant_bar['id'], + self.role_member['id']) + self.identity_api.delete_user(user['id']) + self.assertRaises(exception.UserNotFound, + self.assignment_api.list_projects_for_user, + user['id']) + + def test_delete_role_returns_not_found(self): + self.assertRaises(exception.RoleNotFound, + self.role_api.delete_role, + uuid.uuid4().hex) + + def test_delete_project_with_role_assignments(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], project['id'], 'member') + self.resource_api.delete_project(project['id']) + self.assertRaises(exception.ProjectNotFound, + self.assignment_api.list_user_ids_for_project, + project['id']) + + def test_delete_role_check_role_grant(self): + role = unit.new_role_ref() + alt_role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + self.role_api.create_role(alt_role['id'], alt_role) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], role['id']) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], self.tenant_bar['id'], alt_role['id']) + self.role_api.delete_role(role['id']) + roles_ref = self.assignment_api.get_roles_for_user_and_project( + self.user_foo['id'], self.tenant_bar['id']) + self.assertNotIn(role['id'], roles_ref) + self.assertIn(alt_role['id'], roles_ref) + + def test_list_projects_for_user(self): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user1 = unit.new_user_ref(domain_id=domain['id']) + user1 = self.identity_api.create_user(user1) + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertEqual(0, len(user_projects)) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_baz['id'], + role_id=self.role_member['id']) + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertEqual(2, len(user_projects)) + + def test_list_projects_for_user_with_grants(self): + # Create two groups each with a role on a different project, and + # make user1 a member of both groups. Both these new projects + # should now be included, along with any direct user grants. + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user1 = unit.new_user_ref(domain_id=domain['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain['id']) + group1 = self.identity_api.create_group(group1) + group2 = unit.new_group_ref(domain_id=domain['id']) + group2 = self.identity_api.create_group(group2) + project1 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project2['id'], project2) + self.identity_api.add_user_to_group(user1['id'], group1['id']) + self.identity_api.add_user_to_group(user1['id'], group2['id']) + + # Create 3 grants, one user grant, the other two as group grants + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=project1['id'], + role_id=self.role_admin['id']) + self.assignment_api.create_grant(group_id=group2['id'], + project_id=project2['id'], + role_id=self.role_admin['id']) + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertEqual(3, len(user_projects)) + + def test_create_grant_no_user(self): + # If call create_grant with a user that doesn't exist, doesn't fail. + self.assignment_api.create_grant( + self.role_other['id'], + user_id=uuid.uuid4().hex, + project_id=self.tenant_bar['id']) + + def test_create_grant_no_group(self): + # If call create_grant with a group that doesn't exist, doesn't fail. + self.assignment_api.create_grant( + self.role_other['id'], + group_id=uuid.uuid4().hex, + project_id=self.tenant_bar['id']) + + def test_delete_group_removes_role_assignments(self): + # When a group is deleted any role assignments for the group are + # removed. + + MEMBER_ROLE_ID = 'member' + + def get_member_assignments(): + assignments = self.assignment_api.list_role_assignments() + return [x for x in assignments if x['role_id'] == MEMBER_ROLE_ID] + + orig_member_assignments = get_member_assignments() + + # Create a group. + new_group = unit.new_group_ref( + domain_id=CONF.identity.default_domain_id) + new_group = self.identity_api.create_group(new_group) + + # Create a project. + new_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(new_project['id'], new_project) + + # Assign a role to the group. + self.assignment_api.create_grant( + group_id=new_group['id'], project_id=new_project['id'], + role_id=MEMBER_ROLE_ID) + + # Delete the group. + self.identity_api.delete_group(new_group['id']) + + # Check that the role assignment for the group is gone + member_assignments = get_member_assignments() + + self.assertThat(member_assignments, + matchers.Equals(orig_member_assignments)) + + def test_get_roles_for_groups_on_domain(self): + """Test retrieving group domain roles. + + Test Plan: + + - Create a domain, three groups and three roles + - Assign one an inherited and the others a non-inherited group role + to the domain + - Ensure that only the non-inherited roles are returned on the domain + + """ + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + group_list = [] + group_id_list = [] + role_list = [] + for _ in range(3): + group = unit.new_group_ref(domain_id=domain1['id']) + group = self.identity_api.create_group(group) + group_list.append(group) + group_id_list.append(group['id']) + + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + + # Assign the roles - one is inherited + self.assignment_api.create_grant(group_id=group_list[0]['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(group_id=group_list[1]['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + self.assignment_api.create_grant(group_id=group_list[2]['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id'], + inherited_to_projects=True) + + # Now get the effective roles for the groups on the domain project. We + # shouldn't get back the inherited role. + + role_refs = self.assignment_api.get_roles_for_groups( + group_id_list, domain_id=domain1['id']) + + self.assertThat(role_refs, matchers.HasLength(2)) + self.assertIn(role_list[0], role_refs) + self.assertIn(role_list[1], role_refs) + + def test_get_roles_for_groups_on_project(self): + """Test retrieving group project roles. + + Test Plan: + + - Create two domains, two projects, six groups and six roles + - Project1 is in Domain1, Project2 is in Domain2 + - Domain2/Project2 are spoilers + - Assign a different direct group role to each project as well + as both an inherited and non-inherited role to each domain + - Get the group roles for Project 1 - depending on whether we have + enabled inheritance, we should either get back just the direct role + or both the direct one plus the inherited domain role from Domain 1 + + """ + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project2['id'], project2) + group_list = [] + group_id_list = [] + role_list = [] + for _ in range(6): + group = unit.new_group_ref(domain_id=domain1['id']) + group = self.identity_api.create_group(group) + group_list.append(group) + group_id_list.append(group['id']) + + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + + # Assign the roles - one inherited and one non-inherited on Domain1, + # plus one on Project1 + self.assignment_api.create_grant(group_id=group_list[0]['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(group_id=group_list[1]['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group_list[2]['id'], + project_id=project1['id'], + role_id=role_list[2]['id']) + + # ...and a duplicate set of spoiler assignments to Domain2/Project2 + self.assignment_api.create_grant(group_id=group_list[3]['id'], + domain_id=domain2['id'], + role_id=role_list[3]['id']) + self.assignment_api.create_grant(group_id=group_list[4]['id'], + domain_id=domain2['id'], + role_id=role_list[4]['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group_list[5]['id'], + project_id=project2['id'], + role_id=role_list[5]['id']) + + # Now get the effective roles for all groups on the Project1. With + # inheritance off, we should only get back the direct role. + + self.config_fixture.config(group='os_inherit', enabled=False) + role_refs = self.assignment_api.get_roles_for_groups( + group_id_list, project_id=project1['id']) + + self.assertThat(role_refs, matchers.HasLength(1)) + self.assertIn(role_list[2], role_refs) + + # With inheritance on, we should also get back the inherited role from + # its owning domain. + + self.config_fixture.config(group='os_inherit', enabled=True) + role_refs = self.assignment_api.get_roles_for_groups( + group_id_list, project_id=project1['id']) + + self.assertThat(role_refs, matchers.HasLength(2)) + self.assertIn(role_list[1], role_refs) + self.assertIn(role_list[2], role_refs) + + def test_list_domains_for_groups(self): + """Test retrieving domains for a list of groups. + + Test Plan: + + - Create three domains, three groups and one role + - Assign a non-inherited group role to two domains, and an inherited + group role to the third + - Ensure only the domains with non-inherited roles are returned + + """ + domain_list = [] + group_list = [] + group_id_list = [] + for _ in range(3): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + domain_list.append(domain) + + group = unit.new_group_ref(domain_id=domain['id']) + group = self.identity_api.create_group(group) + group_list.append(group) + group_id_list.append(group['id']) + + role1 = unit.new_role_ref() + self.role_api.create_role(role1['id'], role1) + + # Assign the roles - one is inherited + self.assignment_api.create_grant(group_id=group_list[0]['id'], + domain_id=domain_list[0]['id'], + role_id=role1['id']) + self.assignment_api.create_grant(group_id=group_list[1]['id'], + domain_id=domain_list[1]['id'], + role_id=role1['id']) + self.assignment_api.create_grant(group_id=group_list[2]['id'], + domain_id=domain_list[2]['id'], + role_id=role1['id'], + inherited_to_projects=True) + + # Now list the domains that have roles for any of the 3 groups + # We shouldn't get back domain[2] since that had an inherited role. + + domain_refs = ( + self.assignment_api.list_domains_for_groups(group_id_list)) + + self.assertThat(domain_refs, matchers.HasLength(2)) + self.assertIn(domain_list[0], domain_refs) + self.assertIn(domain_list[1], domain_refs) + + def test_list_projects_for_groups(self): + """Test retrieving projects for a list of groups. + + Test Plan: + + - Create two domains, four projects, seven groups and seven roles + - Project1-3 are in Domain1, Project4 is in Domain2 + - Domain2/Project4 are spoilers + - Project1 and 2 have direct group roles, Project3 has no direct + roles but should inherit a group role from Domain1 + - Get the projects for the group roles that are assigned to Project1 + Project2 and the inherited one on Domain1. Depending on whether we + have enabled inheritance, we should either get back just the projects + with direct roles (Project 1 and 2) or also Project3 due to its + inherited role from Domain1. + + """ + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project1 = unit.new_project_ref(domain_id=domain1['id']) + project1 = self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain1['id']) + project2 = self.resource_api.create_project(project2['id'], project2) + project3 = unit.new_project_ref(domain_id=domain1['id']) + project3 = self.resource_api.create_project(project3['id'], project3) + project4 = unit.new_project_ref(domain_id=domain2['id']) + project4 = self.resource_api.create_project(project4['id'], project4) + group_list = [] + role_list = [] + for _ in range(7): + group = unit.new_group_ref(domain_id=domain1['id']) + group = self.identity_api.create_group(group) + group_list.append(group) + + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + + # Assign the roles - one inherited and one non-inherited on Domain1, + # plus one on Project1 and Project2 + self.assignment_api.create_grant(group_id=group_list[0]['id'], + domain_id=domain1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(group_id=group_list[1]['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group_list[2]['id'], + project_id=project1['id'], + role_id=role_list[2]['id']) + self.assignment_api.create_grant(group_id=group_list[3]['id'], + project_id=project2['id'], + role_id=role_list[3]['id']) + + # ...and a few of spoiler assignments to Domain2/Project4 + self.assignment_api.create_grant(group_id=group_list[4]['id'], + domain_id=domain2['id'], + role_id=role_list[4]['id']) + self.assignment_api.create_grant(group_id=group_list[5]['id'], + domain_id=domain2['id'], + role_id=role_list[5]['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group_list[6]['id'], + project_id=project4['id'], + role_id=role_list[6]['id']) + + # Now get the projects for the groups that have roles on Project1, + # Project2 and the inherited role on Domain!. With inheritance off, + # we should only get back the projects with direct role. + + self.config_fixture.config(group='os_inherit', enabled=False) + group_id_list = [group_list[1]['id'], group_list[2]['id'], + group_list[3]['id']] + project_refs = ( + self.assignment_api.list_projects_for_groups(group_id_list)) + + self.assertThat(project_refs, matchers.HasLength(2)) + self.assertIn(project1, project_refs) + self.assertIn(project2, project_refs) + + # With inheritance on, we should also get back the Project3 due to the + # inherited role from its owning domain. + + self.config_fixture.config(group='os_inherit', enabled=True) + project_refs = ( + self.assignment_api.list_projects_for_groups(group_id_list)) + + self.assertThat(project_refs, matchers.HasLength(3)) + self.assertIn(project1, project_refs) + self.assertIn(project2, project_refs) + self.assertIn(project3, project_refs) + + def test_update_role_no_name(self): + # A user can update a role and not include the name. + + # description is picked just because it's not name. + self.role_api.update_role(self.role_member['id'], + {'description': uuid.uuid4().hex}) + # If the previous line didn't raise an exception then the test passes. + + def test_update_role_same_name(self): + # A user can update a role and set the name to be the same as it was. + + self.role_api.update_role(self.role_member['id'], + {'name': self.role_member['name']}) + # If the previous line didn't raise an exception then the test passes. + + def test_list_role_assignment_containing_names(self): + # Create Refs + new_role = unit.new_role_ref() + new_domain = self._get_domain_fixture() + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_project = unit.new_project_ref(domain_id=new_domain['id']) + new_group = unit.new_group_ref(domain_id=new_domain['id']) + # Create entities + new_role = self.role_api.create_role(new_role['id'], new_role) + new_user = self.identity_api.create_user(new_user) + new_group = self.identity_api.create_group(new_group) + self.resource_api.create_project(new_project['id'], new_project) + self.assignment_api.create_grant(user_id=new_user['id'], + project_id=new_project['id'], + role_id=new_role['id']) + self.assignment_api.create_grant(group_id=new_group['id'], + project_id=new_project['id'], + role_id=new_role['id']) + self.assignment_api.create_grant(domain_id=new_domain['id'], + user_id=new_user['id'], + role_id=new_role['id']) + # Get the created assignments with the include_names flag + _asgmt_prj = self.assignment_api.list_role_assignments( + user_id=new_user['id'], + project_id=new_project['id'], + include_names=True) + _asgmt_grp = self.assignment_api.list_role_assignments( + group_id=new_group['id'], + project_id=new_project['id'], + include_names=True) + _asgmt_dmn = self.assignment_api.list_role_assignments( + domain_id=new_domain['id'], + user_id=new_user['id'], + include_names=True) + # Make sure we can get back the correct number of assignments + self.assertThat(_asgmt_prj, matchers.HasLength(1)) + self.assertThat(_asgmt_grp, matchers.HasLength(1)) + self.assertThat(_asgmt_dmn, matchers.HasLength(1)) + # get the first assignment + first_asgmt_prj = _asgmt_prj[0] + first_asgmt_grp = _asgmt_grp[0] + first_asgmt_dmn = _asgmt_dmn[0] + # Assert the names are correct in the project response + self.assertEqual(new_project['name'], + first_asgmt_prj['project_name']) + self.assertEqual(new_project['domain_id'], + first_asgmt_prj['project_domain_id']) + self.assertEqual(new_user['name'], + first_asgmt_prj['user_name']) + self.assertEqual(new_user['domain_id'], + first_asgmt_prj['user_domain_id']) + self.assertEqual(new_role['name'], + first_asgmt_prj['role_name']) + # Assert the names are correct in the group response + self.assertEqual(new_group['name'], + first_asgmt_grp['group_name']) + self.assertEqual(new_group['domain_id'], + first_asgmt_grp['group_domain_id']) + self.assertEqual(new_project['name'], + first_asgmt_grp['project_name']) + self.assertEqual(new_project['domain_id'], + first_asgmt_grp['project_domain_id']) + self.assertEqual(new_role['name'], + first_asgmt_grp['role_name']) + # Assert the names are correct in the domain response + self.assertEqual(new_domain['name'], + first_asgmt_dmn['domain_name']) + self.assertEqual(new_user['name'], + first_asgmt_dmn['user_name']) + self.assertEqual(new_user['domain_id'], + first_asgmt_dmn['user_domain_id']) + self.assertEqual(new_role['name'], + first_asgmt_dmn['role_name']) + + def test_list_role_assignment_does_not_contain_names(self): + """Test names are not included with list role assignments. + + Scenario: + - names are NOT included by default + - names are NOT included when include_names=False + + """ + def assert_does_not_contain_names(assignment): + first_asgmt_prj = assignment[0] + self.assertNotIn('project_name', first_asgmt_prj) + self.assertNotIn('project_domain_id', first_asgmt_prj) + self.assertNotIn('user_name', first_asgmt_prj) + self.assertNotIn('user_domain_id', first_asgmt_prj) + self.assertNotIn('role_name', first_asgmt_prj) + + # Create Refs + new_role = unit.new_role_ref() + new_domain = self._get_domain_fixture() + new_user = unit.new_user_ref(domain_id=new_domain['id']) + new_project = unit.new_project_ref(domain_id=new_domain['id']) + # Create entities + new_role = self.role_api.create_role(new_role['id'], new_role) + new_user = self.identity_api.create_user(new_user) + self.resource_api.create_project(new_project['id'], new_project) + self.assignment_api.create_grant(user_id=new_user['id'], + project_id=new_project['id'], + role_id=new_role['id']) + # Get the created assignments with NO include_names flag + role_assign_without_names = self.assignment_api.list_role_assignments( + user_id=new_user['id'], + project_id=new_project['id']) + assert_does_not_contain_names(role_assign_without_names) + # Get the created assignments with include_names=False + role_assign_without_names = self.assignment_api.list_role_assignments( + user_id=new_user['id'], + project_id=new_project['id'], + include_names=False) + assert_does_not_contain_names(role_assign_without_names) + + def test_delete_user_assignments_user_same_id_as_group(self): + """Test deleting user assignments when user_id == group_id. + + In this scenario, only user assignments must be deleted (i.e. + USER_DOMAIN or USER_PROJECT). + + Test plan: + * Create a user and a group with the same ID; + * Create four roles and assign them to both user and group; + * Delete all user assignments; + * Group assignments must stay intact. + """ + # Create a common ID + common_id = uuid.uuid4().hex + # Create a project + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project = self.resource_api.create_project(project['id'], project) + # Create a user + user = unit.new_user_ref(id=common_id, + domain_id=CONF.identity.default_domain_id) + user = self.identity_api.driver.create_user(common_id, user) + self.assertEqual(common_id, user['id']) + # Create a group + group = unit.new_group_ref(id=common_id, + domain_id=CONF.identity.default_domain_id) + group = self.identity_api.driver.create_group(common_id, group) + self.assertEqual(common_id, group['id']) + # Create four roles + roles = [] + for _ in range(4): + role = unit.new_role_ref() + roles.append(self.role_api.create_role(role['id'], role)) + # Assign roles for user + self.assignment_api.driver.create_grant( + user_id=user['id'], domain_id=CONF.identity.default_domain_id, + role_id=roles[0]['id']) + self.assignment_api.driver.create_grant(user_id=user['id'], + project_id=project['id'], + role_id=roles[1]['id']) + # Assign roles for group + self.assignment_api.driver.create_grant( + group_id=group['id'], domain_id=CONF.identity.default_domain_id, + role_id=roles[2]['id']) + self.assignment_api.driver.create_grant(group_id=group['id'], + project_id=project['id'], + role_id=roles[3]['id']) + # Make sure they were assigned + user_assignments = self.assignment_api.list_role_assignments( + user_id=user['id']) + self.assertThat(user_assignments, matchers.HasLength(2)) + group_assignments = self.assignment_api.list_role_assignments( + group_id=group['id']) + self.assertThat(group_assignments, matchers.HasLength(2)) + # Delete user assignments + self.assignment_api.delete_user_assignments(user_id=user['id']) + # Assert only user assignments were deleted + user_assignments = self.assignment_api.list_role_assignments( + user_id=user['id']) + self.assertThat(user_assignments, matchers.HasLength(0)) + group_assignments = self.assignment_api.list_role_assignments( + group_id=group['id']) + self.assertThat(group_assignments, matchers.HasLength(2)) + # Make sure these remaining assignments are group-related + for assignment in group_assignments: + self.assertThat(assignment.keys(), matchers.Contains('group_id')) + + def test_delete_group_assignments_group_same_id_as_user(self): + """Test deleting group assignments when group_id == user_id. + + In this scenario, only group assignments must be deleted (i.e. + GROUP_DOMAIN or GROUP_PROJECT). + + Test plan: + * Create a group and a user with the same ID; + * Create four roles and assign them to both group and user; + * Delete all group assignments; + * User assignments must stay intact. + """ + # Create a common ID + common_id = uuid.uuid4().hex + # Create a project + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project = self.resource_api.create_project(project['id'], project) + # Create a user + user = unit.new_user_ref(id=common_id, + domain_id=CONF.identity.default_domain_id) + user = self.identity_api.driver.create_user(common_id, user) + self.assertEqual(common_id, user['id']) + # Create a group + group = unit.new_group_ref(id=common_id, + domain_id=CONF.identity.default_domain_id) + group = self.identity_api.driver.create_group(common_id, group) + self.assertEqual(common_id, group['id']) + # Create four roles + roles = [] + for _ in range(4): + role = unit.new_role_ref() + roles.append(self.role_api.create_role(role['id'], role)) + # Assign roles for user + self.assignment_api.driver.create_grant( + user_id=user['id'], domain_id=CONF.identity.default_domain_id, + role_id=roles[0]['id']) + self.assignment_api.driver.create_grant(user_id=user['id'], + project_id=project['id'], + role_id=roles[1]['id']) + # Assign roles for group + self.assignment_api.driver.create_grant( + group_id=group['id'], domain_id=CONF.identity.default_domain_id, + role_id=roles[2]['id']) + self.assignment_api.driver.create_grant(group_id=group['id'], + project_id=project['id'], + role_id=roles[3]['id']) + # Make sure they were assigned + user_assignments = self.assignment_api.list_role_assignments( + user_id=user['id']) + self.assertThat(user_assignments, matchers.HasLength(2)) + group_assignments = self.assignment_api.list_role_assignments( + group_id=group['id']) + self.assertThat(group_assignments, matchers.HasLength(2)) + # Delete group assignments + self.assignment_api.delete_group_assignments(group_id=group['id']) + # Assert only group assignments were deleted + group_assignments = self.assignment_api.list_role_assignments( + group_id=group['id']) + self.assertThat(group_assignments, matchers.HasLength(0)) + user_assignments = self.assignment_api.list_role_assignments( + user_id=user['id']) + self.assertThat(user_assignments, matchers.HasLength(2)) + # Make sure these remaining assignments are user-related + for assignment in group_assignments: + self.assertThat(assignment.keys(), matchers.Contains('user_id')) + + def test_remove_foreign_assignments_when_deleting_a_domain(self): + # A user and a group are in default domain and have assigned a role on + # two new domains. This test makes sure that when one of the new + # domains is deleted, the role assignments for the user and the group + # from the default domain are deleted only on that domain. + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + + role = unit.new_role_ref() + role = self.role_api.create_role(role['id'], role) + + new_domains = [unit.new_domain_ref(), unit.new_domain_ref()] + for new_domain in new_domains: + self.resource_api.create_domain(new_domain['id'], new_domain) + + self.assignment_api.create_grant(group_id=group['id'], + domain_id=new_domain['id'], + role_id=role['id']) + self.assignment_api.create_grant(user_id=self.user_two['id'], + domain_id=new_domain['id'], + role_id=role['id']) + + # Check there are 4 role assignments for that role + role_assignments = self.assignment_api.list_role_assignments( + role_id=role['id']) + self.assertThat(role_assignments, matchers.HasLength(4)) + + # Delete first new domain and check only 2 assignments were left + self.resource_api.update_domain(new_domains[0]['id'], + {'enabled': False}) + self.resource_api.delete_domain(new_domains[0]['id']) + + role_assignments = self.assignment_api.list_role_assignments( + role_id=role['id']) + self.assertThat(role_assignments, matchers.HasLength(2)) + + # Delete second new domain and check no assignments were left + self.resource_api.update_domain(new_domains[1]['id'], + {'enabled': False}) + self.resource_api.delete_domain(new_domains[1]['id']) + + role_assignments = self.assignment_api.list_role_assignments( + role_id=role['id']) + self.assertEqual([], role_assignments) + + +class InheritanceTests(AssignmentTestHelperMixin): + + def test_role_assignments_user_domain_to_project_inheritance(self): + test_plan = { + 'entities': {'domains': {'users': 2, 'projects': 1}, + 'roles': 3}, + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 2, 'domain': 0, + 'inherited_to_projects': True}, + {'user': 1, 'role': 1, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0] + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 2, 'domain': 0, + 'inherited_to_projects': 'projects'}]}, + # Now the effective ones - so the domain role should turn into + # a project role + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'domain': 0}}]}, + # Narrow down to effective roles for user[0] and project[0] + {'params': {'user': 0, 'project': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'domain': 0}}]} + ] + } + self.config_fixture.config(group='os_inherit', enabled=True) + self.execute_assignment_plan(test_plan) + + def test_inherited_role_assignments_excluded_if_os_inherit_false(self): + test_plan = { + 'entities': {'domains': {'users': 2, 'groups': 1, 'projects': 1}, + 'roles': 4}, + 'group_memberships': [{'group': 0, 'users': [0]}], + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 2, 'domain': 0, + 'inherited_to_projects': True}, + {'user': 1, 'role': 1, 'project': 0}, + {'group': 0, 'role': 3, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0], since os-inherit is + # disabled, we should not see the inherited role + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}]}, + # Same in effective mode - inherited roles should not be + # included or expanded...but the group role should now + # turn up as a user role, since group expansion is not + # part of os-inherit. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'project': 0}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'group': 0}}]}, + ] + } + self.config_fixture.config(group='os_inherit', enabled=False) + self.execute_assignment_plan(test_plan) + + def _test_crud_inherited_and_direct_assignment(self, **kwargs): + """Tests inherited and direct assignments for the actor and target + + Ensure it is possible to create both inherited and direct role + assignments for the same actor on the same target. The actor and the + target are specified in the kwargs as ('user_id' or 'group_id') and + ('project_id' or 'domain_id'), respectively. + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + # Create a new role to avoid assignments loaded from default fixtures + role = unit.new_role_ref() + role = self.role_api.create_role(role['id'], role) + + # Define the common assignment entity + assignment_entity = {'role_id': role['id']} + assignment_entity.update(kwargs) + + # Define assignments under test + direct_assignment_entity = assignment_entity.copy() + inherited_assignment_entity = assignment_entity.copy() + inherited_assignment_entity['inherited_to_projects'] = 'projects' + + # Create direct assignment and check grants + self.assignment_api.create_grant(inherited_to_projects=False, + **assignment_entity) + + grants = self.assignment_api.list_role_assignments(role_id=role['id']) + self.assertThat(grants, matchers.HasLength(1)) + self.assertIn(direct_assignment_entity, grants) + + # Now add inherited assignment and check grants + self.assignment_api.create_grant(inherited_to_projects=True, + **assignment_entity) + + grants = self.assignment_api.list_role_assignments(role_id=role['id']) + self.assertThat(grants, matchers.HasLength(2)) + self.assertIn(direct_assignment_entity, grants) + self.assertIn(inherited_assignment_entity, grants) + + # Delete both and check grants + self.assignment_api.delete_grant(inherited_to_projects=False, + **assignment_entity) + self.assignment_api.delete_grant(inherited_to_projects=True, + **assignment_entity) + + grants = self.assignment_api.list_role_assignments(role_id=role['id']) + self.assertEqual([], grants) + + def test_crud_inherited_and_direct_assignment_for_user_on_domain(self): + self._test_crud_inherited_and_direct_assignment( + user_id=self.user_foo['id'], + domain_id=CONF.identity.default_domain_id) + + def test_crud_inherited_and_direct_assignment_for_group_on_domain(self): + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + + self._test_crud_inherited_and_direct_assignment( + group_id=group['id'], domain_id=CONF.identity.default_domain_id) + + def test_crud_inherited_and_direct_assignment_for_user_on_project(self): + self._test_crud_inherited_and_direct_assignment( + user_id=self.user_foo['id'], project_id=self.tenant_baz['id']) + + def test_crud_inherited_and_direct_assignment_for_group_on_project(self): + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + + self._test_crud_inherited_and_direct_assignment( + group_id=group['id'], project_id=self.tenant_baz['id']) + + def test_inherited_role_grants_for_user(self): + """Test inherited user roles. + + Test Plan: + + - Enable OS-INHERIT extension + - Create 3 roles + - Create a domain, with a project and a user + - Check no roles yet exit + - Assign a direct user role to the project and a (non-inherited) + user role to the domain + - Get a list of effective roles - should only get the one direct role + - Now add an inherited user role to the domain + - Get a list of effective roles - should have two roles, one + direct and one by virtue of the inherited user role + - Also get effective roles for the domain - the role marked as + inherited should not show up + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + role_list = [] + for _ in range(3): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + + # Create the first two roles - the domain one is not inherited + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + + # Now get the effective roles for the user and project, this + # should only include the direct role assignment on the project + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(1, len(combined_list)) + self.assertIn(role_list[0]['id'], combined_list) + + # Now add an inherited role on the domain + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id'], + inherited_to_projects=True) + + # Now get the effective roles for the user and project again, this + # should now include the inherited role on the domain + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(2, len(combined_list)) + self.assertIn(role_list[0]['id'], combined_list) + self.assertIn(role_list[2]['id'], combined_list) + + # Finally, check that the inherited role does not appear as a valid + # directly assigned role on the domain itself + combined_role_list = self.assignment_api.get_roles_for_user_and_domain( + user1['id'], domain1['id']) + self.assertEqual(1, len(combined_role_list)) + self.assertIn(role_list[1]['id'], combined_role_list) + + # TODO(henry-nash): The test above uses get_roles_for_user_and_project + # and get_roles_for_user_and_domain, which will, in a subsequent patch, + # be re-implemented to simply call list_role_assignments (see blueprint + # remove-role-metadata). + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once get_roles_for_user_and + # project/domain have been re-implemented then the manual tests above + # can be refactored to simply ensure it gives the same answers. + test_plan = { + # A domain with a user & project, plus 3 roles. + 'entities': {'domains': {'users': 1, 'projects': 1}, + 'roles': 3}, + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'domain': 0}, + {'user': 0, 'role': 2, 'domain': 0, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] on project[0]. + # Should get one direct role and one inherited role. + {'params': {'user': 0, 'project': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'domain': 0}}]}, + # Ensure effective mode on the domain does not list the + # inherited role on that domain + {'params': {'user': 0, 'domain': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 1, 'domain': 0}]}, + # Ensure non-inherited mode also only returns the non-inherited + # role on the domain + {'params': {'user': 0, 'domain': 0, 'inherited': False}, + 'results': [{'user': 0, 'role': 1, 'domain': 0}]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_inherited_role_grants_for_group(self): + """Test inherited group roles. + + Test Plan: + + - Enable OS-INHERIT extension + - Create 4 roles + - Create a domain, with a project, user and two groups + - Make the user a member of both groups + - Check no roles yet exit + - Assign a direct user role to the project and a (non-inherited) + group role on the domain + - Get a list of effective roles - should only get the one direct role + - Now add two inherited group roles to the domain + - Get a list of effective roles - should have three roles, one + direct and two by virtue of inherited group roles + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + role_list = [] + for _ in range(4): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_list.append(role) + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + user1 = unit.new_user_ref(domain_id=domain1['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain1['id']) + group1 = self.identity_api.create_group(group1) + group2 = unit.new_group_ref(domain_id=domain1['id']) + group2 = self.identity_api.create_group(group2) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + + self.identity_api.add_user_to_group(user1['id'], + group1['id']) + self.identity_api.add_user_to_group(user1['id'], + group2['id']) + + roles_ref = self.assignment_api.list_grants( + user_id=user1['id'], + project_id=project1['id']) + self.assertEqual(0, len(roles_ref)) + + # Create two roles - the domain one is not inherited + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project1['id'], + role_id=role_list[0]['id']) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain1['id'], + role_id=role_list[1]['id']) + + # Now get the effective roles for the user and project, this + # should only include the direct role assignment on the project + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(1, len(combined_list)) + self.assertIn(role_list[0]['id'], combined_list) + + # Now add to more group roles, both inherited, to the domain + self.assignment_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[2]['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group2['id'], + domain_id=domain1['id'], + role_id=role_list[3]['id'], + inherited_to_projects=True) + + # Now get the effective roles for the user and project again, this + # should now include the inherited roles on the domain + combined_list = self.assignment_api.get_roles_for_user_and_project( + user1['id'], project1['id']) + self.assertEqual(3, len(combined_list)) + self.assertIn(role_list[0]['id'], combined_list) + self.assertIn(role_list[2]['id'], combined_list) + self.assertIn(role_list[3]['id'], combined_list) + + # TODO(henry-nash): The test above uses get_roles_for_user_and_project + # which will, in a subsequent patch, be re-implemented to simply call + # list_role_assignments (see blueprint remove-role-metadata). + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once + # get_roles_for_user_and_project has been re-implemented then the + # manual tests above can be refactored to simply ensure it gives + # the same answers. + test_plan = { + # A domain with a user and project, 2 groups, plus 4 roles. + 'entities': {'domains': {'users': 1, 'projects': 1, 'groups': 2}, + 'roles': 4}, + 'group_memberships': [{'group': 0, 'users': [0]}, + {'group': 1, 'users': [0]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'group': 0, 'role': 1, 'domain': 0}, + {'group': 1, 'role': 2, 'domain': 0, + 'inherited_to_projects': True}, + {'group': 1, 'role': 3, 'domain': 0, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] on project[0]. + # Should get one direct role and both inherited roles, but + # not the direct one on domain[0], even though user[0] is + # in group[0]. + {'params': {'user': 0, 'project': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'domain': 0, 'group': 1}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'domain': 0, 'group': 1}}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_projects_for_user_with_inherited_grants(self): + """Test inherited user roles. + + Test Plan: + + - Enable OS-INHERIT extension + - Create a domain, with two projects and a user + - Assign an inherited user role on the domain, as well as a direct + user role to a separate project in a different domain + - Get a list of projects for user, should return all three projects + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user1 = unit.new_user_ref(domain_id=domain['id']) + user1 = self.identity_api.create_user(user1) + project1 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project2['id'], project2) + + # Create 2 grants, one on a project and one inherited grant + # on the domain + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain['id'], + role_id=self.role_admin['id'], + inherited_to_projects=True) + # Should get back all three projects, one by virtue of the direct + # grant, plus both projects in the domain + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertEqual(3, len(user_projects)) + + # TODO(henry-nash): The test above uses list_projects_for_user + # which may, in a subsequent patch, be re-implemented to call + # list_role_assignments and then report only the distinct projects. + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once list_projects_for_user + # has been re-implemented then the manual tests above can be + # refactored. + test_plan = { + # A domain with 1 project, plus a second domain with 2 projects, + # as well as a user. Also, create 2 roles. + 'entities': {'domains': [{'projects': 1}, + {'users': 1, 'projects': 2}], + 'roles': 2}, + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'domain': 1, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] + # Should get one direct role plus one inherited role for each + # project in domain + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'domain': 1}}, + {'user': 0, 'role': 1, 'project': 2, + 'indirect': {'domain': 1}}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_projects_for_user_with_inherited_user_project_grants(self): + """Test inherited role assignments for users on nested projects. + + Test Plan: + + - Enable OS-INHERIT extension + - Create a hierarchy of projects with one root and one leaf project + - Assign an inherited user role on root project + - Assign a non-inherited user role on root project + - Get a list of projects for user, should return both projects + - Disable OS-INHERIT extension + - Get a list of projects for user, should return only root project + + """ + # Enable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=True) + root_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + root_project = self.resource_api.create_project(root_project['id'], + root_project) + leaf_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=root_project['id']) + leaf_project = self.resource_api.create_project(leaf_project['id'], + leaf_project) + + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = self.identity_api.create_user(user) + + # Grant inherited user role + self.assignment_api.create_grant(user_id=user['id'], + project_id=root_project['id'], + role_id=self.role_admin['id'], + inherited_to_projects=True) + # Grant non-inherited user role + self.assignment_api.create_grant(user_id=user['id'], + project_id=root_project['id'], + role_id=self.role_member['id']) + # Should get back both projects: because the direct role assignment for + # the root project and inherited role assignment for leaf project + user_projects = self.assignment_api.list_projects_for_user(user['id']) + self.assertEqual(2, len(user_projects)) + self.assertIn(root_project, user_projects) + self.assertIn(leaf_project, user_projects) + + # Disable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=False) + # Should get back just root project - due the direct role assignment + user_projects = self.assignment_api.list_projects_for_user(user['id']) + self.assertEqual(1, len(user_projects)) + self.assertIn(root_project, user_projects) + + # TODO(henry-nash): The test above uses list_projects_for_user + # which may, in a subsequent patch, be re-implemented to call + # list_role_assignments and then report only the distinct projects. + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once list_projects_for_user + # has been re-implemented then the manual tests above can be + # refactored. + test_plan = { + # A domain with a project and sub-project, plus a user. + # Also, create 2 roles. + 'entities': { + 'domains': {'id': CONF.identity.default_domain_id, 'users': 1, + 'projects': {'project': 1}}, + 'roles': 2}, + # A direct role and an inherited role on the parent + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'project': 0, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] - should get back + # one direct role plus one inherited role. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'project': 0}}]} + ] + } + + test_plan_with_os_inherit_disabled = { + 'tests': [ + # List all effective assignments for user[0] - should only get + # back the one direct role. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}]} + ] + } + self.config_fixture.config(group='os_inherit', enabled=True) + test_data = self.execute_assignment_plan(test_plan) + self.config_fixture.config(group='os_inherit', enabled=False) + # Pass the existing test data in to allow execution of 2nd test plan + self.execute_assignment_cases( + test_plan_with_os_inherit_disabled, test_data) + + def test_list_projects_for_user_with_inherited_group_grants(self): + """Test inherited group roles. + + Test Plan: + + - Enable OS-INHERIT extension + - Create two domains, each with two projects + - Create a user and group + - Make the user a member of the group + - Assign a user role two projects, an inherited + group role to one domain and an inherited regular role on + the other domain + - Get a list of projects for user, should return both pairs of projects + from the domain, plus the one separate project + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project1 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project2['id'], project2) + project3 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project3['id'], project3) + project4 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project4['id'], project4) + user1 = unit.new_user_ref(domain_id=domain['id']) + user1 = self.identity_api.create_user(user1) + group1 = unit.new_group_ref(domain_id=domain['id']) + group1 = self.identity_api.create_group(group1) + self.identity_api.add_user_to_group(user1['id'], group1['id']) + + # Create 4 grants: + # - one user grant on a project in domain2 + # - one user grant on a project in the default domain + # - one inherited user grant on domain + # - one inherited group grant on domain2 + self.assignment_api.create_grant(user_id=user1['id'], + project_id=project3['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(user_id=user1['id'], + domain_id=domain['id'], + role_id=self.role_admin['id'], + inherited_to_projects=True) + self.assignment_api.create_grant(group_id=group1['id'], + domain_id=domain2['id'], + role_id=self.role_admin['id'], + inherited_to_projects=True) + # Should get back all five projects, but without a duplicate for + # project3 (since it has both a direct user role and an inherited role) + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertEqual(5, len(user_projects)) + + # TODO(henry-nash): The test above uses list_projects_for_user + # which may, in a subsequent patch, be re-implemented to call + # list_role_assignments and then report only the distinct projects. + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once list_projects_for_user + # has been re-implemented then the manual tests above can be + # refactored. + test_plan = { + # A domain with a 1 project, plus a second domain with 2 projects, + # as well as a user & group and a 3rd domain with 2 projects. + # Also, created 2 roles. + 'entities': {'domains': [{'projects': 1}, + {'users': 1, 'groups': 1, 'projects': 2}, + {'projects': 2}], + 'roles': 2}, + 'group_memberships': [{'group': 0, 'users': [0]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 0, 'project': 3}, + {'user': 0, 'role': 1, 'domain': 1, + 'inherited_to_projects': True}, + {'user': 0, 'role': 1, 'domain': 2, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] + # Should get back both direct roles plus roles on both projects + # from each domain. Duplicates should not be filtered out. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 3}, + {'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'domain': 1}}, + {'user': 0, 'role': 1, 'project': 2, + 'indirect': {'domain': 1}}, + {'user': 0, 'role': 1, 'project': 3, + 'indirect': {'domain': 2}}, + {'user': 0, 'role': 1, 'project': 4, + 'indirect': {'domain': 2}}]} + ] + } + self.execute_assignment_plan(test_plan) + + def test_list_projects_for_user_with_inherited_group_project_grants(self): + """Test inherited role assignments for groups on nested projects. + + Test Plan: + + - Enable OS-INHERIT extension + - Create a hierarchy of projects with one root and one leaf project + - Assign an inherited group role on root project + - Assign a non-inherited group role on root project + - Get a list of projects for user, should return both projects + - Disable OS-INHERIT extension + - Get a list of projects for user, should return only root project + + """ + self.config_fixture.config(group='os_inherit', enabled=True) + root_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + root_project = self.resource_api.create_project(root_project['id'], + root_project) + leaf_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=root_project['id']) + leaf_project = self.resource_api.create_project(leaf_project['id'], + leaf_project) + + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = self.identity_api.create_user(user) + + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + self.identity_api.add_user_to_group(user['id'], group['id']) + + # Grant inherited group role + self.assignment_api.create_grant(group_id=group['id'], + project_id=root_project['id'], + role_id=self.role_admin['id'], + inherited_to_projects=True) + # Grant non-inherited group role + self.assignment_api.create_grant(group_id=group['id'], + project_id=root_project['id'], + role_id=self.role_member['id']) + # Should get back both projects: because the direct role assignment for + # the root project and inherited role assignment for leaf project + user_projects = self.assignment_api.list_projects_for_user(user['id']) + self.assertEqual(2, len(user_projects)) + self.assertIn(root_project, user_projects) + self.assertIn(leaf_project, user_projects) + + # Disable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=False) + # Should get back just root project - due the direct role assignment + user_projects = self.assignment_api.list_projects_for_user(user['id']) + self.assertEqual(1, len(user_projects)) + self.assertIn(root_project, user_projects) + + # TODO(henry-nash): The test above uses list_projects_for_user + # which may, in a subsequent patch, be re-implemented to call + # list_role_assignments and then report only the distinct projects. + # + # The test plan below therefore mirrors this test, to ensure that + # list_role_assignments works the same. Once list_projects_for_user + # has been re-implemented then the manual tests above can be + # refactored. + test_plan = { + # A domain with a project ans sub-project, plus a user. + # Also, create 2 roles. + 'entities': { + 'domains': {'id': CONF.identity.default_domain_id, 'users': 1, + 'groups': 1, + 'projects': {'project': 1}}, + 'roles': 2}, + 'group_memberships': [{'group': 0, 'users': [0]}], + # A direct role and an inherited role on the parent + 'assignments': [{'group': 0, 'role': 0, 'project': 0}, + {'group': 0, 'role': 1, 'project': 0, + 'inherited_to_projects': True}], + 'tests': [ + # List all effective assignments for user[0] - should get back + # one direct role plus one inherited role. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0, + 'indirect': {'group': 0}}, + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'group': 0, 'project': 0}}]} + ] + } + + test_plan_with_os_inherit_disabled = { + 'tests': [ + # List all effective assignments for user[0] - should only get + # back the one direct role. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0, + 'indirect': {'group': 0}}]} + ] + } + self.config_fixture.config(group='os_inherit', enabled=True) + test_data = self.execute_assignment_plan(test_plan) + self.config_fixture.config(group='os_inherit', enabled=False) + # Pass the existing test data in to allow execution of 2nd test plan + self.execute_assignment_cases( + test_plan_with_os_inherit_disabled, test_data) + + def test_list_assignments_for_tree(self): + """Test we correctly list direct assignments for a tree""" + # Enable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=True) + + test_plan = { + # Create a domain with a project hierarchy 3 levels deep: + # + # project 0 + # ____________|____________ + # | | + # project 1 project 4 + # ______|_____ ______|_____ + # | | | | + # project 2 project 3 project 5 project 6 + # + # Also, create 1 user and 4 roles. + 'entities': { + 'domains': { + 'projects': {'project': [{'project': 2}, + {'project': 2}]}, + 'users': 1}, + 'roles': 4}, + 'assignments': [ + # Direct assignment to projects 1 and 2 + {'user': 0, 'role': 0, 'project': 1}, + {'user': 0, 'role': 1, 'project': 2}, + # Also an inherited assignment on project 1 + {'user': 0, 'role': 2, 'project': 1, + 'inherited_to_projects': True}, + # ...and two spoiler assignments, one to the root and one + # to project 4 + {'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 3, 'project': 4}], + 'tests': [ + # List all assignments for project 1 and its subtree. + {'params': {'project': 1, 'include_subtree': True}, + 'results': [ + # Only the actual assignments should be returned, no + # expansion of inherited assignments + {'user': 0, 'role': 0, 'project': 1}, + {'user': 0, 'role': 1, 'project': 2}, + {'user': 0, 'role': 2, 'project': 1, + 'inherited_to_projects': 'projects'}]} + ] + } + + self.execute_assignment_plan(test_plan) + + def test_list_effective_assignments_for_tree(self): + """Test we correctly list effective assignments for a tree""" + # Enable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=True) + + test_plan = { + # Create a domain with a project hierarchy 3 levels deep: + # + # project 0 + # ____________|____________ + # | | + # project 1 project 4 + # ______|_____ ______|_____ + # | | | | + # project 2 project 3 project 5 project 6 + # + # Also, create 1 user and 4 roles. + 'entities': { + 'domains': { + 'projects': {'project': [{'project': 2}, + {'project': 2}]}, + 'users': 1}, + 'roles': 4}, + 'assignments': [ + # An inherited assignment on project 1 + {'user': 0, 'role': 1, 'project': 1, + 'inherited_to_projects': True}, + # A direct assignment to project 2 + {'user': 0, 'role': 2, 'project': 2}, + # ...and two spoiler assignments, one to the root and one + # to project 4 + {'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 3, 'project': 4}], + 'tests': [ + # List all effective assignments for project 1 and its subtree. + {'params': {'project': 1, 'effective': True, + 'include_subtree': True}, + 'results': [ + # The inherited assignment on project 1 should appear only + # on its children + {'user': 0, 'role': 1, 'project': 2, + 'indirect': {'project': 1}}, + {'user': 0, 'role': 1, 'project': 3, + 'indirect': {'project': 1}}, + # And finally the direct assignment on project 2 + {'user': 0, 'role': 2, 'project': 2}]} + ] + } + + self.execute_assignment_plan(test_plan) + + def test_list_effective_assignments_for_tree_with_mixed_assignments(self): + """Test that we correctly combine assignments for a tree. + + In this test we want to ensure that when asking for a list of + assignments in a subtree, any assignments inherited from above the + subtree are correctly combined with any assignments within the subtree + itself. + + """ + # Enable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=True) + + test_plan = { + # Create a domain with a project hierarchy 3 levels deep: + # + # project 0 + # ____________|____________ + # | | + # project 1 project 4 + # ______|_____ ______|_____ + # | | | | + # project 2 project 3 project 5 project 6 + # + # Also, create 2 users, 1 group and 4 roles. + 'entities': { + 'domains': { + 'projects': {'project': [{'project': 2}, + {'project': 2}]}, + 'users': 2, 'groups': 1}, + 'roles': 4}, + # Both users are part of the same group + 'group_memberships': [{'group': 0, 'users': [0, 1]}], + # We are going to ask for listing of assignment on project 1 and + # it's subtree. So first we'll add two inherited assignments above + # this (one user and one for a group that contains this user). + 'assignments': [{'user': 0, 'role': 0, 'project': 0, + 'inherited_to_projects': True}, + {'group': 0, 'role': 1, 'project': 0, + 'inherited_to_projects': True}, + # Now an inherited assignment on project 1 itself, + # which should ONLY show up on its children + {'user': 0, 'role': 2, 'project': 1, + 'inherited_to_projects': True}, + # ...and a direct assignment on one of those + # children + {'user': 0, 'role': 3, 'project': 2}, + # The rest are spoiler assignments + {'user': 0, 'role': 2, 'project': 5}, + {'user': 0, 'role': 3, 'project': 4}], + 'tests': [ + # List all effective assignments for project 1 and its subtree. + {'params': {'project': 1, 'user': 0, 'effective': True, + 'include_subtree': True}, + 'results': [ + # First, we should see the inherited user assignment from + # project 0 on all projects in the subtree + {'user': 0, 'role': 0, 'project': 1, + 'indirect': {'project': 0}}, + {'user': 0, 'role': 0, 'project': 2, + 'indirect': {'project': 0}}, + {'user': 0, 'role': 0, 'project': 3, + 'indirect': {'project': 0}}, + # Also the inherited group assignment from project 0 on + # the subtree + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'project': 0, 'group': 0}}, + {'user': 0, 'role': 1, 'project': 2, + 'indirect': {'project': 0, 'group': 0}}, + {'user': 0, 'role': 1, 'project': 3, + 'indirect': {'project': 0, 'group': 0}}, + # The inherited assignment on project 1 should appear only + # on its children + {'user': 0, 'role': 2, 'project': 2, + 'indirect': {'project': 1}}, + {'user': 0, 'role': 2, 'project': 3, + 'indirect': {'project': 1}}, + # And finally the direct assignment on project 2 + {'user': 0, 'role': 3, 'project': 2}]} + ] + } + + self.execute_assignment_plan(test_plan) + + def test_list_effective_assignments_for_tree_with_domain_assignments(self): + """Test we correctly honor domain inherited assignments on the tree""" + # Enable OS-INHERIT extension + self.config_fixture.config(group='os_inherit', enabled=True) + + test_plan = { + # Create a domain with a project hierarchy 3 levels deep: + # + # project 0 + # ____________|____________ + # | | + # project 1 project 4 + # ______|_____ ______|_____ + # | | | | + # project 2 project 3 project 5 project 6 + # + # Also, create 1 user and 4 roles. + 'entities': { + 'domains': { + 'projects': {'project': [{'project': 2}, + {'project': 2}]}, + 'users': 1}, + 'roles': 4}, + 'assignments': [ + # An inherited assignment on the domain (which should be + # applied to all the projects) + {'user': 0, 'role': 1, 'domain': 0, + 'inherited_to_projects': True}, + # A direct assignment to project 2 + {'user': 0, 'role': 2, 'project': 2}, + # ...and two spoiler assignments, one to the root and one + # to project 4 + {'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 3, 'project': 4}], + 'tests': [ + # List all effective assignments for project 1 and its subtree. + {'params': {'project': 1, 'effective': True, + 'include_subtree': True}, + 'results': [ + # The inherited assignment from the domain should appear + # only on the part of the subtree we are interested in + {'user': 0, 'role': 1, 'project': 1, + 'indirect': {'domain': 0}}, + {'user': 0, 'role': 1, 'project': 2, + 'indirect': {'domain': 0}}, + {'user': 0, 'role': 1, 'project': 3, + 'indirect': {'domain': 0}}, + # And finally the direct assignment on project 2 + {'user': 0, 'role': 2, 'project': 2}]} + ] + } + + self.execute_assignment_plan(test_plan) + + def test_list_user_ids_for_project_with_inheritance(self): + test_plan = { + # A domain with a project and sub-project, plus four users, + # two groups, as well as 4 roles. + 'entities': { + 'domains': {'id': CONF.identity.default_domain_id, 'users': 4, + 'groups': 2, + 'projects': {'project': 1}}, + 'roles': 4}, + # Each group has a unique user member + 'group_memberships': [{'group': 0, 'users': [1]}, + {'group': 1, 'users': [3]}], + # Set up assignments so that there should end up with four + # effective assignments on project 1 - one direct, one due to + # group membership and one user assignment inherited from the + # parent and one group assignment inhertied from the parent. + 'assignments': [{'user': 0, 'role': 0, 'project': 1}, + {'group': 0, 'role': 1, 'project': 1}, + {'user': 2, 'role': 2, 'project': 0, + 'inherited_to_projects': True}, + {'group': 1, 'role': 3, 'project': 0, + 'inherited_to_projects': True}], + } + # Use assignment plan helper to create all the entities and + # assignments - then we'll run our own tests using the data + test_data = self.execute_assignment_plan(test_plan) + self.config_fixture.config(group='os_inherit', enabled=True) + user_ids = self.assignment_api.list_user_ids_for_project( + test_data['projects'][1]['id']) + self.assertThat(user_ids, matchers.HasLength(4)) + for x in range(0, 4): + self.assertIn(test_data['users'][x]['id'], user_ids) + + def test_list_role_assignment_using_inherited_sourced_groups(self): + """Test listing inherited assignments when restricted by groups.""" + test_plan = { + # A domain with 3 users, 3 groups, 3 projects, a second domain, + # plus 3 roles. + 'entities': {'domains': [{'users': 3, 'groups': 3, 'projects': 3}, + 1], + 'roles': 3}, + # Users 0 & 1 are in the group 0, User 0 also in group 1 + 'group_memberships': [{'group': 0, 'users': [0, 1]}, + {'group': 1, 'users': [0]}], + # Spread the assignments around - we want to be able to show that + # if sourced by group, assignments from other sources are excluded + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}, + {'group': 0, 'role': 1, 'domain': 1}, + {'group': 1, 'role': 2, 'domain': 0, + 'inherited_to_projects': True}, + {'group': 1, 'role': 2, 'project': 1}, + {'user': 2, 'role': 1, 'project': 1, + 'inherited_to_projects': True}, + {'group': 2, 'role': 2, 'project': 2} + ], + 'tests': [ + # List all effective assignments sourced from groups 0 and 1. + # We should see the inherited group assigned on the 3 projects + # from domain 0, as well as the direct assignments. + {'params': {'source_from_group_ids': [0, 1], + 'effective': True}, + 'results': [{'group': 0, 'role': 1, 'domain': 1}, + {'group': 1, 'role': 2, 'project': 0, + 'indirect': {'domain': 0}}, + {'group': 1, 'role': 2, 'project': 1, + 'indirect': {'domain': 0}}, + {'group': 1, 'role': 2, 'project': 2, + 'indirect': {'domain': 0}}, + {'group': 1, 'role': 2, 'project': 1} + ]}, + ] + } + self.execute_assignment_plan(test_plan) + + +class ImpliedRoleTests(AssignmentTestHelperMixin): + + def test_implied_role_crd(self): + prior_role_ref = unit.new_role_ref() + self.role_api.create_role(prior_role_ref['id'], prior_role_ref) + implied_role_ref = unit.new_role_ref() + self.role_api.create_role(implied_role_ref['id'], implied_role_ref) + + self.role_api.create_implied_role( + prior_role_ref['id'], + implied_role_ref['id']) + implied_role = self.role_api.get_implied_role( + prior_role_ref['id'], + implied_role_ref['id']) + expected_implied_role_ref = { + 'prior_role_id': prior_role_ref['id'], + 'implied_role_id': implied_role_ref['id']} + self.assertDictContainsSubset( + expected_implied_role_ref, + implied_role) + + self.role_api.delete_implied_role( + prior_role_ref['id'], + implied_role_ref['id']) + self.assertRaises(exception.ImpliedRoleNotFound, + self.role_api.get_implied_role, + uuid.uuid4().hex, + uuid.uuid4().hex) + + def test_delete_implied_role_returns_not_found(self): + self.assertRaises(exception.ImpliedRoleNotFound, + self.role_api.delete_implied_role, + uuid.uuid4().hex, + uuid.uuid4().hex) + + def test_role_assignments_simple_tree_of_implied_roles(self): + """Test that implied roles are expanded out.""" + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1}, + 'roles': 4}, + # Three level tree of implied roles + 'implied_roles': [{'role': 0, 'implied_roles': 1}, + {'role': 1, 'implied_roles': [2, 3]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'project': 0}]}, + # Listing in effective mode should show the implied roles + # expanded out + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 1, 'project': 0, + 'indirect': {'role': 0}}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_circular_inferences(self): + """Test that implied roles are expanded out.""" + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1}, + 'roles': 4}, + # Three level tree of implied roles + 'implied_roles': [{'role': 0, 'implied_roles': [1]}, + {'role': 1, 'implied_roles': [2, 3]}, + {'role': 3, 'implied_roles': [0]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'project': 0}]}, + # Listing in effective mode should show the implied roles + # expanded out + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 0, 'project': 0, + 'indirect': {'role': 3}}, + {'user': 0, 'role': 1, 'project': 0, + 'indirect': {'role': 0}}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_role_assignments_directed_graph_of_implied_roles(self): + """Test that a role can have multiple, different prior roles.""" + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1}, + 'roles': 6}, + # Three level tree of implied roles, where one of the roles at the + # bottom is implied by more than one top level role + 'implied_roles': [{'role': 0, 'implied_roles': [1, 2]}, + {'role': 1, 'implied_roles': [3, 4]}, + {'role': 5, 'implied_roles': 4}], + # The user gets both top level roles + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 5, 'project': 0}], + 'tests': [ + # The implied roles should be expanded out and there should be + # two entries for the role that had two different prior roles. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 5, 'project': 0}, + {'user': 0, 'role': 1, 'project': 0, + 'indirect': {'role': 0}}, + {'user': 0, 'role': 2, 'project': 0, + 'indirect': {'role': 0}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 4, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 4, 'project': 0, + 'indirect': {'role': 5}}]}, + ] + } + test_data = self.execute_assignment_plan(test_plan) + + # We should also be able to get a similar (yet summarized) answer to + # the above by calling get_roles_for_user_and_project(), which should + # list the role_ids, yet remove any duplicates + role_ids = self.assignment_api.get_roles_for_user_and_project( + test_data['users'][0]['id'], test_data['projects'][0]['id']) + # We should see 6 entries, not 7, since role index 5 appeared twice in + # the answer from list_role_assignments + self.assertThat(role_ids, matchers.HasLength(6)) + for x in range(0, 5): + self.assertIn(test_data['roles'][x]['id'], role_ids) + + def test_role_assignments_implied_roles_filtered_by_role(self): + """Test that you can filter by role even if roles are implied.""" + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 2}, + 'roles': 4}, + # Three level tree of implied roles + 'implied_roles': [{'role': 0, 'implied_roles': 1}, + {'role': 1, 'implied_roles': [2, 3]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}, + {'user': 0, 'role': 3, 'project': 1}], + 'tests': [ + # List effective roles filtering by one of the implied roles, + # showing that the filter was implied post expansion of + # implied roles (and that non impled roles are included in + # the filter + {'params': {'role': 3, 'effective': True}, + 'results': [{'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'project': 1}]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_role_assignments_simple_tree_of_implied_roles_on_domain(self): + """Test that implied roles are expanded out when placed on a domain.""" + test_plan = { + 'entities': {'domains': {'users': 1}, + 'roles': 4}, + # Three level tree of implied roles + 'implied_roles': [{'role': 0, 'implied_roles': 1}, + {'role': 1, 'implied_roles': [2, 3]}], + 'assignments': [{'user': 0, 'role': 0, 'domain': 0}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}]}, + # Listing in effective mode should how the implied roles + # expanded out + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'domain': 0}, + {'user': 0, 'role': 1, 'domain': 0, + 'indirect': {'role': 0}}, + {'user': 0, 'role': 2, 'domain': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'domain': 0, + 'indirect': {'role': 1}}]}, + ] + } + self.execute_assignment_plan(test_plan) + + def test_role_assignments_inherited_implied_roles(self): + """Test that you can intermix inherited and implied roles.""" + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1}, + 'roles': 4}, + # Simply one level of implied roles + 'implied_roles': [{'role': 0, 'implied_roles': 1}], + # Assign to top level role as an inherited assignment to the + # domain + 'assignments': [{'user': 0, 'role': 0, 'domain': 0, + 'inherited_to_projects': True}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'domain': 0, + 'inherited_to_projects': 'projects'}]}, + # List in effective mode - we should only see the initial and + # implied role on the project (since inherited roles are not + # active on their anchor point). + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 0, 'project': 0, + 'indirect': {'domain': 0}}, + {'user': 0, 'role': 1, 'project': 0, + 'indirect': {'domain': 0, 'role': 0}}]}, + ] + } + self.config_fixture.config(group='os_inherit', enabled=True) + self.execute_assignment_plan(test_plan) + + def test_role_assignments_domain_specific_with_implied_roles(self): + test_plan = { + 'entities': {'domains': {'users': 1, 'projects': 1, 'roles': 2}, + 'roles': 2}, + # Two level tree of implied roles, with the top and 1st level being + # domain specific roles, and the bottom level being infered global + # roles. + 'implied_roles': [{'role': 0, 'implied_roles': [1]}, + {'role': 1, 'implied_roles': [2, 3]}], + 'assignments': [{'user': 0, 'role': 0, 'project': 0}], + 'tests': [ + # List all direct assignments for user[0], this should just + # show the one top level role assignment, even though this is a + # domain specific role (since we are in non-effective mode and + # we show any direct role assignment in that mode). + {'params': {'user': 0}, + 'results': [{'user': 0, 'role': 0, 'project': 0}]}, + # Now the effective ones - so the implied roles should be + # expanded out, as well as any domain specific roles should be + # removed. + {'params': {'user': 0, 'effective': True}, + 'results': [{'user': 0, 'role': 2, 'project': 0, + 'indirect': {'role': 1}}, + {'user': 0, 'role': 3, 'project': 0, + 'indirect': {'role': 1}}]}, + ] + } + self.execute_assignment_plan(test_plan) diff --git a/keystone-moon/keystone/tests/unit/assignment/test_core.py b/keystone-moon/keystone/tests/unit/assignment/test_core.py new file mode 100644 index 00000000..494e19c3 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/assignment/test_core.py @@ -0,0 +1,123 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit import default_fixtures + + +class RoleTests(object): + + def test_get_role_returns_not_found(self): + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + uuid.uuid4().hex) + + def test_create_duplicate_role_name_fails(self): + role = unit.new_role_ref(id='fake1', name='fake1name') + self.role_api.create_role('fake1', role) + role['id'] = 'fake2' + self.assertRaises(exception.Conflict, + self.role_api.create_role, + 'fake2', + role) + + def test_rename_duplicate_role_name_fails(self): + role1 = unit.new_role_ref(id='fake1', name='fake1name') + role2 = unit.new_role_ref(id='fake2', name='fake2name') + self.role_api.create_role('fake1', role1) + self.role_api.create_role('fake2', role2) + role1['name'] = 'fake2name' + self.assertRaises(exception.Conflict, + self.role_api.update_role, + 'fake1', + role1) + + def test_role_crud(self): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + role_ref = self.role_api.get_role(role['id']) + role_ref_dict = {x: role_ref[x] for x in role_ref} + self.assertDictEqual(role, role_ref_dict) + + role['name'] = uuid.uuid4().hex + updated_role_ref = self.role_api.update_role(role['id'], role) + role_ref = self.role_api.get_role(role['id']) + role_ref_dict = {x: role_ref[x] for x in role_ref} + self.assertDictEqual(role, role_ref_dict) + self.assertDictEqual(role_ref_dict, updated_role_ref) + + self.role_api.delete_role(role['id']) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role['id']) + + def test_update_role_returns_not_found(self): + role = unit.new_role_ref() + self.assertRaises(exception.RoleNotFound, + self.role_api.update_role, + role['id'], + role) + + def test_list_roles(self): + roles = self.role_api.list_roles() + self.assertEqual(len(default_fixtures.ROLES), len(roles)) + role_ids = set(role['id'] for role in roles) + expected_role_ids = set(role['id'] for role in default_fixtures.ROLES) + self.assertEqual(expected_role_ids, role_ids) + + @unit.skip_if_cache_disabled('role') + def test_cache_layer_role_crud(self): + role = unit.new_role_ref() + role_id = role['id'] + # Create role + self.role_api.create_role(role_id, role) + role_ref = self.role_api.get_role(role_id) + updated_role_ref = copy.deepcopy(role_ref) + updated_role_ref['name'] = uuid.uuid4().hex + # Update role, bypassing the role api manager + self.role_api.driver.update_role(role_id, updated_role_ref) + # Verify get_role still returns old ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Invalidate Cache + self.role_api.get_role.invalidate(self.role_api, role_id) + # Verify get_role returns the new role_ref + self.assertDictEqual(updated_role_ref, + self.role_api.get_role(role_id)) + # Update role back to original via the assignment api manager + self.role_api.update_role(role_id, role_ref) + # Verify get_role returns the original role ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Delete role bypassing the role api manager + self.role_api.driver.delete_role(role_id) + # Verify get_role still returns the role_ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Invalidate cache + self.role_api.get_role.invalidate(self.role_api, role_id) + # Verify RoleNotFound is now raised + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role_id) + # recreate role + self.role_api.create_role(role_id, role) + self.role_api.get_role(role_id) + # delete role via the assignment api manager + self.role_api.delete_role(role_id) + # verity RoleNotFound is now raised + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role_id) |