diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py | 202 |
1 files changed, 0 insertions, 202 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py b/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py deleted file mode 100644 index a284114a..00000000 --- a/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -import ldappool -from oslo_config import cfg - -from keystone.common.ldap import core as ldap_core -from keystone.identity.backends import ldap -from keystone.tests import unit -from keystone.tests.unit import fakeldap -from keystone.tests.unit import test_backend_ldap_pool -from keystone.tests.unit import test_ldap_livetest - - -CONF = cfg.CONF - - -class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin, - test_ldap_livetest.LiveLDAPIdentity): - """Executes existing LDAP live test with pooled LDAP handler. - - Also executes common pool specific tests via Mixin class. - - """ - - def setUp(self): - super(LiveLDAPPoolIdentity, self).setUp() - self.addCleanup(self.cleanup_pools) - # storing to local variable to avoid long references - self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools - - def config_files(self): - config_files = super(LiveLDAPPoolIdentity, self).config_files() - config_files.append(unit.dirs.tests_conf('backend_pool_liveldap.conf')) - return config_files - - def test_assert_connector_used_not_fake_ldap_pool(self): - handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True) - self.assertNotEqual(type(handler.Connector), - type(fakeldap.FakeLdapPool)) - self.assertEqual(type(ldappool.StateConnector), - type(handler.Connector)) - - def test_async_search_and_result3(self): - self.config_fixture.config(group='ldap', page_size=1) - self.test_user_enable_attribute_mask() - - def test_pool_size_expands_correctly(self): - - who = CONF.ldap.user - cred = CONF.ldap.password - # get related connection manager instance - ldappool_cm = self.conn_pools[CONF.ldap.url] - - def _get_conn(): - return ldappool_cm.connection(who, cred) - - with _get_conn() as c1: # 1 - self.assertEqual(1, len(ldappool_cm)) - self.assertTrue(c1.connected, True) - self.assertTrue(c1.active, True) - with _get_conn() as c2: # conn2 - self.assertEqual(2, len(ldappool_cm)) - self.assertTrue(c2.connected) - self.assertTrue(c2.active) - - self.assertEqual(2, len(ldappool_cm)) - # c2 went out of context, its connected but not active - self.assertTrue(c2.connected) - self.assertFalse(c2.active) - with _get_conn() as c3: # conn3 - self.assertEqual(2, len(ldappool_cm)) - self.assertTrue(c3.connected) - self.assertTrue(c3.active) - self.assertTrue(c3 is c2) # same connection is reused - self.assertTrue(c2.active) - with _get_conn() as c4: # conn4 - self.assertEqual(3, len(ldappool_cm)) - self.assertTrue(c4.connected) - self.assertTrue(c4.active) - - def test_password_change_with_auth_pool_disabled(self): - self.config_fixture.config(group='ldap', use_auth_pool=False) - old_password = self.user_sna['password'] - - self.test_password_change_with_pool() - - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=self.user_sna['id'], - password=old_password) - - def _create_user_and_authenticate(self, password): - # TODO(shaleh): port to new_user_ref() - user_dict = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex, - 'password': password} - user = self.identity_api.create_user(user_dict) - - self.identity_api.authenticate( - context={}, - user_id=user['id'], - password=password) - - return self.identity_api.get_user(user['id']) - - def _get_auth_conn_pool_cm(self): - pool_url = ldap_core.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url - return self.conn_pools[pool_url] - - def _do_password_change_for_one_user(self, password, new_password): - self.config_fixture.config(group='ldap', use_auth_pool=True) - self.cleanup_pools() - self.load_backends() - - user1 = self._create_user_and_authenticate(password) - auth_cm = self._get_auth_conn_pool_cm() - self.assertEqual(1, len(auth_cm)) - user2 = self._create_user_and_authenticate(password) - self.assertEqual(1, len(auth_cm)) - user3 = self._create_user_and_authenticate(password) - self.assertEqual(1, len(auth_cm)) - user4 = self._create_user_and_authenticate(password) - self.assertEqual(1, len(auth_cm)) - user5 = self._create_user_and_authenticate(password) - self.assertEqual(1, len(auth_cm)) - - # connection pool size remains 1 even for different user ldap bind - # as there is only one active connection at a time - - user_api = ldap.UserApi(CONF) - u1_dn = user_api._id_to_dn_string(user1['id']) - u2_dn = user_api._id_to_dn_string(user2['id']) - u3_dn = user_api._id_to_dn_string(user3['id']) - u4_dn = user_api._id_to_dn_string(user4['id']) - u5_dn = user_api._id_to_dn_string(user5['id']) - - # now create multiple active connections for end user auth case which - # will force to keep them in pool. After that, modify one of user - # password. Need to make sure that user connection is in middle - # of pool list. - auth_cm = self._get_auth_conn_pool_cm() - with auth_cm.connection(u1_dn, password) as _: - with auth_cm.connection(u2_dn, password) as _: - with auth_cm.connection(u3_dn, password) as _: - with auth_cm.connection(u4_dn, password) as _: - with auth_cm.connection(u5_dn, password) as _: - self.assertEqual(5, len(auth_cm)) - _.unbind_s() - - user3['password'] = new_password - self.identity_api.update_user(user3['id'], user3) - - return user3 - - def test_password_change_with_auth_pool_enabled_long_lifetime(self): - self.config_fixture.config(group='ldap', - auth_pool_connection_lifetime=600) - old_password = 'my_password' - new_password = 'new_password' - user = self._do_password_change_for_one_user(old_password, - new_password) - user.pop('password') - - # with long connection lifetime auth_pool can bind to old password - # successfully which is not desired if password change is frequent - # use case in a deployment. - # This can happen in multiple concurrent connections case only. - user_ref = self.identity_api.authenticate( - context={}, user_id=user['id'], password=old_password) - - self.assertDictEqual(user, user_ref) - - def test_password_change_with_auth_pool_enabled_no_lifetime(self): - self.config_fixture.config(group='ldap', - auth_pool_connection_lifetime=0) - - old_password = 'my_password' - new_password = 'new_password' - user = self._do_password_change_for_one_user(old_password, - new_password) - # now as connection lifetime is zero, so authentication - # with old password will always fail. - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, user_id=user['id'], - password=old_password) |