diff options
author | WuKong <rebirthmonkey@gmail.com> | 2015-06-30 18:47:29 +0200 |
---|---|---|
committer | WuKong <rebirthmonkey@gmail.com> | 2015-06-30 18:47:29 +0200 |
commit | b8c756ecdd7cced1db4300935484e8c83701c82e (patch) | |
tree | 87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/core.py | |
parent | c304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff) |
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604
Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/core.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/core.py | 660 |
1 files changed, 660 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/core.py b/keystone-moon/keystone/tests/unit/core.py new file mode 100644 index 00000000..caca7dbd --- /dev/null +++ b/keystone-moon/keystone/tests/unit/core.py @@ -0,0 +1,660 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import +import atexit +import functools +import logging +import os +import re +import shutil +import socket +import sys +import warnings + +import fixtures +from oslo_config import cfg +from oslo_config import fixture as config_fixture +from oslo_log import log +import oslotest.base as oslotest +from oslotest import mockpatch +import six +from sqlalchemy import exc +from testtools import testcase +import webob + +# NOTE(ayoung) +# environment.use_eventlet must run before any of the code that will +# call the eventlet monkeypatching. +from keystone.common import environment # noqa +environment.use_eventlet() + +from keystone import auth +from keystone.common import config as common_cfg +from keystone.common import dependency +from keystone.common import kvs +from keystone.common.kvs import core as kvs_core +from keystone import config +from keystone import controllers +from keystone import exception +from keystone import notifications +from keystone.policy.backends import rules +from keystone.server import common +from keystone import service +from keystone.tests.unit import ksfixtures + + +config.configure() + +LOG = log.getLogger(__name__) +PID = six.text_type(os.getpid()) +TESTSDIR = os.path.dirname(os.path.abspath(__file__)) +TESTCONF = os.path.join(TESTSDIR, 'config_files') +ROOTDIR = os.path.normpath(os.path.join(TESTSDIR, '..', '..', '..')) +VENDOR = os.path.join(ROOTDIR, 'vendor') +ETCDIR = os.path.join(ROOTDIR, 'etc') + + +def _calc_tmpdir(): + env_val = os.environ.get('KEYSTONE_TEST_TEMP_DIR') + if not env_val: + return os.path.join(TESTSDIR, 'tmp', PID) + return os.path.join(env_val, PID) + + +TMPDIR = _calc_tmpdir() + +CONF = cfg.CONF +log.register_options(CONF) +rules.init() + +IN_MEM_DB_CONN_STRING = 'sqlite://' + +exception._FATAL_EXCEPTION_FORMAT_ERRORS = True +os.makedirs(TMPDIR) +atexit.register(shutil.rmtree, TMPDIR) + + +class dirs(object): + @staticmethod + def root(*p): + return os.path.join(ROOTDIR, *p) + + @staticmethod + def etc(*p): + return os.path.join(ETCDIR, *p) + + @staticmethod + def tests(*p): + return os.path.join(TESTSDIR, *p) + + @staticmethod + def tmp(*p): + return os.path.join(TMPDIR, *p) + + @staticmethod + def tests_conf(*p): + return os.path.join(TESTCONF, *p) + + +# keystone.common.sql.initialize() for testing. +DEFAULT_TEST_DB_FILE = dirs.tmp('test.db') + + +@atexit.register +def remove_test_databases(): + db = dirs.tmp('test.db') + if os.path.exists(db): + os.unlink(db) + pristine = dirs.tmp('test.db.pristine') + if os.path.exists(pristine): + os.unlink(pristine) + + +def generate_paste_config(extension_name): + # Generate a file, based on keystone-paste.ini, that is named: + # extension_name.ini, and includes extension_name in the pipeline + with open(dirs.etc('keystone-paste.ini'), 'r') as f: + contents = f.read() + + new_contents = contents.replace(' service_v3', + ' %s service_v3' % (extension_name)) + + new_paste_file = dirs.tmp(extension_name + '.ini') + with open(new_paste_file, 'w') as f: + f.write(new_contents) + + return new_paste_file + + +def remove_generated_paste_config(extension_name): + # Remove the generated paste config file, named extension_name.ini + paste_file_to_remove = dirs.tmp(extension_name + '.ini') + os.remove(paste_file_to_remove) + + +def skip_if_cache_disabled(*sections): + """This decorator is used to skip a test if caching is disabled either + globally or for the specific section. + + In the code fragment:: + + @skip_if_cache_is_disabled('assignment', 'token') + def test_method(*args): + ... + + The method test_method would be skipped if caching is disabled globally via + the `enabled` option in the `cache` section of the configuration or if + the `caching` option is set to false in either `assignment` or `token` + sections of the configuration. This decorator can be used with no + arguments to only check global caching. + + If a specified configuration section does not define the `caching` option, + this decorator makes the same assumption as the `should_cache_fn` in + keystone.common.cache that caching should be enabled. + """ + def wrapper(f): + @functools.wraps(f) + def inner(*args, **kwargs): + if not CONF.cache.enabled: + raise testcase.TestSkipped('Cache globally disabled.') + for s in sections: + conf_sec = getattr(CONF, s, None) + if conf_sec is not None: + if not getattr(conf_sec, 'caching', True): + raise testcase.TestSkipped('%s caching disabled.' % s) + return f(*args, **kwargs) + return inner + return wrapper + + +def skip_if_no_multiple_domains_support(f): + """This decorator is used to skip a test if an identity driver + does not support multiple domains. + """ + @functools.wraps(f) + def wrapper(*args, **kwargs): + test_obj = args[0] + if not test_obj.identity_api.multiple_domains_supported: + raise testcase.TestSkipped('No multiple domains support') + return f(*args, **kwargs) + return wrapper + + +class UnexpectedExit(Exception): + pass + + +class BadLog(Exception): + """Raised on invalid call to logging (parameter mismatch).""" + pass + + +class TestClient(object): + def __init__(self, app=None, token=None): + self.app = app + self.token = token + + def request(self, method, path, headers=None, body=None): + if headers is None: + headers = {} + + if self.token: + headers.setdefault('X-Auth-Token', self.token) + + req = webob.Request.blank(path) + req.method = method + for k, v in six.iteritems(headers): + req.headers[k] = v + if body: + req.body = body + return req.get_response(self.app) + + def get(self, path, headers=None): + return self.request('GET', path=path, headers=headers) + + def post(self, path, headers=None, body=None): + return self.request('POST', path=path, headers=headers, body=body) + + def put(self, path, headers=None, body=None): + return self.request('PUT', path=path, headers=headers, body=body) + + +class BaseTestCase(oslotest.BaseTestCase): + """Light weight base test class. + + This is a placeholder that will eventually go away once the + setup/teardown in TestCase is properly trimmed down to the bare + essentials. This is really just a play to speed up the tests by + eliminating unnecessary work. + """ + + def setUp(self): + super(BaseTestCase, self).setUp() + self.useFixture(mockpatch.PatchObject(sys, 'exit', + side_effect=UnexpectedExit)) + + def cleanup_instance(self, *names): + """Create a function suitable for use with self.addCleanup. + + :returns: a callable that uses a closure to delete instance attributes + + """ + def cleanup(): + for name in names: + # TODO(dstanek): remove this 'if' statement once + # load_backend in test_backend_ldap is only called once + # per test + if hasattr(self, name): + delattr(self, name) + return cleanup + + +@dependency.requires('revoke_api') +class TestCase(BaseTestCase): + + def config_files(self): + return [] + + def config_overrides(self): + signing_certfile = 'examples/pki/certs/signing_cert.pem' + signing_keyfile = 'examples/pki/private/signing_key.pem' + self.config_fixture.config(group='oslo_policy', + policy_file=dirs.etc('policy.json')) + self.config_fixture.config( + # TODO(morganfainberg): Make Cache Testing a separate test case + # in tempest, and move it out of the base unit tests. + group='cache', + backend='dogpile.cache.memory', + enabled=True, + proxies=['keystone.tests.unit.test_cache.CacheIsolatingProxy']) + self.config_fixture.config( + group='catalog', + driver='keystone.catalog.backends.templated.Catalog', + template_file=dirs.tests('default_catalog.templates')) + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.sql.Identity') + self.config_fixture.config( + group='kvs', + backends=[ + ('keystone.tests.unit.test_kvs.' + 'KVSBackendForcedKeyMangleFixture'), + 'keystone.tests.unit.test_kvs.KVSBackendFixture']) + self.config_fixture.config( + group='revoke', + driver='keystone.contrib.revoke.backends.kvs.Revoke') + self.config_fixture.config( + group='signing', certfile=signing_certfile, + keyfile=signing_keyfile, + ca_certs='examples/pki/certs/cacert.pem') + self.config_fixture.config( + group='token', + driver='keystone.token.persistence.backends.kvs.Token') + self.config_fixture.config( + group='trust', + driver='keystone.trust.backends.sql.Trust') + self.config_fixture.config( + group='saml', certfile=signing_certfile, keyfile=signing_keyfile) + self.config_fixture.config( + default_log_levels=[ + 'amqp=WARN', + 'amqplib=WARN', + 'boto=WARN', + 'qpid=WARN', + 'sqlalchemy=WARN', + 'suds=INFO', + 'oslo.messaging=INFO', + 'iso8601=WARN', + 'requests.packages.urllib3.connectionpool=WARN', + 'routes.middleware=INFO', + 'stevedore.extension=INFO', + 'keystone.notifications=INFO', + 'keystone.common._memcache_pool=INFO', + 'keystone.common.ldap=INFO', + ]) + self.auth_plugin_config_override() + + def auth_plugin_config_override(self, methods=None, **method_classes): + if methods is None: + methods = ['external', 'password', 'token', ] + if not method_classes: + method_classes = dict( + external='keystone.auth.plugins.external.DefaultDomain', + password='keystone.auth.plugins.password.Password', + token='keystone.auth.plugins.token.Token', + ) + self.config_fixture.config(group='auth', methods=methods) + common_cfg.setup_authentication() + if method_classes: + self.config_fixture.config(group='auth', **method_classes) + + def setUp(self): + super(TestCase, self).setUp() + self.addCleanup(self.cleanup_instance('config_fixture', 'logger')) + + self.addCleanup(CONF.reset) + + self.useFixture(mockpatch.PatchObject(logging.Handler, 'handleError', + side_effect=BadLog)) + self.config_fixture = self.useFixture(config_fixture.Config(CONF)) + self.config(self.config_files()) + + # NOTE(morganfainberg): mock the auth plugin setup to use the config + # fixture which automatically unregisters options when performing + # cleanup. + def mocked_register_auth_plugin_opt(conf, opt): + self.config_fixture.register_opt(opt, group='auth') + self.register_auth_plugin_opt_patch = self.useFixture( + mockpatch.PatchObject(common_cfg, '_register_auth_plugin_opt', + new=mocked_register_auth_plugin_opt)) + + self.config_overrides() + + self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + + # NOTE(morganfainberg): This code is a copy from the oslo-incubator + # log module. This is not in a function or otherwise available to use + # without having a CONF object to setup logging. This should help to + # reduce the log size by limiting what we log (similar to how Keystone + # would run under mod_wsgi or eventlet). + for pair in CONF.default_log_levels: + mod, _sep, level_name = pair.partition('=') + logger = logging.getLogger(mod) + logger.setLevel(level_name) + + warnings.filterwarnings('error', category=DeprecationWarning, + module='^keystone\\.') + warnings.simplefilter('error', exc.SAWarning) + self.addCleanup(warnings.resetwarnings) + + self.useFixture(ksfixtures.Cache()) + + # Clear the registry of providers so that providers from previous + # tests aren't used. + self.addCleanup(dependency.reset) + + self.addCleanup(kvs.INMEMDB.clear) + + # Ensure Notification subscriptions and resource types are empty + self.addCleanup(notifications.clear_subscribers) + self.addCleanup(notifications.reset_notifier) + + # Reset the auth-plugin registry + self.addCleanup(self.clear_auth_plugin_registry) + + self.addCleanup(setattr, controllers, '_VERSIONS', []) + + def config(self, config_files): + CONF(args=[], project='keystone', default_config_files=config_files) + + def load_backends(self): + """Initializes each manager and assigns them to an attribute.""" + + # TODO(blk-u): Shouldn't need to clear the registry here, but some + # tests call load_backends multiple times. These should be fixed to + # only call load_backends once. + dependency.reset() + + # TODO(morganfainberg): Shouldn't need to clear the registry here, but + # some tests call load_backends multiple times. Since it is not + # possible to re-configure a backend, we need to clear the list. This + # should eventually be removed once testing has been cleaned up. + kvs_core.KEY_VALUE_STORE_REGISTRY.clear() + + self.clear_auth_plugin_registry() + drivers, _unused = common.setup_backends( + load_extra_backends_fn=self.load_extra_backends) + + for manager_name, manager in six.iteritems(drivers): + setattr(self, manager_name, manager) + self.addCleanup(self.cleanup_instance(*drivers.keys())) + + def load_extra_backends(self): + """Override to load managers that aren't loaded by default. + + This is useful to load managers initialized by extensions. No extra + backends are loaded by default. + + :return: dict of name -> manager + """ + return {} + + def load_fixtures(self, fixtures): + """Hacky basic and naive fixture loading based on a python module. + + Expects that the various APIs into the various services are already + defined on `self`. + + """ + # NOTE(dstanek): create a list of attribute names to be removed + # from this instance during cleanup + fixtures_to_cleanup = [] + + # TODO(termie): doing something from json, probably based on Django's + # loaddata will be much preferred. + if (hasattr(self, 'identity_api') and + hasattr(self, 'assignment_api') and + hasattr(self, 'resource_api')): + for domain in fixtures.DOMAINS: + try: + rv = self.resource_api.create_domain(domain['id'], domain) + except exception.Conflict: + rv = self.resource_api.get_domain(domain['id']) + except exception.NotImplemented: + rv = domain + attrname = 'domain_%s' % domain['id'] + setattr(self, attrname, rv) + fixtures_to_cleanup.append(attrname) + + for tenant in fixtures.TENANTS: + if hasattr(self, 'tenant_%s' % tenant['id']): + try: + # This will clear out any roles on the project as well + self.resource_api.delete_project(tenant['id']) + except exception.ProjectNotFound: + pass + rv = self.resource_api.create_project( + tenant['id'], tenant) + + attrname = 'tenant_%s' % tenant['id'] + setattr(self, attrname, rv) + fixtures_to_cleanup.append(attrname) + + for role in fixtures.ROLES: + try: + rv = self.role_api.create_role(role['id'], role) + except exception.Conflict: + rv = self.role_api.get_role(role['id']) + attrname = 'role_%s' % role['id'] + setattr(self, attrname, rv) + fixtures_to_cleanup.append(attrname) + + for user in fixtures.USERS: + user_copy = user.copy() + tenants = user_copy.pop('tenants') + try: + existing_user = getattr(self, 'user_%s' % user['id'], None) + if existing_user is not None: + self.identity_api.delete_user(existing_user['id']) + except exception.UserNotFound: + pass + + # For users, the manager layer will generate the ID + user_copy = self.identity_api.create_user(user_copy) + # Our tests expect that the password is still in the user + # record so that they can reference it, so put it back into + # the dict returned. + user_copy['password'] = user['password'] + + for tenant_id in tenants: + try: + self.assignment_api.add_user_to_project( + tenant_id, user_copy['id']) + except exception.Conflict: + pass + # Use the ID from the fixture as the attribute name, so + # that our tests can easily reference each user dict, while + # the ID in the dict will be the real public ID. + attrname = 'user_%s' % user['id'] + setattr(self, attrname, user_copy) + fixtures_to_cleanup.append(attrname) + + self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup)) + + def _paste_config(self, config): + if not config.startswith('config:'): + test_path = os.path.join(TESTSDIR, config) + etc_path = os.path.join(ROOTDIR, 'etc', config) + for path in [test_path, etc_path]: + if os.path.exists('%s-paste.ini' % path): + return 'config:%s-paste.ini' % path + return config + + def loadapp(self, config, name='main'): + return service.loadapp(self._paste_config(config), name=name) + + def clear_auth_plugin_registry(self): + auth.controllers.AUTH_METHODS.clear() + auth.controllers.AUTH_PLUGINS_LOADED = False + + def assertCloseEnoughForGovernmentWork(self, a, b, delta=3): + """Asserts that two datetimes are nearly equal within a small delta. + + :param delta: Maximum allowable time delta, defined in seconds. + """ + msg = '%s != %s within %s delta' % (a, b, delta) + + self.assertTrue(abs(a - b).seconds <= delta, msg) + + def assertNotEmpty(self, l): + self.assertTrue(len(l)) + + def assertDictEqual(self, d1, d2, msg=None): + self.assertIsInstance(d1, dict) + self.assertIsInstance(d2, dict) + self.assertEqual(d1, d2, msg) + + def assertRaisesRegexp(self, expected_exception, expected_regexp, + callable_obj, *args, **kwargs): + """Asserts that the message in a raised exception matches a regexp. + """ + try: + callable_obj(*args, **kwargs) + except expected_exception as exc_value: + if isinstance(expected_regexp, six.string_types): + expected_regexp = re.compile(expected_regexp) + + if isinstance(exc_value.args[0], unicode): + if not expected_regexp.search(unicode(exc_value)): + raise self.failureException( + '"%s" does not match "%s"' % + (expected_regexp.pattern, unicode(exc_value))) + else: + if not expected_regexp.search(str(exc_value)): + raise self.failureException( + '"%s" does not match "%s"' % + (expected_regexp.pattern, str(exc_value))) + else: + if hasattr(expected_exception, '__name__'): + excName = expected_exception.__name__ + else: + excName = str(expected_exception) + raise self.failureException("%s not raised" % excName) + + def assertDictContainsSubset(self, expected, actual, msg=None): + """Checks whether actual is a superset of expected.""" + + def safe_repr(obj, short=False): + _MAX_LENGTH = 80 + try: + result = repr(obj) + except Exception: + result = object.__repr__(obj) + if not short or len(result) < _MAX_LENGTH: + return result + return result[:_MAX_LENGTH] + ' [truncated]...' + + missing = [] + mismatched = [] + for key, value in six.iteritems(expected): + if key not in actual: + missing.append(key) + elif value != actual[key]: + mismatched.append('%s, expected: %s, actual: %s' % + (safe_repr(key), safe_repr(value), + safe_repr(actual[key]))) + + if not (missing or mismatched): + return + + standardMsg = '' + if missing: + standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in + missing) + if mismatched: + if standardMsg: + standardMsg += '; ' + standardMsg += 'Mismatched values: %s' % ','.join(mismatched) + + self.fail(self._formatMessage(msg, standardMsg)) + + @property + def ipv6_enabled(self): + if socket.has_ipv6: + sock = None + try: + sock = socket.socket(socket.AF_INET6) + # NOTE(Mouad): Try to bind to IPv6 loopback ip address. + sock.bind(("::1", 0)) + return True + except socket.error: + pass + finally: + if sock: + sock.close() + return False + + def skip_if_no_ipv6(self): + if not self.ipv6_enabled: + raise self.skipTest("IPv6 is not enabled in the system") + + def skip_if_env_not_set(self, env_var): + if not os.environ.get(env_var): + self.skipTest('Env variable %s is not set.' % env_var) + + +class SQLDriverOverrides(object): + """A mixin for consolidating sql-specific test overrides.""" + def config_overrides(self): + super(SQLDriverOverrides, self).config_overrides() + # SQL specific driver overrides + self.config_fixture.config( + group='catalog', + driver='keystone.catalog.backends.sql.Catalog') + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.sql.Identity') + self.config_fixture.config( + group='policy', + driver='keystone.policy.backends.sql.Policy') + self.config_fixture.config( + group='revoke', + driver='keystone.contrib.revoke.backends.sql.Revoke') + self.config_fixture.config( + group='token', + driver='keystone.token.persistence.backends.sql.Token') + self.config_fixture.config( + group='trust', + driver='keystone.trust.backends.sql.Trust') |