aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py202
1 files changed, 0 insertions, 202 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py b/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py
deleted file mode 100644
index a284114a..00000000
--- a/keystone-moon/keystone/tests/unit/test_ldap_pool_livetest.py
+++ /dev/null
@@ -1,202 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-import ldappool
-from oslo_config import cfg
-
-from keystone.common.ldap import core as ldap_core
-from keystone.identity.backends import ldap
-from keystone.tests import unit
-from keystone.tests.unit import fakeldap
-from keystone.tests.unit import test_backend_ldap_pool
-from keystone.tests.unit import test_ldap_livetest
-
-
-CONF = cfg.CONF
-
-
-class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
- test_ldap_livetest.LiveLDAPIdentity):
- """Executes existing LDAP live test with pooled LDAP handler.
-
- Also executes common pool specific tests via Mixin class.
-
- """
-
- def setUp(self):
- super(LiveLDAPPoolIdentity, self).setUp()
- self.addCleanup(self.cleanup_pools)
- # storing to local variable to avoid long references
- self.conn_pools = ldap_core.PooledLDAPHandler.connection_pools
-
- def config_files(self):
- config_files = super(LiveLDAPPoolIdentity, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_pool_liveldap.conf'))
- return config_files
-
- def test_assert_connector_used_not_fake_ldap_pool(self):
- handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
- self.assertNotEqual(type(handler.Connector),
- type(fakeldap.FakeLdapPool))
- self.assertEqual(type(ldappool.StateConnector),
- type(handler.Connector))
-
- def test_async_search_and_result3(self):
- self.config_fixture.config(group='ldap', page_size=1)
- self.test_user_enable_attribute_mask()
-
- def test_pool_size_expands_correctly(self):
-
- who = CONF.ldap.user
- cred = CONF.ldap.password
- # get related connection manager instance
- ldappool_cm = self.conn_pools[CONF.ldap.url]
-
- def _get_conn():
- return ldappool_cm.connection(who, cred)
-
- with _get_conn() as c1: # 1
- self.assertEqual(1, len(ldappool_cm))
- self.assertTrue(c1.connected, True)
- self.assertTrue(c1.active, True)
- with _get_conn() as c2: # conn2
- self.assertEqual(2, len(ldappool_cm))
- self.assertTrue(c2.connected)
- self.assertTrue(c2.active)
-
- self.assertEqual(2, len(ldappool_cm))
- # c2 went out of context, its connected but not active
- self.assertTrue(c2.connected)
- self.assertFalse(c2.active)
- with _get_conn() as c3: # conn3
- self.assertEqual(2, len(ldappool_cm))
- self.assertTrue(c3.connected)
- self.assertTrue(c3.active)
- self.assertTrue(c3 is c2) # same connection is reused
- self.assertTrue(c2.active)
- with _get_conn() as c4: # conn4
- self.assertEqual(3, len(ldappool_cm))
- self.assertTrue(c4.connected)
- self.assertTrue(c4.active)
-
- def test_password_change_with_auth_pool_disabled(self):
- self.config_fixture.config(group='ldap', use_auth_pool=False)
- old_password = self.user_sna['password']
-
- self.test_password_change_with_pool()
-
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=self.user_sna['id'],
- password=old_password)
-
- def _create_user_and_authenticate(self, password):
- # TODO(shaleh): port to new_user_ref()
- user_dict = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex,
- 'password': password}
- user = self.identity_api.create_user(user_dict)
-
- self.identity_api.authenticate(
- context={},
- user_id=user['id'],
- password=password)
-
- return self.identity_api.get_user(user['id'])
-
- def _get_auth_conn_pool_cm(self):
- pool_url = ldap_core.PooledLDAPHandler.auth_pool_prefix + CONF.ldap.url
- return self.conn_pools[pool_url]
-
- def _do_password_change_for_one_user(self, password, new_password):
- self.config_fixture.config(group='ldap', use_auth_pool=True)
- self.cleanup_pools()
- self.load_backends()
-
- user1 = self._create_user_and_authenticate(password)
- auth_cm = self._get_auth_conn_pool_cm()
- self.assertEqual(1, len(auth_cm))
- user2 = self._create_user_and_authenticate(password)
- self.assertEqual(1, len(auth_cm))
- user3 = self._create_user_and_authenticate(password)
- self.assertEqual(1, len(auth_cm))
- user4 = self._create_user_and_authenticate(password)
- self.assertEqual(1, len(auth_cm))
- user5 = self._create_user_and_authenticate(password)
- self.assertEqual(1, len(auth_cm))
-
- # connection pool size remains 1 even for different user ldap bind
- # as there is only one active connection at a time
-
- user_api = ldap.UserApi(CONF)
- u1_dn = user_api._id_to_dn_string(user1['id'])
- u2_dn = user_api._id_to_dn_string(user2['id'])
- u3_dn = user_api._id_to_dn_string(user3['id'])
- u4_dn = user_api._id_to_dn_string(user4['id'])
- u5_dn = user_api._id_to_dn_string(user5['id'])
-
- # now create multiple active connections for end user auth case which
- # will force to keep them in pool. After that, modify one of user
- # password. Need to make sure that user connection is in middle
- # of pool list.
- auth_cm = self._get_auth_conn_pool_cm()
- with auth_cm.connection(u1_dn, password) as _:
- with auth_cm.connection(u2_dn, password) as _:
- with auth_cm.connection(u3_dn, password) as _:
- with auth_cm.connection(u4_dn, password) as _:
- with auth_cm.connection(u5_dn, password) as _:
- self.assertEqual(5, len(auth_cm))
- _.unbind_s()
-
- user3['password'] = new_password
- self.identity_api.update_user(user3['id'], user3)
-
- return user3
-
- def test_password_change_with_auth_pool_enabled_long_lifetime(self):
- self.config_fixture.config(group='ldap',
- auth_pool_connection_lifetime=600)
- old_password = 'my_password'
- new_password = 'new_password'
- user = self._do_password_change_for_one_user(old_password,
- new_password)
- user.pop('password')
-
- # with long connection lifetime auth_pool can bind to old password
- # successfully which is not desired if password change is frequent
- # use case in a deployment.
- # This can happen in multiple concurrent connections case only.
- user_ref = self.identity_api.authenticate(
- context={}, user_id=user['id'], password=old_password)
-
- self.assertDictEqual(user, user_ref)
-
- def test_password_change_with_auth_pool_enabled_no_lifetime(self):
- self.config_fixture.config(group='ldap',
- auth_pool_connection_lifetime=0)
-
- old_password = 'my_password'
- new_password = 'new_password'
- user = self._do_password_change_for_one_user(old_password,
- new_password)
- # now as connection lifetime is zero, so authentication
- # with old password will always fail.
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={}, user_id=user['id'],
- password=old_password)