aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/common/test_ldap.py
blob: 41568890b5b60b302d4904ed3d61a332456153af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import uuid

import ldap.dn
import mock
from oslo_config import cfg
from testtools import matchers

import os
import shutil
import tempfile

from keystone.common import ldap as ks_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone.tests import unit as tests
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap

CONF = cfg.CONF


class DnCompareTest(tests.BaseTestCase):
    """Tests for the DN comparison functions in keystone.common.ldap.core."""

    def test_prep(self):
        # prep_case_insensitive returns the string with spaces at the front and
        # end if it's already lowercase and no insignificant characters.
        value = 'lowercase value'
        self.assertEqual(value, ks_ldap.prep_case_insensitive(value))

    def test_prep_lowercase(self):
        # prep_case_insensitive returns the string with spaces at the front and
        # end and lowercases the value.
        value = 'UPPERCASE VALUE'
        exp_value = value.lower()
        self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))

    def test_prep_insignificant(self):
        # prep_case_insensitive remove insignificant spaces.
        value = 'before   after'
        exp_value = 'before after'
        self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))

    def test_prep_insignificant_pre_post(self):
        # prep_case_insensitive remove insignificant spaces.
        value = '   value   '
        exp_value = 'value'
        self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))

    def test_ava_equal_same(self):
        # is_ava_value_equal returns True if the two values are the same.
        value = 'val1'
        self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))

    def test_ava_equal_complex(self):
        # is_ava_value_equal returns True if the two values are the same using
        # a value that's got different capitalization and insignificant chars.
        val1 = 'before   after'
        val2 = '  BEFORE  afTer '
        self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))

    def test_ava_different(self):
        # is_ava_value_equal returns False if the values aren't the same.
        self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))

    def test_rdn_same(self):
        # is_rdn_equal returns True if the two values are the same.
        rdn = ldap.dn.str2dn('cn=val1')[0]
        self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))

    def test_rdn_diff_length(self):
        # is_rdn_equal returns False if the RDNs have a different number of
        # AVAs.
        rdn1 = ldap.dn.str2dn('cn=cn1')[0]
        rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
        self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_rdn_multi_ava_same_order(self):
        # is_rdn_equal returns True if the RDNs have the same number of AVAs
        # and the values are the same.
        rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
        rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
        self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_rdn_multi_ava_diff_order(self):
        # is_rdn_equal returns True if the RDNs have the same number of AVAs
        # and the values are the same, even if in a different order
        rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
        rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
        self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_rdn_multi_ava_diff_type(self):
        # is_rdn_equal returns False if the RDNs have the same number of AVAs
        # and the attribute types are different.
        rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
        rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
        self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_rdn_attr_type_case_diff(self):
        # is_rdn_equal returns True for same RDNs even when attr type case is
        # different.
        rdn1 = ldap.dn.str2dn('cn=cn1')[0]
        rdn2 = ldap.dn.str2dn('CN=cn1')[0]
        self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_rdn_attr_type_alias(self):
        # is_rdn_equal returns False for same RDNs even when attr type alias is
        # used. Note that this is a limitation since an LDAP server should
        # consider them equal.
        rdn1 = ldap.dn.str2dn('cn=cn1')[0]
        rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
        self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))

    def test_dn_same(self):
        # is_dn_equal returns True if the DNs are the same.
        dn = 'cn=Babs Jansen,ou=OpenStack'
        self.assertTrue(ks_ldap.is_dn_equal(dn, dn))

    def test_dn_equal_unicode(self):
        # is_dn_equal can accept unicode
        dn = u'cn=fäké,ou=OpenStack'
        self.assertTrue(ks_ldap.is_dn_equal(dn, dn))

    def test_dn_diff_length(self):
        # is_dn_equal returns False if the DNs don't have the same number of
        # RDNs
        dn1 = 'cn=Babs Jansen,ou=OpenStack'
        dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
        self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))

    def test_dn_equal_rdns(self):
        # is_dn_equal returns True if the DNs have the same number of RDNs
        # and each RDN is the same.
        dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
        dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
        self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))

    def test_dn_parsed_dns(self):
        # is_dn_equal can also accept parsed DNs.
        dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
        dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
        self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))

    def test_startswith_under_child(self):
        # dn_startswith returns True if descendant_dn is a child of dn.
        child = 'cn=Babs Jansen,ou=OpenStack'
        parent = 'ou=OpenStack'
        self.assertTrue(ks_ldap.dn_startswith(child, parent))

    def test_startswith_parent(self):
        # dn_startswith returns False if descendant_dn is a parent of dn.
        child = 'cn=Babs Jansen,ou=OpenStack'
        parent = 'ou=OpenStack'
        self.assertFalse(ks_ldap.dn_startswith(parent, child))

    def test_startswith_same(self):
        # dn_startswith returns False if DNs are the same.
        dn = 'cn=Babs Jansen,ou=OpenStack'
        self.assertFalse(ks_ldap.dn_startswith(dn, dn))

    def test_startswith_not_parent(self):
        # dn_startswith returns False if descendant_dn is not under the dn
        child = 'cn=Babs Jansen,ou=OpenStack'
        parent = 'dc=example.com'
        self.assertFalse(ks_ldap.dn_startswith(child, parent))

    def test_startswith_descendant(self):
        # dn_startswith returns True if descendant_dn is a descendant of dn.
        descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
        dn = 'ou=OpenStack,dc=example.com'
        self.assertTrue(ks_ldap.dn_startswith(descendant, dn))

        descendant = 'uid=12345,ou=Users,dc=example,dc=com'
        dn = 'ou=Users,dc=example,dc=com'
        self.assertTrue(ks_ldap.dn_startswith(descendant, dn))

    def test_startswith_parsed_dns(self):
        # dn_startswith also accepts parsed DNs.
        descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
        dn = ldap.dn.str2dn('ou=OpenStack')
        self.assertTrue(ks_ldap.dn_startswith(descendant, dn))

    def test_startswith_unicode(self):
        # dn_startswith accepts unicode.
        child = u'cn=cn=fäké,ou=OpenStäck'
        parent = 'ou=OpenStäck'
        self.assertTrue(ks_ldap.dn_startswith(child, parent))


