diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_filters.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_filters.py | 435 |
1 files changed, 0 insertions, 435 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_filters.py b/keystone-moon/keystone/tests/unit/test_v3_filters.py deleted file mode 100644 index 9dc19af5..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_filters.py +++ /dev/null @@ -1,435 +0,0 @@ -# Copyright 2012 OpenStack LLC -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_config import cfg -from oslo_serialization import jsonutils -from six.moves import range - -from keystone.tests import unit -from keystone.tests.unit import filtering -from keystone.tests.unit import ksfixtures -from keystone.tests.unit.ksfixtures import temporaryfile -from keystone.tests.unit import test_v3 - - -CONF = cfg.CONF - - -class IdentityTestFilteredCase(filtering.FilterTests, - test_v3.RestfulTestCase): - """Test filter enforcement on the v3 Identity API.""" - - def _policy_fixture(self): - return ksfixtures.Policy(self.tmpfilename, self.config_fixture) - - def setUp(self): - """Setup for Identity Filter Test Cases.""" - self.tempfile = self.useFixture(temporaryfile.SecureTempFile()) - self.tmpfilename = self.tempfile.file_name - super(IdentityTestFilteredCase, self).setUp() - - def load_sample_data(self): - """Create sample data for these tests. - - As well as the usual housekeeping, create a set of domains, - users, roles and projects for the subsequent tests: - - - Three domains: A,B & C. C is disabled. - - DomainA has user1, DomainB has user2 and user3 - - DomainA has group1 and group2, DomainB has group3 - - User1 has a role on DomainA - - Remember that there will also be a fourth domain in existence, - the default domain. - - """ - # Start by creating a few domains - self._populate_default_domain() - self.domainA = unit.new_domain_ref() - self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = unit.new_domain_ref() - self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.domainC = unit.new_domain_ref() - self.domainC['enabled'] = False - self.resource_api.create_domain(self.domainC['id'], self.domainC) - - # Now create some users, one in domainA and two of them in domainB - self.user1 = unit.create_user(self.identity_api, - domain_id=self.domainA['id']) - self.user2 = unit.create_user(self.identity_api, - domain_id=self.domainB['id']) - self.user3 = unit.create_user(self.identity_api, - domain_id=self.domainB['id']) - - self.role = unit.new_role_ref() - self.role_api.create_role(self.role['id'], self.role) - self.assignment_api.create_grant(self.role['id'], - user_id=self.user1['id'], - domain_id=self.domainA['id']) - - # A default auth request we can use - un-scoped user token - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password']) - - def _get_id_list_from_ref_list(self, ref_list): - result_list = [] - for x in ref_list: - result_list.append(x['id']) - return result_list - - def _set_policy(self, new_policy): - with open(self.tmpfilename, "w") as policyfile: - policyfile.write(jsonutils.dumps(new_policy)) - - def test_list_users_filtered_by_domain(self): - """GET /users?domain_id=mydomain (filtered) - - Test Plan: - - - Update policy so api is unprotected - - Use an un-scoped token to make sure we can filter the - users by domainB, getting back the 2 users in that domain - - """ - self._set_policy({"identity:list_users": []}) - url_by_name = '/users?domain_id=%s' % self.domainB['id'] - r = self.get(url_by_name, auth=self.auth) - # We should get back two users, those in DomainB - id_list = self._get_id_list_from_ref_list(r.result.get('users')) - self.assertIn(self.user2['id'], id_list) - self.assertIn(self.user3['id'], id_list) - - def test_list_filtered_domains(self): - """GET /domains?enabled=0 - - Test Plan: - - - Update policy for no protection on api - - Filter by the 'enabled' boolean to get disabled domains, which - should return just domainC - - Try the filter using different ways of specifying True/False - to test that our handling of booleans in filter matching is - correct - - """ - new_policy = {"identity:list_domains": []} - self._set_policy(new_policy) - r = self.get('/domains?enabled=0', auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - self.assertEqual(1, len(id_list)) - self.assertIn(self.domainC['id'], id_list) - - # Try a few ways of specifying 'false' - for val in ('0', 'false', 'False', 'FALSE', 'n', 'no', 'off'): - r = self.get('/domains?enabled=%s' % val, auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - self.assertEqual([self.domainC['id']], id_list) - - # Now try a few ways of specifying 'true' when we should get back - # the other two domains, plus the default domain - for val in ('1', 'true', 'True', 'TRUE', 'y', 'yes', 'on'): - r = self.get('/domains?enabled=%s' % val, auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - self.assertEqual(3, len(id_list)) - self.assertIn(self.domainA['id'], id_list) - self.assertIn(self.domainB['id'], id_list) - self.assertIn(CONF.identity.default_domain_id, id_list) - - r = self.get('/domains?enabled', auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - self.assertEqual(3, len(id_list)) - self.assertIn(self.domainA['id'], id_list) - self.assertIn(self.domainB['id'], id_list) - self.assertIn(CONF.identity.default_domain_id, id_list) - - def test_multiple_filters(self): - """GET /domains?enabled&name=myname - - Test Plan: - - - Update policy for no protection on api - - Filter by the 'enabled' boolean and name - this should - return a single domain - - """ - new_policy = {"identity:list_domains": []} - self._set_policy(new_policy) - - my_url = '/domains?enabled&name=%s' % self.domainA['name'] - r = self.get(my_url, auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - self.assertEqual(1, len(id_list)) - self.assertIn(self.domainA['id'], id_list) - self.assertIs(True, r.result.get('domains')[0]['enabled']) - - def test_invalid_filter_is_ignored(self): - """GET /domains?enableds&name=myname - - Test Plan: - - - Update policy for no protection on api - - Filter by name and 'enableds', which does not exist - - Assert 'enableds' is ignored - - """ - new_policy = {"identity:list_domains": []} - self._set_policy(new_policy) - - my_url = '/domains?enableds=0&name=%s' % self.domainA['name'] - r = self.get(my_url, auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('domains')) - - # domainA is returned and it is enabled, since enableds=0 is not the - # same as enabled=0 - self.assertEqual(1, len(id_list)) - self.assertIn(self.domainA['id'], id_list) - self.assertIs(True, r.result.get('domains')[0]['enabled']) - - def test_list_users_filtered_by_funny_name(self): - """GET /users?name=%myname% - - Test Plan: - - - Update policy so api is unprotected - - Update a user with name that has filter escape characters - - Ensure we can filter on it - - """ - self._set_policy({"identity:list_users": []}) - user = self.user1 - user['name'] = '%my%name%' - self.identity_api.update_user(user['id'], user) - - url_by_name = '/users?name=%my%name%' - r = self.get(url_by_name, auth=self.auth) - - self.assertEqual(1, len(r.result.get('users'))) - self.assertEqual(user['id'], r.result.get('users')[0]['id']) - - def test_inexact_filters(self): - # Create 20 users - user_list = self._create_test_data('user', 20) - # Set up some names that we can filter on - user = user_list[5] - user['name'] = 'The' - self.identity_api.update_user(user['id'], user) - user = user_list[6] - user['name'] = 'The Ministry' - self.identity_api.update_user(user['id'], user) - user = user_list[7] - user['name'] = 'The Ministry of' - self.identity_api.update_user(user['id'], user) - user = user_list[8] - user['name'] = 'The Ministry of Silly' - self.identity_api.update_user(user['id'], user) - user = user_list[9] - user['name'] = 'The Ministry of Silly Walks' - self.identity_api.update_user(user['id'], user) - # ...and one for useful case insensitivity testing - user = user_list[10] - user['name'] = 'the ministry of silly walks OF' - self.identity_api.update_user(user['id'], user) - - self._set_policy({"identity:list_users": []}) - - url_by_name = '/users?name__contains=Ministry' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(4, len(r.result.get('users'))) - self._match_with_list(r.result.get('users'), user_list, - list_start=6, list_end=10) - - url_by_name = '/users?name__icontains=miNIstry' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(5, len(r.result.get('users'))) - self._match_with_list(r.result.get('users'), user_list, - list_start=6, list_end=11) - - url_by_name = '/users?name__startswith=The' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(5, len(r.result.get('users'))) - self._match_with_list(r.result.get('users'), user_list, - list_start=5, list_end=10) - - url_by_name = '/users?name__istartswith=the' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(6, len(r.result.get('users'))) - self._match_with_list(r.result.get('users'), user_list, - list_start=5, list_end=11) - - url_by_name = '/users?name__endswith=of' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(1, len(r.result.get('users'))) - self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id']) - - url_by_name = '/users?name__iendswith=OF' - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(2, len(r.result.get('users'))) - self.assertEqual(user_list[7]['id'], r.result.get('users')[0]['id']) - self.assertEqual(user_list[10]['id'], r.result.get('users')[1]['id']) - - self._delete_test_data('user', user_list) - - def test_filter_sql_injection_attack(self): - """GET /users?name=<injected sql_statement> - - Test Plan: - - - Attempt to get all entities back by passing a two-term attribute - - Attempt to piggyback filter to damage DB (e.g. drop table) - - """ - self._set_policy({"identity:list_users": [], - "identity:list_groups": [], - "identity:create_group": []}) - - url_by_name = "/users?name=anything' or 'x'='x" - r = self.get(url_by_name, auth=self.auth) - - self.assertEqual(0, len(r.result.get('users'))) - - # See if we can add a SQL command...use the group table instead of the - # user table since 'user' is reserved word for SQLAlchemy. - group = unit.new_group_ref(domain_id=self.domainB['id']) - group = self.identity_api.create_group(group) - - url_by_name = "/users?name=x'; drop table group" - r = self.get(url_by_name, auth=self.auth) - - # Check group table is still there... - url_by_name = "/groups" - r = self.get(url_by_name, auth=self.auth) - self.assertTrue(len(r.result.get('groups')) > 0) - - -class IdentityTestListLimitCase(IdentityTestFilteredCase): - """Test list limiting enforcement on the v3 Identity API.""" - - content_type = 'json' - - def setUp(self): - """Setup for Identity Limit Test Cases.""" - super(IdentityTestListLimitCase, self).setUp() - - # Create 10 entries for each of the entities we are going to test - self.ENTITY_TYPES = ['user', 'group', 'project'] - self.entity_lists = {} - for entity in self.ENTITY_TYPES: - self.entity_lists[entity] = self._create_test_data(entity, 10) - # Make sure we clean up when finished - self.addCleanup(self.clean_up_entity, entity) - - self.service_list = [] - self.addCleanup(self.clean_up_service) - for _ in range(10): - new_entity = unit.new_service_ref() - service = self.catalog_api.create_service(new_entity['id'], - new_entity) - self.service_list.append(service) - - self.policy_list = [] - self.addCleanup(self.clean_up_policy) - for _ in range(10): - new_entity = unit.new_policy_ref() - policy = self.policy_api.create_policy(new_entity['id'], - new_entity) - self.policy_list.append(policy) - - def clean_up_entity(self, entity): - """Clean up entity test data from Identity Limit Test Cases.""" - self._delete_test_data(entity, self.entity_lists[entity]) - - def clean_up_service(self): - """Clean up service test data from Identity Limit Test Cases.""" - for service in self.service_list: - self.catalog_api.delete_service(service['id']) - - def clean_up_policy(self): - """Clean up policy test data from Identity Limit Test Cases.""" - for policy in self.policy_list: - self.policy_api.delete_policy(policy['id']) - - def _test_entity_list_limit(self, entity, driver): - """GET /<entities> (limited) - - Test Plan: - - - For the specified type of entity: - - Update policy for no protection on api - - Add a bunch of entities - - Set the global list limit to 5, and check that getting all - - entities only returns 5 - - Set the driver list_limit to 4, and check that now only 4 are - - returned - - """ - if entity == 'policy': - plural = 'policies' - else: - plural = '%ss' % entity - - self._set_policy({"identity:list_%s" % plural: []}) - self.config_fixture.config(list_limit=5) - self.config_fixture.config(group=driver, list_limit=None) - r = self.get('/%s' % plural, auth=self.auth) - self.assertEqual(5, len(r.result.get(plural))) - self.assertIs(r.result.get('truncated'), True) - - self.config_fixture.config(group=driver, list_limit=4) - r = self.get('/%s' % plural, auth=self.auth) - self.assertEqual(4, len(r.result.get(plural))) - self.assertIs(r.result.get('truncated'), True) - - def test_users_list_limit(self): - self._test_entity_list_limit('user', 'identity') - - def test_groups_list_limit(self): - self._test_entity_list_limit('group', 'identity') - - def test_projects_list_limit(self): - self._test_entity_list_limit('project', 'resource') - - def test_services_list_limit(self): - self._test_entity_list_limit('service', 'catalog') - - def test_non_driver_list_limit(self): - """Check list can be limited without driver level support. - - Policy limiting is not done at the driver level (since it - really isn't worth doing it there). So use this as a test - for ensuring the controller level will successfully limit - in this case. - - """ - self._test_entity_list_limit('policy', 'policy') - - def test_no_limit(self): - """Check truncated attribute not set when list not limited.""" - self._set_policy({"identity:list_services": []}) - r = self.get('/services', auth=self.auth) - self.assertEqual(10, len(r.result.get('services'))) - self.assertIsNone(r.result.get('truncated')) - - def test_at_limit(self): - """Check truncated attribute not set when list at max size.""" - # Test this by overriding the general limit with a higher - # driver-specific limit (allowing all entities to be returned - # in the collection), which should result in a non truncated list - self._set_policy({"identity:list_services": []}) - self.config_fixture.config(list_limit=5) - self.config_fixture.config(group='catalog', list_limit=10) - r = self.get('/services', auth=self.auth) - self.assertEqual(10, len(r.result.get('services'))) - self.assertIsNone(r.result.get('truncated')) |