aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/core.py
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2016-06-09 08:12:34 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2016-06-09 08:12:34 +0000
commit4bc079a2664f9a407e332291f34d174625a9d5ea (patch)
tree7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/core.py
parent2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff)
parent2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff)
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/core.py')
-rw-r--r--keystone-moon/keystone/tests/unit/core.py388
1 files changed, 288 insertions, 100 deletions
diff --git a/keystone-moon/keystone/tests/unit/core.py b/keystone-moon/keystone/tests/unit/core.py
index eb8b9f65..1054e131 100644
--- a/keystone-moon/keystone/tests/unit/core.py
+++ b/keystone-moon/keystone/tests/unit/core.py
@@ -14,8 +14,11 @@
from __future__ import absolute_import
import atexit
+import base64
import datetime
import functools
+import hashlib
+import json
import logging
import os
import re
@@ -28,14 +31,16 @@ import warnings
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
+from oslo_context import context as oslo_context
+from oslo_context import fixture as oslo_ctx_fixture
from oslo_log import fixture as log_fixture
from oslo_log import log
from oslo_utils import timeutils
-import oslotest.base as oslotest
from oslotest import mockpatch
from paste.deploy import loadwsgi
import six
from sqlalchemy import exc
+import testtools
from testtools import testcase
# NOTE(ayoung)
@@ -45,24 +50,20 @@ from keystone.common import environment # noqa
environment.use_eventlet()
from keystone import auth
-from keystone.common import config as common_cfg
+from keystone.common import config
from keystone.common import dependency
-from keystone.common import kvs
from keystone.common.kvs import core as kvs_core
from keystone.common import sql
-from keystone import config
-from keystone import controllers
from keystone import exception
from keystone import notifications
-from keystone.policy.backends import rules
from keystone.server import common
-from keystone import service
from keystone.tests.unit import ksfixtures
+from keystone.version import controllers
+from keystone.version import service
config.configure()
-LOG = log.getLogger(__name__)
PID = six.text_type(os.getpid())
TESTSDIR = os.path.dirname(os.path.abspath(__file__))
TESTCONF = os.path.join(TESTSDIR, 'config_files')
@@ -82,7 +83,6 @@ TMPDIR = _calc_tmpdir()
CONF = cfg.CONF
log.register_options(CONF)
-rules.init()
IN_MEM_DB_CONN_STRING = 'sqlite://'
@@ -208,6 +208,22 @@ def skip_if_cache_disabled(*sections):
return wrapper
+def skip_if_cache_is_enabled(*sections):
+ def wrapper(f):
+ @functools.wraps(f)
+ def inner(*args, **kwargs):
+ if CONF.cache.enabled:
+ for s in sections:
+ conf_sec = getattr(CONF, s, None)
+ if conf_sec is not None:
+ if getattr(conf_sec, 'caching', True):
+ raise testcase.TestSkipped('%s caching enabled.' %
+ s)
+ return f(*args, **kwargs)
+ return inner
+ return wrapper
+
+
def skip_if_no_multiple_domains_support(f):
"""Decorator to skip tests for identity drivers limited to one domain."""
@functools.wraps(f)
@@ -223,113 +239,230 @@ class UnexpectedExit(Exception):
pass
-def new_ref():
- """Populates a ref with attributes common to some API entities."""
- return {
+def new_region_ref(parent_region_id=None, **kwargs):
+ ref = {
'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
'description': uuid.uuid4().hex,
- 'enabled': True}
-
+ 'parent_region_id': parent_region_id}
-def new_region_ref():
- ref = new_ref()
- # Region doesn't have name or enabled.
- del ref['name']
- del ref['enabled']
- ref['parent_region_id'] = None
+ ref.update(kwargs)
return ref
-def new_service_ref():
- ref = new_ref()
- ref['type'] = uuid.uuid4().hex
+def new_service_ref(**kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'type': uuid.uuid4().hex,
+ }
+ ref.update(kwargs)
return ref
-def new_endpoint_ref(service_id, interface='public', default_region_id=None,
- **kwargs):
- ref = new_ref()
- del ref['enabled'] # enabled is optional
- ref['interface'] = interface
- ref['service_id'] = service_id
- ref['url'] = 'https://' + uuid.uuid4().hex + '.com'
- ref['region_id'] = default_region_id
+NEEDS_REGION_ID = object()
+
+
+def new_endpoint_ref(service_id, interface='public',
+ region_id=NEEDS_REGION_ID, **kwargs):
+
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'interface': interface,
+ 'service_id': service_id,
+ 'url': 'https://' + uuid.uuid4().hex + '.com',
+ }
+
+ if region_id is NEEDS_REGION_ID:
+ ref['region_id'] = uuid.uuid4().hex
+ elif region_id is None and kwargs.get('region') is not None:
+ # pre-3.2 form endpoints are not supported by this function
+ raise NotImplementedError("use new_endpoint_ref_with_region")
+ else:
+ ref['region_id'] = region_id
ref.update(kwargs)
return ref
-def new_domain_ref():
- ref = new_ref()
+def new_endpoint_ref_with_region(service_id, region, interface='public',
+ **kwargs):
+ """Define an endpoint_ref having a pre-3.2 form.
+
+ Contains the deprecated 'region' instead of 'region_id'.
+ """
+ ref = new_endpoint_ref(service_id, interface, region=region,
+ region_id='invalid', **kwargs)
+ del ref['region_id']
return ref
-def new_project_ref(domain_id=None, parent_id=None, is_domain=False):
- ref = new_ref()
- ref['domain_id'] = domain_id
- ref['parent_id'] = parent_id
- ref['is_domain'] = is_domain
+def new_domain_ref(**kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True
+ }
+ ref.update(kwargs)
+ return ref
+
+
+def new_project_ref(domain_id=None, is_domain=False, **kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'domain_id': domain_id,
+ 'is_domain': is_domain,
+ }
+ # NOTE(henry-nash): We don't include parent_id in the initial list above
+ # since specifying it is optional depending on where the project sits in
+ # the hierarchy (and a parent_id of None has meaning - i.e. it's a top
+ # level project).
+ ref.update(kwargs)
return ref
-def new_user_ref(domain_id, project_id=None):
- ref = new_ref()
- ref['domain_id'] = domain_id
- ref['email'] = uuid.uuid4().hex
- ref['password'] = uuid.uuid4().hex
+def new_user_ref(domain_id, project_id=None, **kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True,
+ 'domain_id': domain_id,
+ 'email': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ }
if project_id:
ref['default_project_id'] = project_id
+ ref.update(kwargs)
return ref
-def new_group_ref(domain_id):
- ref = new_ref()
- ref['domain_id'] = domain_id
+def new_federated_user_ref(idp_id=None, protocol_id=None, **kwargs):
+ ref = {
+ 'idp_id': idp_id or 'ORG_IDP',
+ 'protocol_id': protocol_id or 'saml2',
+ 'unique_id': uuid.uuid4().hex,
+ 'display_name': uuid.uuid4().hex,
+ }
+ ref.update(kwargs)
return ref
-def new_credential_ref(user_id, project_id=None, cred_type=None):
- ref = dict()
- ref['id'] = uuid.uuid4().hex
- ref['user_id'] = user_id
- if cred_type == 'ec2':
- ref['type'] = 'ec2'
- ref['blob'] = uuid.uuid4().hex
- else:
- ref['type'] = 'cert'
- ref['blob'] = uuid.uuid4().hex
+def new_group_ref(domain_id, **kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'domain_id': domain_id
+ }
+ ref.update(kwargs)
+ return ref
+
+
+def new_credential_ref(user_id, project_id=None, type='cert', **kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'user_id': user_id,
+ 'type': type,
+ }
+
if project_id:
ref['project_id'] = project_id
+ if 'blob' not in kwargs:
+ ref['blob'] = uuid.uuid4().hex
+
+ ref.update(kwargs)
return ref
-def new_role_ref():
- ref = new_ref()
- # Roles don't have a description or the enabled flag
- del ref['description']
- del ref['enabled']
+def new_cert_credential(user_id, project_id=None, blob=None, **kwargs):
+ if blob is None:
+ blob = {'access': uuid.uuid4().hex, 'secret': uuid.uuid4().hex}
+
+ credential = new_credential_ref(user_id=user_id,
+ project_id=project_id,
+ blob=json.dumps(blob),
+ type='cert',
+ **kwargs)
+ return blob, credential
+
+
+def new_ec2_credential(user_id, project_id=None, blob=None, **kwargs):
+ if blob is None:
+ blob = {
+ 'access': uuid.uuid4().hex,
+ 'secret': uuid.uuid4().hex,
+ 'trust_id': None
+ }
+
+ if 'id' not in kwargs:
+ access = blob['access'].encode('utf-8')
+ kwargs['id'] = hashlib.sha256(access).hexdigest()
+
+ credential = new_credential_ref(user_id=user_id,
+ project_id=project_id,
+ blob=json.dumps(blob),
+ type='ec2',
+ **kwargs)
+ return blob, credential
+
+
+def new_totp_credential(user_id, project_id=None, blob=None):
+ if not blob:
+ blob = base64.b32encode(uuid.uuid4().hex).rstrip('=')
+ credential = new_credential_ref(user_id=user_id,
+ project_id=project_id,
+ blob=blob,
+ type='totp')
+ return credential
+
+
+def new_role_ref(**kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': None
+ }
+ ref.update(kwargs)
return ref
-def new_policy_ref():
- ref = new_ref()
- ref['blob'] = uuid.uuid4().hex
- ref['type'] = uuid.uuid4().hex
+def new_policy_ref(**kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ # Store serialized JSON data as the blob to mimic real world usage.
+ 'blob': json.dumps({'data': uuid.uuid4().hex, }),
+ 'type': uuid.uuid4().hex,
+ }
+
+ ref.update(kwargs)
return ref
def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
impersonation=None, expires=None, role_ids=None,
role_names=None, remaining_uses=None,
- allow_redelegation=False):
- ref = dict()
- ref['id'] = uuid.uuid4().hex
- ref['trustor_user_id'] = trustor_user_id
- ref['trustee_user_id'] = trustee_user_id
- ref['impersonation'] = impersonation or False
- ref['project_id'] = project_id
- ref['remaining_uses'] = remaining_uses
- ref['allow_redelegation'] = allow_redelegation
+ allow_redelegation=False, redelegation_count=None, **kwargs):
+ ref = {
+ 'id': uuid.uuid4().hex,
+ 'trustor_user_id': trustor_user_id,
+ 'trustee_user_id': trustee_user_id,
+ 'impersonation': impersonation or False,
+ 'project_id': project_id,
+ 'remaining_uses': remaining_uses,
+ 'allow_redelegation': allow_redelegation,
+ }
+
+ if isinstance(redelegation_count, int):
+ ref.update(redelegation_count=redelegation_count)
if isinstance(expires, six.string_types):
ref['expires_at'] = expires
@@ -351,10 +484,25 @@ def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
for role_name in role_names:
ref['roles'].append({'name': role_name})
+ ref.update(kwargs)
return ref
-class BaseTestCase(oslotest.BaseTestCase):
+def create_user(api, domain_id, **kwargs):
+ """Create a user via the API. Keep the created password.
+
+ The password is saved and restored when api.create_user() is called.
+ Only use this routine if there is a requirement for the user object to
+ have a valid password after api.create_user() is called.
+ """
+ user = new_user_ref(domain_id=domain_id, **kwargs)
+ password = user['password']
+ user = api.create_user(user)
+ user['password'] = password
+ return user
+
+
+class BaseTestCase(testtools.TestCase):
"""Light weight base test class.
This is a placeholder that will eventually go away once the
@@ -365,6 +513,10 @@ class BaseTestCase(oslotest.BaseTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
+
+ self.useFixture(fixtures.NestedTempfile())
+ self.useFixture(fixtures.TempHomeDir())
+
self.useFixture(mockpatch.PatchObject(sys, 'exit',
side_effect=UnexpectedExit))
self.useFixture(log_fixture.get_logging_handle_error_fixture())
@@ -373,6 +525,10 @@ class BaseTestCase(oslotest.BaseTestCase):
module='^keystone\\.')
warnings.simplefilter('error', exc.SAWarning)
self.addCleanup(warnings.resetwarnings)
+ # Ensure we have an empty threadlocal context at the start of each
+ # test.
+ self.assertIsNone(oslo_context.get_current())
+ self.useFixture(oslo_ctx_fixture.ClearRequestContext())
def cleanup_instance(self, *names):
"""Create a function suitable for use with self.addCleanup.
@@ -395,6 +551,9 @@ class TestCase(BaseTestCase):
def config_files(self):
return []
+ def _policy_fixture(self):
+ return ksfixtures.Policy(dirs.etc('policy.json'), self.config_fixture)
+
def config_overrides(self):
# NOTE(morganfainberg): enforce config_overrides can only ever be
# called a single time.
@@ -403,18 +562,19 @@ class TestCase(BaseTestCase):
signing_certfile = 'examples/pki/certs/signing_cert.pem'
signing_keyfile = 'examples/pki/private/signing_key.pem'
- self.config_fixture.config(group='oslo_policy',
- policy_file=dirs.etc('policy.json'))
+
+ self.useFixture(self._policy_fixture())
+
self.config_fixture.config(
# TODO(morganfainberg): Make Cache Testing a separate test case
# in tempest, and move it out of the base unit tests.
group='cache',
backend='dogpile.cache.memory',
enabled=True,
- proxies=['keystone.tests.unit.test_cache.CacheIsolatingProxy'])
+ proxies=['oslo_cache.testing.CacheIsolatingProxy'])
self.config_fixture.config(
group='catalog',
- driver='templated',
+ driver='sql',
template_file=dirs.tests('default_catalog.templates'))
self.config_fixture.config(
group='kvs',
@@ -422,7 +582,6 @@ class TestCase(BaseTestCase):
('keystone.tests.unit.test_kvs.'
'KVSBackendForcedKeyMangleFixture'),
'keystone.tests.unit.test_kvs.KVSBackendFixture'])
- self.config_fixture.config(group='revoke', driver='kvs')
self.config_fixture.config(
group='signing', certfile=signing_certfile,
keyfile=signing_keyfile,
@@ -444,17 +603,15 @@ class TestCase(BaseTestCase):
'routes.middleware=INFO',
'stevedore.extension=INFO',
'keystone.notifications=INFO',
- 'keystone.common._memcache_pool=INFO',
'keystone.common.ldap=INFO',
])
self.auth_plugin_config_override()
def auth_plugin_config_override(self, methods=None, **method_classes):
- if methods is not None:
- self.config_fixture.config(group='auth', methods=methods)
- common_cfg.setup_authentication()
- if method_classes:
- self.config_fixture.config(group='auth', **method_classes)
+ self.useFixture(
+ ksfixtures.ConfigAuthPlugins(self.config_fixture,
+ methods,
+ **method_classes))
def _assert_config_overrides_called(self):
assert self.__config_overrides_called is True
@@ -462,6 +619,7 @@ class TestCase(BaseTestCase):
def setUp(self):
super(TestCase, self).setUp()
self.__config_overrides_called = False
+ self.__load_backends_called = False
self.addCleanup(CONF.reset)
self.config_fixture = self.useFixture(config_fixture.Config(CONF))
self.addCleanup(delattr, self, 'config_fixture')
@@ -473,9 +631,10 @@ class TestCase(BaseTestCase):
def mocked_register_auth_plugin_opt(conf, opt):
self.config_fixture.register_opt(opt, group='auth')
self.useFixture(mockpatch.PatchObject(
- common_cfg, '_register_auth_plugin_opt',
+ config, '_register_auth_plugin_opt',
new=mocked_register_auth_plugin_opt))
+ self.sql_driver_version_overrides = {}
self.config_overrides()
# NOTE(morganfainberg): ensure config_overrides has been called.
self.addCleanup(self._assert_config_overrides_called)
@@ -498,8 +657,6 @@ class TestCase(BaseTestCase):
# tests aren't used.
self.addCleanup(dependency.reset)
- self.addCleanup(kvs.INMEMDB.clear)
-
# Ensure Notification subscriptions and resource types are empty
self.addCleanup(notifications.clear_subscribers)
self.addCleanup(notifications.reset_notifier)
@@ -515,7 +672,6 @@ class TestCase(BaseTestCase):
def load_backends(self):
"""Initializes each manager and assigns them to an attribute."""
-
# TODO(blk-u): Shouldn't need to clear the registry here, but some
# tests call load_backends multiple times. These should be fixed to
# only call load_backends once.
@@ -541,7 +697,7 @@ class TestCase(BaseTestCase):
This is useful to load managers initialized by extensions. No extra
backends are loaded by default.
- :return: dict of name -> manager
+ :returns: dict of name -> manager
"""
return {}
@@ -573,7 +729,8 @@ class TestCase(BaseTestCase):
fixtures_to_cleanup.append(attrname)
for tenant in fixtures.TENANTS:
- if hasattr(self, 'tenant_%s' % tenant['id']):
+ tenant_attr_name = 'tenant_%s' % tenant['name'].lower()
+ if hasattr(self, tenant_attr_name):
try:
# This will clear out any roles on the project as well
self.resource_api.delete_project(tenant['id'])
@@ -582,9 +739,8 @@ class TestCase(BaseTestCase):
rv = self.resource_api.create_project(
tenant['id'], tenant)
- attrname = 'tenant_%s' % tenant['id']
- setattr(self, attrname, rv)
- fixtures_to_cleanup.append(attrname)
+ setattr(self, tenant_attr_name, rv)
+ fixtures_to_cleanup.append(tenant_attr_name)
for role in fixtures.ROLES:
try:
@@ -625,6 +781,17 @@ class TestCase(BaseTestCase):
setattr(self, attrname, user_copy)
fixtures_to_cleanup.append(attrname)
+ for role_assignment in fixtures.ROLE_ASSIGNMENTS:
+ role_id = role_assignment['role_id']
+ user = role_assignment['user']
+ tenant_id = role_assignment['tenant_id']
+ user_id = getattr(self, 'user_%s' % user)['id']
+ try:
+ self.assignment_api.add_role_to_user_and_project(
+ user_id, tenant_id, role_id)
+ except exception.Conflict:
+ pass
+
self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
def _paste_config(self, config):
@@ -648,6 +815,10 @@ class TestCase(BaseTestCase):
:param delta: Maximum allowable time delta, defined in seconds.
"""
+ if a == b:
+ # Short-circuit if the values are the same.
+ return
+
msg = '%s != %s within %s delta' % (a, b, delta)
self.assertTrue(abs(a - b).seconds <= delta, msg)
@@ -664,11 +835,11 @@ class TestCase(BaseTestCase):
if isinstance(expected_regexp, six.string_types):
expected_regexp = re.compile(expected_regexp)
- if isinstance(exc_value.args[0], unicode):
- if not expected_regexp.search(unicode(exc_value)):
+ if isinstance(exc_value.args[0], six.text_type):
+ if not expected_regexp.search(six.text_type(exc_value)):
raise self.failureException(
'"%s" does not match "%s"' %
- (expected_regexp.pattern, unicode(exc_value)))
+ (expected_regexp.pattern, six.text_type(exc_value)))
else:
if not expected_regexp.search(str(exc_value)):
raise self.failureException(
@@ -708,12 +879,29 @@ class TestCase(BaseTestCase):
class SQLDriverOverrides(object):
"""A mixin for consolidating sql-specific test overrides."""
+
def config_overrides(self):
super(SQLDriverOverrides, self).config_overrides()
# SQL specific driver overrides
self.config_fixture.config(group='catalog', driver='sql')
self.config_fixture.config(group='identity', driver='sql')
self.config_fixture.config(group='policy', driver='sql')
- self.config_fixture.config(group='revoke', driver='sql')
self.config_fixture.config(group='token', driver='sql')
self.config_fixture.config(group='trust', driver='sql')
+
+ def use_specific_sql_driver_version(self, driver_path,
+ versionless_backend, version_suffix):
+ """Add this versioned driver to the list that will be loaded.
+
+ :param driver_path: The path to the drivers, e.g. 'keystone.assignment'
+ :param versionless_backend: The name of the versionless drivers, e.g.
+ 'backends'
+ :param version_suffix: The suffix for the version , e.g. ``V8_``
+
+ This method assumes that versioned drivers are named:
+ <version_suffix><name of versionless driver>, e.g. 'V8_backends'.
+
+ """
+ self.sql_driver_version_overrides[driver_path] = {
+ 'versionless_backend': versionless_backend,
+ 'versioned_backend': version_suffix + versionless_backend}