class LDAPDeleteTreeTest(tests.TestCase):

    def setUp(self):
        super(LDAPDeleteTreeTest, self).setUp()

        ks_ldap.register_handler('fake://',
                                 fakeldap.FakeLdapNoSubtreeDelete)
        self.load_backends()
        self.load_fixtures(default_fixtures)

        self.addCleanup(self.clear_database)
        self.addCleanup(common_ldap_core._HANDLERS.clear)

    def clear_database(self):
        for shelf in fakeldap.FakeShelves:
            fakeldap.FakeShelves[shelf].clear()

    def config_overrides(self):
        super(LDAPDeleteTreeTest, self).config_overrides()
        self.config_fixture.config(
            group='identity',
            driver='keystone.identity.backends.ldap.Identity')

    def config_files(self):
        config_files = super(LDAPDeleteTreeTest, self).config_files()
        config_files.append(tests.dirs.tests_conf('backend_ldap.conf'))
        return config_files

    def test_deleteTree(self):
        """Test manually deleting a tree.

        Few LDAP servers support CONTROL_DELETETREE.  This test
        exercises the alternate code paths in BaseLdap.deleteTree.

        """
        conn = self.identity_api.user.get_connection()
        id_attr = self.identity_api.user.id_attr
        objclass = self.identity_api.user.object_class.lower()
        tree_dn = self.identity_api.user.tree_dn

        def create_entry(name, parent_dn=None):
            if not parent_dn:
                parent_dn = tree_dn
            dn = '%s=%s,%s' % (id_attr, name, parent_dn)
            attrs = [('objectclass', [objclass, 'ldapsubentry']),
                     (id_attr, [name])]
            conn.add_s(dn, attrs)
            return dn

        # create 3 entries like this:
        # cn=base
        # cn=child,cn=base
        # cn=grandchild,cn=child,cn=base
        # then attempt to deleteTree(cn=base)
        base_id = 'base'
        base_dn = create_entry(base_id)
        child_dn = create_entry('child', base_dn)
        grandchild_dn = create_entry('grandchild', child_dn)

        # verify that the three entries were created
        scope = ldap.SCOPE_SUBTREE
        filt = '(|(objectclass=*)(objectclass=ldapsubentry))'
        entries = conn.search_s(base_dn, scope, filt,
                                attrlist=common_ldap_core.DN_ONLY)
        self.assertThat(entries, matchers.HasLength(3))
        sort_ents = sorted([e[0] for e in entries], key=len, reverse=True)
        self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents)

        # verify that a non-leaf node can't be deleted directly by the
        # LDAP server
        self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
                          conn.delete_s, base_dn)
        self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
                          conn.delete_s, child_dn)

        # call our deleteTree implementation
        self.identity_api.user.deleteTree(base_id)
        self.assertRaises(ldap.NO_SUCH_OBJECT,
                          conn.search_s, base_dn, ldap.SCOPE_BASE)
        self.assertRaises(ldap.NO_SUCH_OBJECT,
                          conn.search_s, child_dn, ldap.SCOPE_BASE)
        self.assertRaises(ldap.NO_SUCH_OBJECT,
                          conn.search_s, grandchild_dn, ldap.SCOPE_BASE)


