aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/fakeldap.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/fakeldap.py')
-rw-r--r--keystone-moon/keystone/tests/unit/fakeldap.py664
1 files changed, 0 insertions, 664 deletions
diff --git a/keystone-moon/keystone/tests/unit/fakeldap.py b/keystone-moon/keystone/tests/unit/fakeldap.py
deleted file mode 100644
index 9ad1f218..00000000
--- a/keystone-moon/keystone/tests/unit/fakeldap.py
+++ /dev/null
@@ -1,664 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Fake LDAP server for test harness.
-
-This class does very little error checking, and knows nothing about ldap
-class definitions. It implements the minimum emulation of the python ldap
-library to work with keystone.
-
-"""
-
-import random
-import re
-import shelve
-
-import ldap
-from oslo_config import cfg
-from oslo_log import log
-import six
-from six import moves
-
-from keystone.common.ldap import core
-from keystone import exception
-
-
-SCOPE_NAMES = {
- ldap.SCOPE_BASE: 'SCOPE_BASE',
- ldap.SCOPE_ONELEVEL: 'SCOPE_ONELEVEL',
- ldap.SCOPE_SUBTREE: 'SCOPE_SUBTREE',
-}
-
-# http://msdn.microsoft.com/en-us/library/windows/desktop/aa366991(v=vs.85).aspx # noqa
-CONTROL_TREEDELETE = '1.2.840.113556.1.4.805'
-
-LOG = log.getLogger(__name__)
-CONF = cfg.CONF
-
-
-def _internal_attr(attr_name, value_or_values):
- def normalize_value(value):
- return core.utf8_decode(value)
-
- def normalize_dn(dn):
- # Capitalize the attribute names as an LDAP server might.
-
- # NOTE(blk-u): Special case for this tested value, used with
- # test_user_id_comma. The call to str2dn here isn't always correct
- # here, because `dn` is escaped for an LDAP filter. str2dn() normally
- # works only because there's no special characters in `dn`.
- if dn == 'cn=Doe\\5c, John,ou=Users,cn=example,cn=com':
- return 'CN=Doe\\, John,OU=Users,CN=example,CN=com'
-
- # NOTE(blk-u): Another special case for this tested value. When a
- # roleOccupant has an escaped comma, it gets converted to \2C.
- if dn == 'cn=Doe\\, John,ou=Users,cn=example,cn=com':
- return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
-
- try:
- dn = ldap.dn.str2dn(core.utf8_encode(dn))
- except ldap.DECODING_ERROR:
- # NOTE(amakarov): In case of IDs instead of DNs in group members
- # they must be handled as regular values.
- return normalize_value(dn)
-
- norm = []
- for part in dn:
- name, val, i = part[0]
- name = core.utf8_decode(name)
- name = name.upper()
- name = core.utf8_encode(name)
- norm.append([(name, val, i)])
- return core.utf8_decode(ldap.dn.dn2str(norm))
-
- if attr_name in ('member', 'roleOccupant'):
- attr_fn = normalize_dn
- else:
- attr_fn = normalize_value
-
- if isinstance(value_or_values, list):
- return [attr_fn(x) for x in value_or_values]
- return [attr_fn(value_or_values)]
-
-
-def _match_query(query, attrs, attrs_checked):
- """Match an ldap query to an attribute dictionary.
-
- The characters &, |, and ! are supported in the query. No syntax checking
- is performed, so malformed queries will not work correctly.
- """
- # cut off the parentheses
- inner = query[1:-1]
- if inner.startswith(('&', '|')):
- if inner[0] == '&':
- matchfn = all
- else:
- matchfn = any
- # cut off the & or |
- groups = _paren_groups(inner[1:])
- return matchfn(_match_query(group, attrs, attrs_checked)
- for group in groups)
- if inner.startswith('!'):
- # cut off the ! and the nested parentheses
- return not _match_query(query[2:-1], attrs, attrs_checked)
-
- (k, _sep, v) = inner.partition('=')
- attrs_checked.add(k.lower())
- return _match(k, v, attrs)
-
-
-def _paren_groups(source):
- """Split a string into parenthesized groups."""
- count = 0
- start = 0
- result = []
- for pos in moves.range(len(source)):
- if source[pos] == '(':
- if count == 0:
- start = pos
- count += 1
- if source[pos] == ')':
- count -= 1
- if count == 0:
- result.append(source[start:pos + 1])
- return result
-
-
-def _match(key, value, attrs):
- """Match a given key and value against an attribute list."""
- def match_with_wildcards(norm_val, val_list):
- # Case insensitive checking with wildcards
- if norm_val.startswith('*'):
- if norm_val.endswith('*'):
- # Is the string anywhere in the target?
- for x in val_list:
- if norm_val[1:-1] in x:
- return True
- else:
- # Is the string at the end of the target?
- for x in val_list:
- if (norm_val[1:] ==
- x[len(x) - len(norm_val) + 1:]):
- return True
- elif norm_val.endswith('*'):
- # Is the string at the start of the target?
- for x in val_list:
- if norm_val[:-1] == x[:len(norm_val) - 1]:
- return True
- else:
- # Is the string an exact match?
- for x in val_list:
- if check_value == x:
- return True
- return False
-
- if key not in attrs:
- return False
- # This is a pure wild card search, so the answer must be yes!
- if value == '*':
- return True
- if key == 'serviceId':
- # for serviceId, the backend is returning a list of numbers
- # make sure we convert them to strings first before comparing
- # them
- str_sids = [six.text_type(x) for x in attrs[key]]
- return six.text_type(value) in str_sids
- if key != 'objectclass':
- check_value = _internal_attr(key, value)[0].lower()
- norm_values = list(
- _internal_attr(key, x)[0].lower() for x in attrs[key])
- return match_with_wildcards(check_value, norm_values)
- # it is an objectclass check, so check subclasses
- values = _subs(value)
- for v in values:
- if v in attrs[key]:
- return True
- return False
-
-
-def _subs(value):
- """Returns a list of subclass strings.
-
- The strings represent the ldap objectclass plus any subclasses that
- inherit from it. Fakeldap doesn't know about the ldap object structure,
- so subclasses need to be defined manually in the dictionary below.
-
- """
- subs = {'groupOfNames': ['keystoneTenant',
- 'keystoneRole',
- 'keystoneTenantRole']}
- if value in subs:
- return [value] + subs[value]
- return [value]
-
-
-server_fail = False
-
-
-class FakeShelve(dict):
-
- def sync(self):
- pass
-
-
-FakeShelves = {}
-PendingRequests = {}
-
-
-class FakeLdap(core.LDAPHandler):
- """Emulate the python-ldap API.
-
- The python-ldap API requires all strings to be UTF-8 encoded. This
- is assured by the caller of this interface
- (i.e. KeystoneLDAPHandler).
-
- However, internally this emulation MUST process and store strings
- in a canonical form which permits operations on
- characters. Encoded strings do not provide the ability to operate
- on characters. Therefore this emulation accepts UTF-8 encoded
- strings, decodes them to unicode for operations internal to this
- emulation, and encodes them back to UTF-8 when returning values
- from the emulation.
-
- """
-
- __prefix = 'ldap:'
-
- def __init__(self, conn=None):
- super(FakeLdap, self).__init__(conn=conn)
- self._ldap_options = {ldap.OPT_DEREF: ldap.DEREF_NEVER}
-
- def connect(self, url, page_size=0, alias_dereferencing=None,
- use_tls=False, tls_cacertfile=None, tls_cacertdir=None,
- tls_req_cert='demand', chase_referrals=None, debug_level=None,
- use_pool=None, pool_size=None, pool_retry_max=None,
- pool_retry_delay=None, pool_conn_timeout=None,
- pool_conn_lifetime=None):
- if url.startswith('fake://memory'):
- if url not in FakeShelves:
- FakeShelves[url] = FakeShelve()
- self.db = FakeShelves[url]
- else:
- self.db = shelve.open(url[7:])
-
- using_ldaps = url.lower().startswith("ldaps")
-
- if use_tls and using_ldaps:
- raise AssertionError('Invalid TLS / LDAPS combination')
-
- if use_tls:
- if tls_cacertfile:
- ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, tls_cacertfile)
- elif tls_cacertdir:
- ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, tls_cacertdir)
- if tls_req_cert in list(core.LDAP_TLS_CERTS.values()):
- ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, tls_req_cert)
- else:
- raise ValueError("invalid TLS_REQUIRE_CERT tls_req_cert=%s",
- tls_req_cert)
-
- if alias_dereferencing is not None:
- self.set_option(ldap.OPT_DEREF, alias_dereferencing)
- self.page_size = page_size
-
- self.use_pool = use_pool
- self.pool_size = pool_size
- self.pool_retry_max = pool_retry_max
- self.pool_retry_delay = pool_retry_delay
- self.pool_conn_timeout = pool_conn_timeout
- self.pool_conn_lifetime = pool_conn_lifetime
-
- def dn(self, dn):
- return core.utf8_decode(dn)
-
- def _dn_to_id_attr(self, dn):
- return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][0])
-
- def _dn_to_id_value(self, dn):
- return core.utf8_decode(ldap.dn.str2dn(core.utf8_encode(dn))[0][0][1])
-
- def key(self, dn):
- return '%s%s' % (self.__prefix, self.dn(dn))
-
- def simple_bind_s(self, who='', cred='',
- serverctrls=None, clientctrls=None):
- """This method is ignored, but provided for compatibility."""
- if server_fail:
- raise ldap.SERVER_DOWN
- whos = ['cn=Admin', CONF.ldap.user]
- if who in whos and cred in ['password', CONF.ldap.password]:
- return
-
- try:
- attrs = self.db[self.key(who)]
- except KeyError:
- LOG.debug('bind fail: who=%s not found', core.utf8_decode(who))
- raise ldap.NO_SUCH_OBJECT
-
- db_password = None
- try:
- db_password = attrs['userPassword'][0]
- except (KeyError, IndexError):
- LOG.debug('bind fail: password for who=%s not found',
- core.utf8_decode(who))
- raise ldap.INAPPROPRIATE_AUTH
-
- if cred != db_password:
- LOG.debug('bind fail: password for who=%s does not match',
- core.utf8_decode(who))
- raise ldap.INVALID_CREDENTIALS
-
- def unbind_s(self):
- """This method is ignored, but provided for compatibility."""
- if server_fail:
- raise ldap.SERVER_DOWN
-
- def add_s(self, dn, modlist):
- """Add an object with the specified attributes at dn."""
- if server_fail:
- raise ldap.SERVER_DOWN
-
- id_attr_in_modlist = False
- id_attr = self._dn_to_id_attr(dn)
- id_value = self._dn_to_id_value(dn)
-
- # The LDAP API raises a TypeError if attr name is None.
- for k, dummy_v in modlist:
- if k is None:
- raise TypeError('must be string, not None. modlist=%s' %
- modlist)
-
- if k == id_attr:
- for val in dummy_v:
- if core.utf8_decode(val) == id_value:
- id_attr_in_modlist = True
-
- if not id_attr_in_modlist:
- LOG.debug('id_attribute=%(attr)s missing, attributes=%(attrs)s' %
- {'attr': id_attr, 'attrs': modlist})
- raise ldap.NAMING_VIOLATION
- key = self.key(dn)
- LOG.debug('add item: dn=%(dn)s, attrs=%(attrs)s', {
- 'dn': core.utf8_decode(dn), 'attrs': modlist})
- if key in self.db:
- LOG.debug('add item failed: dn=%s is already in store.',
- core.utf8_decode(dn))
- raise ldap.ALREADY_EXISTS(dn)
-
- self.db[key] = {k: _internal_attr(k, v) for k, v in modlist}
- self.db.sync()
-
- def delete_s(self, dn):
- """Remove the ldap object at specified dn."""
- return self.delete_ext_s(dn, serverctrls=[])
-
- def _getChildren(self, dn):
- return [k for k, v in self.db.items()
- if re.match('%s.*,%s' % (
- re.escape(self.__prefix),
- re.escape(self.dn(dn))), k)]
-
- def delete_ext_s(self, dn, serverctrls, clientctrls=None):
- """Remove the ldap object at specified dn."""
- if server_fail:
- raise ldap.SERVER_DOWN
-
- try:
- if CONTROL_TREEDELETE in [c.controlType for c in serverctrls]:
- LOG.debug('FakeLdap subtree_delete item: dn=%s',
- core.utf8_decode(dn))
- children = self._getChildren(dn)
- for c in children:
- del self.db[c]
-
- key = self.key(dn)
- LOG.debug('FakeLdap delete item: dn=%s', core.utf8_decode(dn))
- del self.db[key]
- except KeyError:
- LOG.debug('delete item failed: dn=%s not found.',
- core.utf8_decode(dn))
- raise ldap.NO_SUCH_OBJECT
- self.db.sync()
-
- def modify_s(self, dn, modlist):
- """Modify the object at dn using the attribute list.
-
- :param dn: an LDAP DN
- :param modlist: a list of tuples in the following form:
- ([MOD_ADD | MOD_DELETE | MOD_REPACE], attribute, value)
- """
- if server_fail:
- raise ldap.SERVER_DOWN
-
- key = self.key(dn)
- LOG.debug('modify item: dn=%(dn)s attrs=%(attrs)s', {
- 'dn': core.utf8_decode(dn), 'attrs': modlist})
- try:
- entry = self.db[key]
- except KeyError:
- LOG.debug('modify item failed: dn=%s not found.',
- core.utf8_decode(dn))
- raise ldap.NO_SUCH_OBJECT
-
- for cmd, k, v in modlist:
- values = entry.setdefault(k, [])
- if cmd == ldap.MOD_ADD:
- v = _internal_attr(k, v)
- for x in v:
- if x in values:
- raise ldap.TYPE_OR_VALUE_EXISTS
- values += v
- elif cmd == ldap.MOD_REPLACE:
- values[:] = _internal_attr(k, v)
- elif cmd == ldap.MOD_DELETE:
- if v is None:
- if not values:
- LOG.debug('modify item failed: '
- 'item has no attribute "%s" to delete', k)
- raise ldap.NO_SUCH_ATTRIBUTE
- values[:] = []
- else:
- for val in _internal_attr(k, v):
- try:
- values.remove(val)
- except ValueError:
- LOG.debug('modify item failed: '
- 'item has no attribute "%(k)s" with '
- 'value "%(v)s" to delete', {
- 'k': k, 'v': val})
- raise ldap.NO_SUCH_ATTRIBUTE
- else:
- LOG.debug('modify item failed: unknown command %s', cmd)
- raise NotImplementedError('modify_s action %s not'
- ' implemented' % cmd)
- self.db[key] = entry
- self.db.sync()
-
- def search_s(self, base, scope,
- filterstr='(objectClass=*)', attrlist=None, attrsonly=0):
- """Search for all matching objects under base using the query.
-
- Args:
- base -- dn to search under
- scope -- search scope (base, subtree, onelevel)
- filterstr -- filter objects by
- attrlist -- attrs to return. Returns all attrs if not specified
-
- """
- if server_fail:
- raise ldap.SERVER_DOWN
-
- if (not filterstr) and (scope != ldap.SCOPE_BASE):
- raise AssertionError('Search without filter on onelevel or '
- 'subtree scope')
-
- if scope == ldap.SCOPE_BASE:
- try:
- item_dict = self.db[self.key(base)]
- except KeyError:
- LOG.debug('search fail: dn not found for SCOPE_BASE')
- raise ldap.NO_SUCH_OBJECT
- results = [(base, item_dict)]
- elif scope == ldap.SCOPE_SUBTREE:
- # FIXME - LDAP search with SUBTREE scope must return the base
- # entry, but the code below does _not_. Unfortunately, there are
- # several tests that depend on this broken behavior, and fail
- # when the base entry is returned in the search results. The
- # fix is easy here, just initialize results as above for
- # the SCOPE_BASE case.
- # https://bugs.launchpad.net/keystone/+bug/1368772
- try:
- item_dict = self.db[self.key(base)]
- except KeyError:
- LOG.debug('search fail: dn not found for SCOPE_SUBTREE')
- raise ldap.NO_SUCH_OBJECT
- results = [(base, item_dict)]
- extraresults = [(k[len(self.__prefix):], v)
- for k, v in self.db.items()
- if re.match('%s.*,%s' %
- (re.escape(self.__prefix),
- re.escape(self.dn(base))), k)]
- results.extend(extraresults)
- elif scope == ldap.SCOPE_ONELEVEL:
-
- def get_entries():
- base_dn = ldap.dn.str2dn(core.utf8_encode(base))
- base_len = len(base_dn)
-
- for k, v in self.db.items():
- if not k.startswith(self.__prefix):
- continue
- k_dn_str = k[len(self.__prefix):]
- k_dn = ldap.dn.str2dn(core.utf8_encode(k_dn_str))
- if len(k_dn) != base_len + 1:
- continue
- if k_dn[-base_len:] != base_dn:
- continue
- yield (k_dn_str, v)
-
- results = list(get_entries())
-
- else:
- # openldap client/server raises PROTOCOL_ERROR for unexpected scope
- raise ldap.PROTOCOL_ERROR
-
- objects = []
- for dn, attrs in results:
- # filter the objects by filterstr
- id_attr, id_val, _ = ldap.dn.str2dn(core.utf8_encode(dn))[0][0]
- id_attr = core.utf8_decode(id_attr)
- id_val = core.utf8_decode(id_val)
- match_attrs = attrs.copy()
- match_attrs[id_attr] = [id_val]
- attrs_checked = set()
- if not filterstr or _match_query(filterstr, match_attrs,
- attrs_checked):
- if (filterstr and
- (scope != ldap.SCOPE_BASE) and
- ('objectclass' not in attrs_checked)):
- raise AssertionError('No objectClass in search filter')
- # filter the attributes by attrlist
- attrs = {k: v for k, v in attrs.items()
- if not attrlist or k in attrlist}
- objects.append((dn, attrs))
-
- return objects
-
- def set_option(self, option, invalue):
- self._ldap_options[option] = invalue
-
- def get_option(self, option):
- value = self._ldap_options.get(option)
- return value
-
- def search_ext(self, base, scope,
- filterstr='(objectClass=*)', attrlist=None, attrsonly=0,
- serverctrls=None, clientctrls=None,
- timeout=-1, sizelimit=0):
- if clientctrls is not None or timeout != -1 or sizelimit != 0:
- raise exception.NotImplemented()
-
- # only passing a single server control is supported by this fake ldap
- if len(serverctrls) > 1:
- raise exception.NotImplemented()
-
- # search_ext is async and returns an identifier used for
- # retrieving the results via result3(). This will be emulated by
- # storing the request in a variable with random integer key and
- # performing the real lookup in result3()
- msgid = random.randint(0, 1000)
- PendingRequests[msgid] = (base, scope, filterstr, attrlist, attrsonly,
- serverctrls)
- return msgid
-
- def result3(self, msgid=ldap.RES_ANY, all=1, timeout=None,
- resp_ctrl_classes=None):
- """Execute async request
-
- Only msgid param is supported. Request info is fetched from global
- variable `PendingRequests` by msgid, executed using search_s and
- limited if requested.
- """
- if all != 1 or timeout is not None or resp_ctrl_classes is not None:
- raise exception.NotImplemented()
-
- params = PendingRequests[msgid]
- # search_s accepts a subset of parameters of search_ext,
- # that's why we use only the first 5.
- results = self.search_s(*params[:5])
-
- # extract limit from serverctrl
- serverctrls = params[5]
- ctrl = serverctrls[0]
-
- if ctrl.size:
- rdata = results[:ctrl.size]
- else:
- rdata = results
-
- # real result3 returns various service info -- rtype, rmsgid,
- # serverctrls. Now this info is not used, so all this info is None
- rtype = None
- rmsgid = None
- serverctrls = None
- return (rtype, rdata, rmsgid, serverctrls)
-
-
-class FakeLdapPool(FakeLdap):
- """Emulate the python-ldap API with pooled connections.
-
- This class is used as connector class in PooledLDAPHandler.
-
- """
-
- def __init__(self, uri, retry_max=None, retry_delay=None, conn=None):
- super(FakeLdapPool, self).__init__(conn=conn)
- self.url = uri
- self.connected = None
- self.conn = self
- self._connection_time = 5 # any number greater than 0
-
- def get_lifetime(self):
- return self._connection_time
-
- def simple_bind_s(self, who=None, cred=None,
- serverctrls=None, clientctrls=None):
- if self.url.startswith('fakepool://memory'):
- if self.url not in FakeShelves:
- FakeShelves[self.url] = FakeShelve()
- self.db = FakeShelves[self.url]
- else:
- self.db = shelve.open(self.url[11:])
-
- if not who:
- who = 'cn=Admin'
- if not cred:
- cred = 'password'
-
- super(FakeLdapPool, self).simple_bind_s(who=who, cred=cred,
- serverctrls=serverctrls,
- clientctrls=clientctrls)
-
- def unbind_ext_s(self):
- """Added to extend FakeLdap as connector class."""
- pass
-
-
-class FakeLdapNoSubtreeDelete(FakeLdap):
- """FakeLdap subclass that does not support subtree delete
-
- Same as FakeLdap except delete will throw the LDAP error
- ldap.NOT_ALLOWED_ON_NONLEAF if there is an attempt to delete
- an entry that has children.
- """
-
- def delete_ext_s(self, dn, serverctrls, clientctrls=None):
- """Remove the ldap object at specified dn."""
- if server_fail:
- raise ldap.SERVER_DOWN
-
- try:
- children = self._getChildren(dn)
- if children:
- raise ldap.NOT_ALLOWED_ON_NONLEAF
-
- except KeyError:
- LOG.debug('delete item failed: dn=%s not found.',
- core.utf8_decode(dn))
- raise ldap.NO_SUCH_OBJECT
- super(FakeLdapNoSubtreeDelete, self).delete_ext_s(dn,
- serverctrls,
- clientctrls)