class SslTlsTest(tests.TestCase):
    """Tests for the SSL/TLS functionality in keystone.common.ldap.core."""

    @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s')
    @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
    def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
        # Attempt to connect to initialize python-ldap.
        base_ldap = ks_ldap.BaseLdap(config)
        base_ldap.get_connection()

    def test_certfile_trust_tls(self):
        # We need this to actually exist, so we create a tempfile.
        (handle, certfile) = tempfile.mkstemp()
        self.addCleanup(os.unlink, certfile)
        self.addCleanup(os.close, handle)
        self.config_fixture.config(group='ldap',
                                   url='ldap://localhost',
                                   use_tls=True,
                                   tls_cacertfile=certfile)

        self._init_ldap_connection(CONF)

        # Ensure the cert trust option is set.
        self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))

    def test_certdir_trust_tls(self):
        # We need this to actually exist, so we create a tempdir.
        certdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, certdir)
        self.config_fixture.config(group='ldap',
                                   url='ldap://localhost',
                                   use_tls=True,
                                   tls_cacertdir=certdir)

        self._init_ldap_connection(CONF)

        # Ensure the cert trust option is set.
        self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))

    def test_certfile_trust_ldaps(self):
        # We need this to actually exist, so we create a tempfile.
        (handle, certfile) = tempfile.mkstemp()
        self.addCleanup(os.unlink, certfile)
        self.addCleanup(os.close, handle)
        self.config_fixture.config(group='ldap',
                                   url='ldaps://localhost',
                                   use_tls=False,
                                   tls_cacertfile=certfile)

        self._init_ldap_connection(CONF)

        # Ensure the cert trust option is set.
        self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))

    def test_certdir_trust_ldaps(self):
        # We need this to actually exist, so we create a tempdir.
        certdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, certdir)
        self.config_fixture.config(group='ldap',
                                   url='ldaps://localhost',
                                   use_tls=False,
                                   tls_cacertdir=certdir)

        self._init_ldap_connection(CONF)

        # Ensure the cert trust option is set.
        self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))


class LDAPPagedResultsTest(tests.TestCase):
    """Tests the paged results functionality in keystone.common.ldap.core."""

    def setUp(self):
        super(LDAPPagedResultsTest, self).setUp()
        self.clear_database()

        ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
        self.addCleanup(common_ldap_core._HANDLERS.clear)

        self.load_backends()
        self.load_fixtures(default_fixtures)

    def clear_database(self):
        for shelf in fakeldap.FakeShelves:
            fakeldap.FakeShelves[shelf].clear()

    def config_overrides(self):
        super(LDAPPagedResultsTest, self).config_overrides()
        self.config_fixture.config(
            group='identity',
            driver='keystone.identity.backends.ldap.Identity')

    def config_files(self):
        config_files = super(LDAPPagedResultsTest, self).config_files()
        config_files.append(tests.dirs.tests_conf('backend_ldap.conf'))
        return config_files

    @mock.patch.object(fakeldap.FakeLdap, 'search_ext')
    @mock.patch.object(fakeldap.FakeLdap, 'result3')
    def test_paged_results_control_api(self, mock_result3, mock_search_ext):
        mock_result3.return_value = ('', [], 1, [])

        self.config_fixture.config(group='ldap',
                                   page_size=1)

        conn = self.identity_api.user.get_connection()
        conn._paged_search_s('dc=example,dc=test',
                             ldap.SCOPE_SUBTREE,
                             'objectclass=*')


class CommonLdapTestCase(tests.BaseTestCase):
    """These test cases call functions in keystone.common.ldap."""

    def test_binary_attribute_values(self):
        result = [(
            'cn=junk,dc=example,dc=com',
            {
                'cn': ['junk'],
                'sn': [uuid.uuid4().hex],
                'mail': [uuid.uuid4().hex],
                'binary_attr': ['\x00\xFF\x00\xFF']
            }
        ), ]
        py_result = ks_ldap.convert_ldap_result(result)
        # The attribute containing the binary value should
        # not be present in the converted result.
        self.assertNotIn('binary_attr', py_result[0][1])

    def test_utf8_conversion(self):
        value_unicode = u'fäké1'
        value_utf8 = value_unicode.encode('utf-8')

        result_utf8 = ks_ldap.utf8_encode(value_unicode)
        self.assertEqual(value_utf8, result_utf8)

        result_utf8 = ks_ldap.utf8_encode(value_utf8)
        self.assertEqual(value_utf8, result_utf8)

        result_unicode = ks_ldap.utf8_decode(value_utf8)
        self.assertEqual(value_unicode, result_unicode)

        result_unicode = ks_ldap.utf8_decode(value_unicode)
        self.assertEqual(value_unicode, result_unicode)

        self.assertRaises(TypeError,
                          ks_ldap.utf8_encode,
                          100)

        result_unicode = ks_ldap.utf8_decode(100)
        self.assertEqual(u'100', result_unicode)

    def test_user_id_begins_with_0(self):
        user_id = '0123456'
        result = [(
            'cn=dummy,dc=example,dc=com',
            {
                'user_id': [user_id],
                'enabled': ['TRUE']
            }
        ), ]
        py_result = ks_ldap.convert_ldap_result(result)
        # The user id should be 0123456, and the enabled
        # flag should be True
        self.assertIs(py_result[0][1]['enabled'][0], True)
        self.assertEqual(user_id, py_result[0][1]['user_id'][0])

    def test_user_id_begins_with_0_and_enabled_bit_mask(self):
        user_id = '0123456'
        bitmask = '225'
        expected_bitmask = 225
        result = [(
            'cn=dummy,dc=example,dc=com',
            {
                'user_id': [user_id],
                'enabled': [bitmask]
            }
        ), ]
        py_result = ks_ldap.convert_ldap_result(result)
        # The user id should be 0123456, and the enabled
        # flag should be 225
        self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
        self.assertEqual(user_id, py_result[0][1]['user_id'][0])

    def test_user_id_and_bitmask_begins_with_0(self):
        user_id = '0123456'
        bitmask = '0225'
        expected_bitmask = 225
        result = [(
            'cn=dummy,dc=example,dc=com',
            {
                'user_id': [user_id],
                'enabled': [bitmask]
            }
        ), ]
        py_result = ks_ldap.convert_ldap_result(result)
        # The user id should be 0123456, and the enabled
        # flag should be 225, the 0 is dropped.
        self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
        self.assertEqual(user_id, py_result[0][1]['user_id'][0])

    def test_user_id_and_user_name_with_boolean_string(self):
        boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
                           'TrUe' 'FaLse']
        for user_name in boolean_strings:
            user_id = uuid.uuid4().hex
            result = [(
                'cn=dummy,dc=example,dc=com',
                {
                    'user_id': [user_id],
                    'user_name': [user_name]
                }
            ), ]
            py_result = ks_ldap.convert_ldap_result(result)
            # The user name should still be a string value.
            self.assertEqual(user_name, py_result[0][1]['user_name'][0])