diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/common | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
12 files changed, 0 insertions, 3304 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/__init__.py b/keystone-moon/keystone/tests/unit/common/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/common/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/common/test_authorization.py b/keystone-moon/keystone/tests/unit/common/test_authorization.py deleted file mode 100644 index 73ddbc61..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_authorization.py +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright 2015 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import copy -import uuid - -from keystone.common import authorization -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.models import token_model -from keystone.tests import unit -from keystone.tests.unit import test_token_provider - - -class TestTokenToAuthContext(unit.BaseTestCase): - def test_token_is_project_scoped_with_trust(self): - # Check auth_context result when the token is project-scoped and has - # trust info. - - # SAMPLE_V3_TOKEN has OS-TRUST:trust in it. - token_data = test_token_provider.SAMPLE_V3_TOKEN - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertEqual(token, auth_context['token']) - self.assertTrue(auth_context['is_delegated_auth']) - self.assertEqual(token_data['token']['user']['id'], - auth_context['user_id']) - self.assertEqual(token_data['token']['user']['domain']['id'], - auth_context['user_domain_id']) - self.assertEqual(token_data['token']['project']['id'], - auth_context['project_id']) - self.assertEqual(token_data['token']['project']['domain']['id'], - auth_context['project_domain_id']) - self.assertNotIn('domain_id', auth_context) - self.assertNotIn('domain_name', auth_context) - self.assertEqual(token_data['token']['OS-TRUST:trust']['id'], - auth_context['trust_id']) - self.assertEqual( - token_data['token']['OS-TRUST:trust']['trustor_user_id'], - auth_context['trustor_id']) - self.assertEqual( - token_data['token']['OS-TRUST:trust']['trustee_user_id'], - auth_context['trustee_id']) - self.assertItemsEqual( - [r['name'] for r in token_data['token']['roles']], - auth_context['roles']) - self.assertIsNone(auth_context['consumer_id']) - self.assertIsNone(auth_context['access_token_id']) - self.assertNotIn('group_ids', auth_context) - - def test_token_is_domain_scoped(self): - # Check contents of auth_context when token is domain-scoped. - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - del token_data['token']['project'] - - domain_id = uuid.uuid4().hex - domain_name = uuid.uuid4().hex - token_data['token']['domain'] = {'id': domain_id, 'name': domain_name} - - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertNotIn('project_id', auth_context) - self.assertNotIn('project_domain_id', auth_context) - - self.assertEqual(domain_id, auth_context['domain_id']) - self.assertEqual(domain_name, auth_context['domain_name']) - - def test_token_is_unscoped(self): - # Check contents of auth_context when the token is unscoped. - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - del token_data['token']['project'] - - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertNotIn('project_id', auth_context) - self.assertNotIn('project_domain_id', auth_context) - self.assertNotIn('domain_id', auth_context) - self.assertNotIn('domain_name', auth_context) - - def test_token_is_for_federated_user(self): - # When the token is for a federated user then group_ids is in - # auth_context. - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - - group_ids = [uuid.uuid4().hex for x in range(1, 5)] - - federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, - 'protocol': {'id': 'saml2'}, - 'groups': [{'id': gid} for gid in group_ids]} - token_data['token']['user'][federation_constants.FEDERATION] = ( - federation_data) - - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertItemsEqual(group_ids, auth_context['group_ids']) - - def test_oauth_variables_set_for_oauth_token(self): - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - access_token_id = uuid.uuid4().hex - consumer_id = uuid.uuid4().hex - token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id, - 'consumer_id': consumer_id} - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertEqual(access_token_id, auth_context['access_token_id']) - self.assertEqual(consumer_id, auth_context['consumer_id']) - - def test_oauth_variables_not_set(self): - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - auth_context = authorization.token_to_auth_context(token) - - self.assertIsNone(auth_context['access_token_id']) - self.assertIsNone(auth_context['consumer_id']) - - def test_token_is_not_KeystoneToken_raises_exception(self): - # If the token isn't a KeystoneToken then an UnexpectedError exception - # is raised. - self.assertRaises(exception.UnexpectedError, - authorization.token_to_auth_context, {}) - - def test_user_id_missing_in_token_raises_exception(self): - # If there's no user ID in the token then an Unauthorized - # exception is raised. - token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) - del token_data['token']['user']['id'] - - token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=token_data) - - self.assertRaises(exception.Unauthorized, - authorization.token_to_auth_context, token) diff --git a/keystone-moon/keystone/tests/unit/common/test_base64utils.py b/keystone-moon/keystone/tests/unit/common/test_base64utils.py deleted file mode 100644 index 355a2e03..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_base64utils.py +++ /dev/null @@ -1,208 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.common import base64utils -from keystone.tests import unit - -base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' - 'abcdefghijklmnopqrstuvwxyz' - '0123456789' - '+/=') # includes pad char - -base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' - 'abcdefghijklmnopqrstuvwxyz' - '0123456789' - '-_=') # includes pad char - - -class TestValid(unit.BaseTestCase): - def test_valid_base64(self): - self.assertTrue(base64utils.is_valid_base64('+/==')) - self.assertTrue(base64utils.is_valid_base64('+/+=')) - self.assertTrue(base64utils.is_valid_base64('+/+/')) - - self.assertFalse(base64utils.is_valid_base64('-_==')) - self.assertFalse(base64utils.is_valid_base64('-_-=')) - self.assertFalse(base64utils.is_valid_base64('-_-_')) - - self.assertTrue(base64utils.is_valid_base64('abcd')) - self.assertFalse(base64utils.is_valid_base64('abcde')) - self.assertFalse(base64utils.is_valid_base64('abcde==')) - self.assertFalse(base64utils.is_valid_base64('abcdef')) - self.assertTrue(base64utils.is_valid_base64('abcdef==')) - self.assertFalse(base64utils.is_valid_base64('abcdefg')) - self.assertTrue(base64utils.is_valid_base64('abcdefg=')) - self.assertTrue(base64utils.is_valid_base64('abcdefgh')) - - self.assertFalse(base64utils.is_valid_base64('-_==')) - - def test_valid_base64url(self): - self.assertFalse(base64utils.is_valid_base64url('+/==')) - self.assertFalse(base64utils.is_valid_base64url('+/+=')) - self.assertFalse(base64utils.is_valid_base64url('+/+/')) - - self.assertTrue(base64utils.is_valid_base64url('-_==')) - self.assertTrue(base64utils.is_valid_base64url('-_-=')) - self.assertTrue(base64utils.is_valid_base64url('-_-_')) - - self.assertTrue(base64utils.is_valid_base64url('abcd')) - self.assertFalse(base64utils.is_valid_base64url('abcde')) - self.assertFalse(base64utils.is_valid_base64url('abcde==')) - self.assertFalse(base64utils.is_valid_base64url('abcdef')) - self.assertTrue(base64utils.is_valid_base64url('abcdef==')) - self.assertFalse(base64utils.is_valid_base64url('abcdefg')) - self.assertTrue(base64utils.is_valid_base64url('abcdefg=')) - self.assertTrue(base64utils.is_valid_base64url('abcdefgh')) - - self.assertTrue(base64utils.is_valid_base64url('-_==')) - - -class TestBase64Padding(unit.BaseTestCase): - - def test_filter(self): - self.assertEqual('', base64utils.filter_formatting('')) - self.assertEqual('', base64utils.filter_formatting(' ')) - self.assertEqual('a', base64utils.filter_formatting('a')) - self.assertEqual('a', base64utils.filter_formatting(' a')) - self.assertEqual('a', base64utils.filter_formatting('a ')) - self.assertEqual('ab', base64utils.filter_formatting('ab')) - self.assertEqual('ab', base64utils.filter_formatting(' ab')) - self.assertEqual('ab', base64utils.filter_formatting('ab ')) - self.assertEqual('ab', base64utils.filter_formatting('a b')) - self.assertEqual('ab', base64utils.filter_formatting(' a b')) - self.assertEqual('ab', base64utils.filter_formatting('a b ')) - self.assertEqual('ab', base64utils.filter_formatting('a\nb\n ')) - - text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' - 'abcdefghijklmnopqrstuvwxyz' - '0123456789' - '+/=') - self.assertEqual(base64_alphabet, - base64utils.filter_formatting(text)) - - text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n' - ' abcdefghijklmnopqrstuvwxyz\n' - '\t\f\r' - ' 0123456789\n' - ' +/=') - self.assertEqual(base64_alphabet, - base64utils.filter_formatting(text)) - self.assertEqual(base64url_alphabet, - base64utils.base64_to_base64url(base64_alphabet)) - - text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' - 'abcdefghijklmnopqrstuvwxyz' - '0123456789' - '-_=') - self.assertEqual(base64url_alphabet, - base64utils.filter_formatting(text)) - - text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n' - ' abcdefghijklmnopqrstuvwxyz\n' - '\t\f\r' - ' 0123456789\n' - '-_=') - self.assertEqual(base64url_alphabet, - base64utils.filter_formatting(text)) - - def test_alphabet_conversion(self): - self.assertEqual(base64url_alphabet, - base64utils.base64_to_base64url(base64_alphabet)) - - self.assertEqual(base64_alphabet, - base64utils.base64url_to_base64(base64url_alphabet)) - - def test_is_padded(self): - self.assertTrue(base64utils.base64_is_padded('ABCD')) - self.assertTrue(base64utils.base64_is_padded('ABC=')) - self.assertTrue(base64utils.base64_is_padded('AB==')) - - self.assertTrue(base64utils.base64_is_padded('1234ABCD')) - self.assertTrue(base64utils.base64_is_padded('1234ABC=')) - self.assertTrue(base64utils.base64_is_padded('1234AB==')) - - self.assertFalse(base64utils.base64_is_padded('ABC')) - self.assertFalse(base64utils.base64_is_padded('AB')) - self.assertFalse(base64utils.base64_is_padded('A')) - self.assertFalse(base64utils.base64_is_padded('')) - - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64_is_padded, '=') - - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64_is_padded, 'AB=C') - - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64_is_padded, 'AB=') - - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64_is_padded, 'ABCD=') - - self.assertRaises(ValueError, base64utils.base64_is_padded, - 'ABC', pad='==') - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64_is_padded, 'A=BC') - - def test_strip_padding(self): - self.assertEqual('ABCD', base64utils.base64_strip_padding('ABCD')) - self.assertEqual('ABC', base64utils.base64_strip_padding('ABC=')) - self.assertEqual('AB', base64utils.base64_strip_padding('AB==')) - self.assertRaises(ValueError, base64utils.base64_strip_padding, - 'ABC=', pad='==') - self.assertEqual('ABC', base64utils.base64_strip_padding('ABC')) - - def test_assure_padding(self): - self.assertEqual('ABCD', base64utils.base64_assure_padding('ABCD')) - self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC')) - self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC=')) - self.assertEqual('AB==', base64utils.base64_assure_padding('AB')) - self.assertEqual('AB==', base64utils.base64_assure_padding('AB==')) - self.assertRaises(ValueError, base64utils.base64_assure_padding, - 'ABC', pad='==') - - def test_base64_percent_encoding(self): - self.assertEqual('ABCD', base64utils.base64url_percent_encode('ABCD')) - self.assertEqual('ABC%3D', - base64utils.base64url_percent_encode('ABC=')) - self.assertEqual('AB%3D%3D', - base64utils.base64url_percent_encode('AB==')) - - self.assertEqual('ABCD', base64utils.base64url_percent_decode('ABCD')) - self.assertEqual('ABC=', - base64utils.base64url_percent_decode('ABC%3D')) - self.assertEqual('AB==', - base64utils.base64url_percent_decode('AB%3D%3D')) - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64url_percent_encode, 'chars') - self.assertRaises(base64utils.InvalidBase64Error, - base64utils.base64url_percent_decode, 'AB%3D%3') - - -class TestTextWrap(unit.BaseTestCase): - - def test_wrapping(self): - raw_text = 'abcdefgh' - wrapped_text = 'abc\ndef\ngh\n' - - self.assertEqual(wrapped_text, - base64utils.base64_wrap(raw_text, width=3)) - - t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n' - self.assertEqual(wrapped_text, t) - - raw_text = 'abcdefgh' - wrapped_text = 'abcd\nefgh\n' - - self.assertEqual(wrapped_text, - base64utils.base64_wrap(raw_text, width=4)) diff --git a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py deleted file mode 100644 index 3813e033..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py +++ /dev/null @@ -1,135 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import time - -import mock -import six -from six.moves import queue -import testtools -from testtools import matchers - -from keystone.common.cache import _memcache_pool -from keystone import exception -from keystone.tests.unit import core - - -class _TestConnectionPool(_memcache_pool.ConnectionPool): - destroyed_value = 'destroyed' - - def _create_connection(self): - return mock.MagicMock() - - def _destroy_connection(self, conn): - conn(self.destroyed_value) - - -class TestConnectionPool(core.TestCase): - def setUp(self): - super(TestConnectionPool, self).setUp() - self.unused_timeout = 10 - self.maxsize = 2 - self.connection_pool = _TestConnectionPool( - maxsize=self.maxsize, - unused_timeout=self.unused_timeout) - self.addCleanup(self.cleanup_instance('connection_pool')) - - def test_get_context_manager(self): - self.assertThat(self.connection_pool.queue, matchers.HasLength(0)) - with self.connection_pool.acquire() as conn: - self.assertEqual(1, self.connection_pool._acquired) - self.assertEqual(0, self.connection_pool._acquired) - self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) - self.assertEqual(conn, self.connection_pool.queue[0].connection) - - def test_cleanup_pool(self): - self.test_get_context_manager() - newtime = time.time() + self.unused_timeout * 2 - non_expired_connection = _memcache_pool._PoolItem( - ttl=(newtime * 2), - connection=mock.MagicMock()) - self.connection_pool.queue.append(non_expired_connection) - self.assertThat(self.connection_pool.queue, matchers.HasLength(2)) - with mock.patch.object(time, 'time', return_value=newtime): - conn = self.connection_pool.queue[0].connection - with self.connection_pool.acquire(): - pass - conn.assert_has_calls( - [mock.call(self.connection_pool.destroyed_value)]) - self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) - self.assertEqual(0, non_expired_connection.connection.call_count) - - def test_acquire_conn_exception_returns_acquired_count(self): - class TestException(Exception): - pass - - with mock.patch.object(_TestConnectionPool, '_create_connection', - side_effect=TestException): - with testtools.ExpectedException(TestException): - with self.connection_pool.acquire(): - pass - self.assertThat(self.connection_pool.queue, - matchers.HasLength(0)) - self.assertEqual(0, self.connection_pool._acquired) - - def test_connection_pool_limits_maximum_connections(self): - # NOTE(morganfainberg): To ensure we don't lockup tests until the - # job limit, explicitly call .get_nowait() and .put_nowait() in this - # case. - conn1 = self.connection_pool.get_nowait() - conn2 = self.connection_pool.get_nowait() - - # Use a nowait version to raise an Empty exception indicating we would - # not get another connection until one is placed back into the queue. - self.assertRaises(queue.Empty, self.connection_pool.get_nowait) - - # Place the connections back into the pool. - self.connection_pool.put_nowait(conn1) - self.connection_pool.put_nowait(conn2) - - # Make sure we can get a connection out of the pool again. - self.connection_pool.get_nowait() - - def test_connection_pool_maximum_connection_get_timeout(self): - connection_pool = _TestConnectionPool( - maxsize=1, - unused_timeout=self.unused_timeout, - conn_get_timeout=0) - - def _acquire_connection(): - with connection_pool.acquire(): - pass - - # Make sure we've consumed the only available connection from the pool - conn = connection_pool.get_nowait() - - self.assertRaises(exception.UnexpectedError, _acquire_connection) - - # Put the connection back and ensure we can acquire the connection - # after it is available. - connection_pool.put_nowait(conn) - _acquire_connection() - - -class TestMemcacheClientOverrides(core.BaseTestCase): - - def test_client_stripped_of_threading_local(self): - """threading.local overrides are restored for _MemcacheClient""" - client_class = _memcache_pool._MemcacheClient - # get the genuine thread._local from MRO - thread_local = client_class.__mro__[2] - self.assertTrue(thread_local is threading.local) - for field in six.iterkeys(thread_local.__dict__): - if field not in ('__dict__', '__weakref__'): - self.assertNotEqual(id(getattr(thread_local, field, None)), - id(getattr(client_class, field, None))) diff --git a/keystone-moon/keystone/tests/unit/common/test_injection.py b/keystone-moon/keystone/tests/unit/common/test_injection.py deleted file mode 100644 index 9a5d1e7d..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_injection.py +++ /dev/null @@ -1,238 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from keystone.common import dependency -from keystone.tests import unit - - -class TestDependencyInjection(unit.BaseTestCase): - def setUp(self): - super(TestDependencyInjection, self).setUp() - dependency.reset() - self.addCleanup(dependency.reset) - - def test_dependency_injection(self): - class Interface(object): - def do_work(self): - assert False - - @dependency.provider('first_api') - class FirstImplementation(Interface): - def do_work(self): - return True - - @dependency.provider('second_api') - class SecondImplementation(Interface): - def do_work(self): - return True - - @dependency.requires('first_api', 'second_api') - class Consumer(object): - def do_work_with_dependencies(self): - assert self.first_api.do_work() - assert self.second_api.do_work() - - # initialize dependency providers - first_api = FirstImplementation() - second_api = SecondImplementation() - - # ... sometime later, initialize a dependency consumer - consumer = Consumer() - - # the expected dependencies should be available to the consumer - self.assertIs(consumer.first_api, first_api) - self.assertIs(consumer.second_api, second_api) - self.assertIsInstance(consumer.first_api, Interface) - self.assertIsInstance(consumer.second_api, Interface) - consumer.do_work_with_dependencies() - - def test_dependency_provider_configuration(self): - @dependency.provider('api') - class Configurable(object): - def __init__(self, value=None): - self.value = value - - def get_value(self): - return self.value - - @dependency.requires('api') - class Consumer(object): - def get_value(self): - return self.api.get_value() - - # initialize dependency providers - api = Configurable(value=True) - - # ... sometime later, initialize a dependency consumer - consumer = Consumer() - - # the expected dependencies should be available to the consumer - self.assertIs(consumer.api, api) - self.assertIsInstance(consumer.api, Configurable) - self.assertTrue(consumer.get_value()) - - def test_dependency_consumer_configuration(self): - @dependency.provider('api') - class Provider(object): - def get_value(self): - return True - - @dependency.requires('api') - class Configurable(object): - def __init__(self, value=None): - self.value = value - - def get_value(self): - if self.value: - return self.api.get_value() - - # initialize dependency providers - api = Provider() - - # ... sometime later, initialize a dependency consumer - consumer = Configurable(value=True) - - # the expected dependencies should be available to the consumer - self.assertIs(consumer.api, api) - self.assertIsInstance(consumer.api, Provider) - self.assertTrue(consumer.get_value()) - - def test_inherited_dependency(self): - class Interface(object): - def do_work(self): - assert False - - @dependency.provider('first_api') - class FirstImplementation(Interface): - def do_work(self): - return True - - @dependency.provider('second_api') - class SecondImplementation(Interface): - def do_work(self): - return True - - @dependency.requires('first_api') - class ParentConsumer(object): - def do_work_with_dependencies(self): - assert self.first_api.do_work() - - @dependency.requires('second_api') - class ChildConsumer(ParentConsumer): - def do_work_with_dependencies(self): - assert self.second_api.do_work() - super(ChildConsumer, self).do_work_with_dependencies() - - # initialize dependency providers - first_api = FirstImplementation() - second_api = SecondImplementation() - - # ... sometime later, initialize a dependency consumer - consumer = ChildConsumer() - - # dependencies should be naturally inherited - self.assertEqual( - set(['first_api']), - ParentConsumer._dependencies) - self.assertEqual( - set(['first_api', 'second_api']), - ChildConsumer._dependencies) - self.assertEqual( - set(['first_api', 'second_api']), - consumer._dependencies) - - # the expected dependencies should be available to the consumer - self.assertIs(consumer.first_api, first_api) - self.assertIs(consumer.second_api, second_api) - self.assertIsInstance(consumer.first_api, Interface) - self.assertIsInstance(consumer.second_api, Interface) - consumer.do_work_with_dependencies() - - def test_unresolvable_dependency(self): - @dependency.requires(uuid.uuid4().hex) - class Consumer(object): - pass - - def for_test(): - Consumer() - dependency.resolve_future_dependencies() - - self.assertRaises(dependency.UnresolvableDependencyException, for_test) - - def test_circular_dependency(self): - p1_name = uuid.uuid4().hex - p2_name = uuid.uuid4().hex - - @dependency.provider(p1_name) - @dependency.requires(p2_name) - class P1(object): - pass - - @dependency.provider(p2_name) - @dependency.requires(p1_name) - class P2(object): - pass - - p1 = P1() - p2 = P2() - - dependency.resolve_future_dependencies() - - self.assertIs(getattr(p1, p2_name), p2) - self.assertIs(getattr(p2, p1_name), p1) - - def test_reset(self): - # Can reset the registry of providers. - - p_id = uuid.uuid4().hex - - @dependency.provider(p_id) - class P(object): - pass - - p_inst = P() - - self.assertIs(dependency.get_provider(p_id), p_inst) - - dependency.reset() - - self.assertFalse(dependency._REGISTRY) - - def test_get_provider(self): - # Can get the instance of a provider using get_provider - - provider_name = uuid.uuid4().hex - - @dependency.provider(provider_name) - class P(object): - pass - - provider_instance = P() - retrieved_provider_instance = dependency.get_provider(provider_name) - self.assertIs(provider_instance, retrieved_provider_instance) - - def test_get_provider_not_provided_error(self): - # If no provider and provider is required then fails. - - provider_name = uuid.uuid4().hex - self.assertRaises(KeyError, dependency.get_provider, provider_name) - - def test_get_provider_not_provided_optional(self): - # If no provider and provider is optional then returns None. - - provider_name = uuid.uuid4().hex - self.assertIsNone(dependency.get_provider(provider_name, - dependency.GET_OPTIONAL)) diff --git a/keystone-moon/keystone/tests/unit/common/test_json_home.py b/keystone-moon/keystone/tests/unit/common/test_json_home.py deleted file mode 100644 index 94e2d138..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_json_home.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright 2014 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import copy - -from testtools import matchers - -from keystone.common import json_home -from keystone.tests import unit - - -class JsonHomeTest(unit.BaseTestCase): - def test_build_v3_resource_relation(self): - resource_name = self.getUniqueString() - relation = json_home.build_v3_resource_relation(resource_name) - exp_relation = ( - 'http://docs.openstack.org/api/openstack-identity/3/rel/%s' % - resource_name) - self.assertThat(relation, matchers.Equals(exp_relation)) - - def test_build_v3_extension_resource_relation(self): - extension_name = self.getUniqueString() - extension_version = self.getUniqueString() - resource_name = self.getUniqueString() - relation = json_home.build_v3_extension_resource_relation( - extension_name, extension_version, resource_name) - exp_relation = ( - 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/rel/' - '%s' % (extension_name, extension_version, resource_name)) - self.assertThat(relation, matchers.Equals(exp_relation)) - - def test_build_v3_parameter_relation(self): - parameter_name = self.getUniqueString() - relation = json_home.build_v3_parameter_relation(parameter_name) - exp_relation = ( - 'http://docs.openstack.org/api/openstack-identity/3/param/%s' % - parameter_name) - self.assertThat(relation, matchers.Equals(exp_relation)) - - def test_build_v3_extension_parameter_relation(self): - extension_name = self.getUniqueString() - extension_version = self.getUniqueString() - parameter_name = self.getUniqueString() - relation = json_home.build_v3_extension_parameter_relation( - extension_name, extension_version, parameter_name) - exp_relation = ( - 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/' - 'param/%s' % (extension_name, extension_version, parameter_name)) - self.assertThat(relation, matchers.Equals(exp_relation)) - - def test_translate_urls(self): - href_rel = self.getUniqueString() - href = self.getUniqueString() - href_template_rel = self.getUniqueString() - href_template = self.getUniqueString() - href_vars = {self.getUniqueString(): self.getUniqueString()} - original_json_home = { - 'resources': { - href_rel: {'href': href}, - href_template_rel: { - 'href-template': href_template, - 'href-vars': href_vars} - } - } - - new_json_home = copy.deepcopy(original_json_home) - new_prefix = self.getUniqueString() - json_home.translate_urls(new_json_home, new_prefix) - - exp_json_home = { - 'resources': { - href_rel: {'href': new_prefix + href}, - href_template_rel: { - 'href-template': new_prefix + href_template, - 'href-vars': href_vars} - } - } - - self.assertThat(new_json_home, matchers.Equals(exp_json_home)) diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py deleted file mode 100644 index eed77286..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_ldap.py +++ /dev/null @@ -1,584 +0,0 @@ -# -*- coding: utf-8 -*- -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import tempfile -import uuid - -import fixtures -import ldap.dn -import mock -from oslo_config import cfg -from testtools import matchers - -from keystone.common import driver_hints -from keystone.common import ldap as ks_ldap -from keystone.common.ldap import core as common_ldap_core -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import fakeldap -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - - -class DnCompareTest(unit.BaseTestCase): - """Tests for the DN comparison functions in keystone.common.ldap.core.""" - - def test_prep(self): - # prep_case_insensitive returns the string with spaces at the front and - # end if it's already lowercase and no insignificant characters. - value = 'lowercase value' - self.assertEqual(value, ks_ldap.prep_case_insensitive(value)) - - def test_prep_lowercase(self): - # prep_case_insensitive returns the string with spaces at the front and - # end and lowercases the value. - value = 'UPPERCASE VALUE' - exp_value = value.lower() - self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) - - def test_prep_insignificant(self): - # prep_case_insensitive remove insignificant spaces. - value = 'before after' - exp_value = 'before after' - self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) - - def test_prep_insignificant_pre_post(self): - # prep_case_insensitive remove insignificant spaces. - value = ' value ' - exp_value = 'value' - self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) - - def test_ava_equal_same(self): - # is_ava_value_equal returns True if the two values are the same. - value = 'val1' - self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value)) - - def test_ava_equal_complex(self): - # is_ava_value_equal returns True if the two values are the same using - # a value that's got different capitalization and insignificant chars. - val1 = 'before after' - val2 = ' BEFORE afTer ' - self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2)) - - def test_ava_different(self): - # is_ava_value_equal returns False if the values aren't the same. - self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2')) - - def test_rdn_same(self): - # is_rdn_equal returns True if the two values are the same. - rdn = ldap.dn.str2dn('cn=val1')[0] - self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn)) - - def test_rdn_diff_length(self): - # is_rdn_equal returns False if the RDNs have a different number of - # AVAs. - rdn1 = ldap.dn.str2dn('cn=cn1')[0] - rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] - self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_rdn_multi_ava_same_order(self): - # is_rdn_equal returns True if the RDNs have the same number of AVAs - # and the values are the same. - rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] - rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0] - self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_rdn_multi_ava_diff_order(self): - # is_rdn_equal returns True if the RDNs have the same number of AVAs - # and the values are the same, even if in a different order - rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] - rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0] - self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_rdn_multi_ava_diff_type(self): - # is_rdn_equal returns False if the RDNs have the same number of AVAs - # and the attribute types are different. - rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] - rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0] - self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_rdn_attr_type_case_diff(self): - # is_rdn_equal returns True for same RDNs even when attr type case is - # different. - rdn1 = ldap.dn.str2dn('cn=cn1')[0] - rdn2 = ldap.dn.str2dn('CN=cn1')[0] - self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_rdn_attr_type_alias(self): - # is_rdn_equal returns False for same RDNs even when attr type alias is - # used. Note that this is a limitation since an LDAP server should - # consider them equal. - rdn1 = ldap.dn.str2dn('cn=cn1')[0] - rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0] - self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) - - def test_dn_same(self): - # is_dn_equal returns True if the DNs are the same. - dn = 'cn=Babs Jansen,ou=OpenStack' - self.assertTrue(ks_ldap.is_dn_equal(dn, dn)) - - def test_dn_equal_unicode(self): - # is_dn_equal can accept unicode - dn = u'cn=fäké,ou=OpenStack' - self.assertTrue(ks_ldap.is_dn_equal(dn, dn)) - - def test_dn_diff_length(self): - # is_dn_equal returns False if the DNs don't have the same number of - # RDNs - dn1 = 'cn=Babs Jansen,ou=OpenStack' - dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com' - self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2)) - - def test_dn_equal_rdns(self): - # is_dn_equal returns True if the DNs have the same number of RDNs - # and each RDN is the same. - dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource' - dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack' - self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2)) - - def test_dn_parsed_dns(self): - # is_dn_equal can also accept parsed DNs. - dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource') - dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack') - self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2)) - - def test_startswith_under_child(self): - # dn_startswith returns True if descendant_dn is a child of dn. - child = 'cn=Babs Jansen,ou=OpenStack' - parent = 'ou=OpenStack' - self.assertTrue(ks_ldap.dn_startswith(child, parent)) - - def test_startswith_parent(self): - # dn_startswith returns False if descendant_dn is a parent of dn. - child = 'cn=Babs Jansen,ou=OpenStack' - parent = 'ou=OpenStack' - self.assertFalse(ks_ldap.dn_startswith(parent, child)) - - def test_startswith_same(self): - # dn_startswith returns False if DNs are the same. - dn = 'cn=Babs Jansen,ou=OpenStack' - self.assertFalse(ks_ldap.dn_startswith(dn, dn)) - - def test_startswith_not_parent(self): - # dn_startswith returns False if descendant_dn is not under the dn - child = 'cn=Babs Jansen,ou=OpenStack' - parent = 'dc=example.com' - self.assertFalse(ks_ldap.dn_startswith(child, parent)) - - def test_startswith_descendant(self): - # dn_startswith returns True if descendant_dn is a descendant of dn. - descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com' - dn = 'ou=OpenStack,dc=example.com' - self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) - - descendant = 'uid=12345,ou=Users,dc=example,dc=com' - dn = 'ou=Users,dc=example,dc=com' - self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) - - def test_startswith_parsed_dns(self): - # dn_startswith also accepts parsed DNs. - descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack') - dn = ldap.dn.str2dn('ou=OpenStack') - self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) - - def test_startswith_unicode(self): - # dn_startswith accepts unicode. - child = u'cn=fäké,ou=OpenStäck' - parent = u'ou=OpenStäck' - self.assertTrue(ks_ldap.dn_startswith(child, parent)) - - -class LDAPDeleteTreeTest(unit.TestCase): - - def setUp(self): - super(LDAPDeleteTreeTest, self).setUp() - - ks_ldap.register_handler('fake://', - fakeldap.FakeLdapNoSubtreeDelete) - self.useFixture(database.Database(self.sql_driver_version_overrides)) - - self.load_backends() - self.load_fixtures(default_fixtures) - - self.addCleanup(self.clear_database) - self.addCleanup(common_ldap_core._HANDLERS.clear) - - def clear_database(self): - for shelf in fakeldap.FakeShelves: - fakeldap.FakeShelves[shelf].clear() - - def config_overrides(self): - super(LDAPDeleteTreeTest, self).config_overrides() - self.config_fixture.config(group='identity', driver='ldap') - - def config_files(self): - config_files = super(LDAPDeleteTreeTest, self).config_files() - config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) - return config_files - - def test_delete_tree(self): - """Test manually deleting a tree. - - Few LDAP servers support CONTROL_DELETETREE. This test - exercises the alternate code paths in BaseLdap.delete_tree. - - """ - conn = self.identity_api.user.get_connection() - id_attr = self.identity_api.user.id_attr - objclass = self.identity_api.user.object_class.lower() - tree_dn = self.identity_api.user.tree_dn - - def create_entry(name, parent_dn=None): - if not parent_dn: - parent_dn = tree_dn - dn = '%s=%s,%s' % (id_attr, name, parent_dn) - attrs = [('objectclass', [objclass, 'ldapsubentry']), - (id_attr, [name])] - conn.add_s(dn, attrs) - return dn - - # create 3 entries like this: - # cn=base - # cn=child,cn=base - # cn=grandchild,cn=child,cn=base - # then attempt to delete_tree(cn=base) - base_id = 'base' - base_dn = create_entry(base_id) - child_dn = create_entry('child', base_dn) - grandchild_dn = create_entry('grandchild', child_dn) - - # verify that the three entries were created - scope = ldap.SCOPE_SUBTREE - filt = '(|(objectclass=*)(objectclass=ldapsubentry))' - entries = conn.search_s(base_dn, scope, filt, - attrlist=common_ldap_core.DN_ONLY) - self.assertThat(entries, matchers.HasLength(3)) - sort_ents = sorted([e[0] for e in entries], key=len, reverse=True) - self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents) - - # verify that a non-leaf node can't be deleted directly by the - # LDAP server - self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF, - conn.delete_s, base_dn) - self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF, - conn.delete_s, child_dn) - - # call our delete_tree implementation - self.identity_api.user.delete_tree(base_id) - self.assertRaises(ldap.NO_SUCH_OBJECT, - conn.search_s, base_dn, ldap.SCOPE_BASE) - self.assertRaises(ldap.NO_SUCH_OBJECT, - conn.search_s, child_dn, ldap.SCOPE_BASE) - self.assertRaises(ldap.NO_SUCH_OBJECT, - conn.search_s, grandchild_dn, ldap.SCOPE_BASE) - - -class MultiURLTests(unit.TestCase): - """Tests for setting multiple LDAP URLs.""" - - def test_multiple_urls_with_comma_no_conn_pool(self): - urls = 'ldap://localhost,ldap://backup.localhost' - self.config_fixture.config(group='ldap', url=urls, use_pool=False) - base_ldap = ks_ldap.BaseLdap(CONF) - ldap_connection = base_ldap.get_connection() - self.assertEqual(urls, ldap_connection.conn.conn._uri) - - def test_multiple_urls_with_comma_with_conn_pool(self): - urls = 'ldap://localhost,ldap://backup.localhost' - self.config_fixture.config(group='ldap', url=urls, use_pool=True) - base_ldap = ks_ldap.BaseLdap(CONF) - ldap_connection = base_ldap.get_connection() - self.assertEqual(urls, ldap_connection.conn.conn_pool.uri) - - -class SslTlsTest(unit.TestCase): - """Tests for the SSL/TLS functionality in keystone.common.ldap.core.""" - - @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s') - @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s') - def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two): - # Attempt to connect to initialize python-ldap. - base_ldap = ks_ldap.BaseLdap(config) - base_ldap.get_connection() - - def test_certfile_trust_tls(self): - # We need this to actually exist, so we create a tempfile. - (handle, certfile) = tempfile.mkstemp() - self.addCleanup(os.unlink, certfile) - self.addCleanup(os.close, handle) - self.config_fixture.config(group='ldap', - url='ldap://localhost', - use_tls=True, - tls_cacertfile=certfile) - - self._init_ldap_connection(CONF) - - # Ensure the cert trust option is set. - self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) - - def test_certdir_trust_tls(self): - # We need this to actually exist, so we create a tempdir. - certdir = self.useFixture(fixtures.TempDir()).path - self.config_fixture.config(group='ldap', - url='ldap://localhost', - use_tls=True, - tls_cacertdir=certdir) - - self._init_ldap_connection(CONF) - - # Ensure the cert trust option is set. - self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)) - - def test_certfile_trust_ldaps(self): - # We need this to actually exist, so we create a tempfile. - (handle, certfile) = tempfile.mkstemp() - self.addCleanup(os.unlink, certfile) - self.addCleanup(os.close, handle) - self.config_fixture.config(group='ldap', - url='ldaps://localhost', - use_tls=False, - tls_cacertfile=certfile) - - self._init_ldap_connection(CONF) - - # Ensure the cert trust option is set. - self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) - - def test_certdir_trust_ldaps(self): - # We need this to actually exist, so we create a tempdir. - certdir = self.useFixture(fixtures.TempDir()).path - self.config_fixture.config(group='ldap', - url='ldaps://localhost', - use_tls=False, - tls_cacertdir=certdir) - - self._init_ldap_connection(CONF) - - # Ensure the cert trust option is set. - self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)) - - -class LDAPPagedResultsTest(unit.TestCase): - """Tests the paged results functionality in keystone.common.ldap.core.""" - - def setUp(self): - super(LDAPPagedResultsTest, self).setUp() - self.clear_database() - - ks_ldap.register_handler('fake://', fakeldap.FakeLdap) - self.addCleanup(common_ldap_core._HANDLERS.clear) - self.useFixture(database.Database(self.sql_driver_version_overrides)) - - self.load_backends() - self.load_fixtures(default_fixtures) - - def clear_database(self): - for shelf in fakeldap.FakeShelves: - fakeldap.FakeShelves[shelf].clear() - - def config_overrides(self): - super(LDAPPagedResultsTest, self).config_overrides() - self.config_fixture.config(group='identity', driver='ldap') - - def config_files(self): - config_files = super(LDAPPagedResultsTest, self).config_files() - config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) - return config_files - - @mock.patch.object(fakeldap.FakeLdap, 'search_ext') - @mock.patch.object(fakeldap.FakeLdap, 'result3') - def test_paged_results_control_api(self, mock_result3, mock_search_ext): - mock_result3.return_value = ('', [], 1, []) - - self.config_fixture.config(group='ldap', - page_size=1) - - conn = self.identity_api.user.get_connection() - conn._paged_search_s('dc=example,dc=test', - ldap.SCOPE_SUBTREE, - 'objectclass=*') - - -class CommonLdapTestCase(unit.BaseTestCase): - """These test cases call functions in keystone.common.ldap.""" - - def test_binary_attribute_values(self): - result = [( - 'cn=junk,dc=example,dc=com', - { - 'cn': ['junk'], - 'sn': [uuid.uuid4().hex], - 'mail': [uuid.uuid4().hex], - 'binary_attr': ['\x00\xFF\x00\xFF'] - } - ), ] - py_result = ks_ldap.convert_ldap_result(result) - # The attribute containing the binary value should - # not be present in the converted result. - self.assertNotIn('binary_attr', py_result[0][1]) - - def test_utf8_conversion(self): - value_unicode = u'fäké1' - value_utf8 = value_unicode.encode('utf-8') - - result_utf8 = ks_ldap.utf8_encode(value_unicode) - self.assertEqual(value_utf8, result_utf8) - - result_utf8 = ks_ldap.utf8_encode(value_utf8) - self.assertEqual(value_utf8, result_utf8) - - result_unicode = ks_ldap.utf8_decode(value_utf8) - self.assertEqual(value_unicode, result_unicode) - - result_unicode = ks_ldap.utf8_decode(value_unicode) - self.assertEqual(value_unicode, result_unicode) - - self.assertRaises(TypeError, - ks_ldap.utf8_encode, - 100) - - result_unicode = ks_ldap.utf8_decode(100) - self.assertEqual(u'100', result_unicode) - - def test_user_id_begins_with_0(self): - user_id = '0123456' - result = [( - 'cn=dummy,dc=example,dc=com', - { - 'user_id': [user_id], - 'enabled': ['TRUE'] - } - ), ] - py_result = ks_ldap.convert_ldap_result(result) - # The user id should be 0123456, and the enabled - # flag should be True - self.assertIs(py_result[0][1]['enabled'][0], True) - self.assertEqual(user_id, py_result[0][1]['user_id'][0]) - - def test_user_id_begins_with_0_and_enabled_bit_mask(self): - user_id = '0123456' - bitmask = '225' - expected_bitmask = 225 - result = [( - 'cn=dummy,dc=example,dc=com', - { - 'user_id': [user_id], - 'enabled': [bitmask] - } - ), ] - py_result = ks_ldap.convert_ldap_result(result) - # The user id should be 0123456, and the enabled - # flag should be 225 - self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0]) - self.assertEqual(user_id, py_result[0][1]['user_id'][0]) - - def test_user_id_and_bitmask_begins_with_0(self): - user_id = '0123456' - bitmask = '0225' - expected_bitmask = 225 - result = [( - 'cn=dummy,dc=example,dc=com', - { - 'user_id': [user_id], - 'enabled': [bitmask] - } - ), ] - py_result = ks_ldap.convert_ldap_result(result) - # The user id should be 0123456, and the enabled - # flag should be 225, the 0 is dropped. - self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0]) - self.assertEqual(user_id, py_result[0][1]['user_id'][0]) - - def test_user_id_and_user_name_with_boolean_string(self): - boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False', - 'TrUe' 'FaLse'] - for user_name in boolean_strings: - user_id = uuid.uuid4().hex - result = [( - 'cn=dummy,dc=example,dc=com', - { - 'user_id': [user_id], - 'user_name': [user_name] - } - ), ] - py_result = ks_ldap.convert_ldap_result(result) - # The user name should still be a string value. - self.assertEqual(user_name, py_result[0][1]['user_name'][0]) - - -class LDAPFilterQueryCompositionTest(unit.TestCase): - """These test cases test LDAP filter generation.""" - - def setUp(self): - super(LDAPFilterQueryCompositionTest, self).setUp() - - self.base_ldap = ks_ldap.BaseLdap(self.config_fixture.conf) - - # The tests need an attribute mapping to use. - self.attribute_name = uuid.uuid4().hex - self.filter_attribute_name = uuid.uuid4().hex - self.base_ldap.attribute_mapping = { - self.attribute_name: self.filter_attribute_name - } - - def test_return_query_with_no_hints(self): - hints = driver_hints.Hints() - # NOTE: doesn't have to be a real query, we just need to make sure the - # same string is returned if there are no hints. - query = uuid.uuid4().hex - self.assertEqual(query, - self.base_ldap.filter_query(hints=hints, query=query)) - - # make sure the default query is an empty string - self.assertEqual('', self.base_ldap.filter_query(hints=hints)) - - def test_filter_with_empty_query_and_hints_set(self): - hints = driver_hints.Hints() - username = uuid.uuid4().hex - hints.add_filter(name=self.attribute_name, - value=username, - comparator='equals', - case_sensitive=False) - expected_ldap_filter = '(&(%s=%s))' % ( - self.filter_attribute_name, username) - self.assertEqual(expected_ldap_filter, - self.base_ldap.filter_query(hints=hints)) - - def test_filter_with_both_query_and_hints_set(self): - hints = driver_hints.Hints() - # NOTE: doesn't have to be a real query, we just need to make sure the - # filter string is concatenated correctly - query = uuid.uuid4().hex - username = uuid.uuid4().hex - expected_result = '(&%(query)s(%(user_name_attr)s=%(username)s))' % ( - {'query': query, - 'user_name_attr': self.filter_attribute_name, - 'username': username}) - hints.add_filter(self.attribute_name, username) - self.assertEqual(expected_result, - self.base_ldap.filter_query(hints=hints, query=query)) - - def test_filter_with_hints_and_query_is_none(self): - hints = driver_hints.Hints() - username = uuid.uuid4().hex - hints.add_filter(name=self.attribute_name, - value=username, - comparator='equals', - case_sensitive=False) - expected_ldap_filter = '(&(%s=%s))' % ( - self.filter_attribute_name, username) - self.assertEqual(expected_ldap_filter, - self.base_ldap.filter_query(hints=hints, query=None)) diff --git a/keystone-moon/keystone/tests/unit/common/test_manager.py b/keystone-moon/keystone/tests/unit/common/test_manager.py deleted file mode 100644 index 7ef91e15..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_manager.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from keystone import catalog -from keystone.common import manager -from keystone.tests import unit - - -class TestCreateLegacyDriver(unit.BaseTestCase): - - @mock.patch('oslo_log.versionutils.report_deprecated_feature') - def test_class_is_properly_deprecated(self, mock_reporter): - Driver = manager.create_legacy_driver(catalog.CatalogDriverV8) - - # NOTE(dstanek): I want to subvert the requirement for this - # class to implement all of the abstract methods. - Driver.__abstractmethods__ = set() - impl = Driver() - - details = { - 'as_of': 'Liberty', - 'what': 'keystone.catalog.core.Driver', - 'in_favor_of': 'keystone.catalog.core.CatalogDriverV8', - 'remove_in': mock.ANY, - } - mock_reporter.assert_called_with(mock.ANY, mock.ANY, details) - self.assertEqual('N', mock_reporter.call_args[0][2]['remove_in'][0]) - - self.assertIsInstance(impl, catalog.CatalogDriverV8) diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py deleted file mode 100644 index aa2e6f72..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_notifications.py +++ /dev/null @@ -1,1248 +0,0 @@ -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import uuid - -import mock -from oslo_config import cfg -from oslo_config import fixture as config_fixture -from oslotest import mockpatch -from pycadf import cadftaxonomy -from pycadf import cadftype -from pycadf import eventfactory -from pycadf import resource as cadfresource - -from keystone import notifications -from keystone.tests import unit -from keystone.tests.unit import test_v3 - - -CONF = cfg.CONF - -EXP_RESOURCE_TYPE = uuid.uuid4().hex -CREATED_OPERATION = notifications.ACTIONS.created -UPDATED_OPERATION = notifications.ACTIONS.updated -DELETED_OPERATION = notifications.ACTIONS.deleted -DISABLED_OPERATION = notifications.ACTIONS.disabled - - -class ArbitraryException(Exception): - pass - - -def register_callback(operation, resource_type=EXP_RESOURCE_TYPE): - """Helper for creating and registering a mock callback.""" - callback = mock.Mock(__name__='callback', - im_class=mock.Mock(__name__='class')) - notifications.register_event_callback(operation, resource_type, callback) - return callback - - -class AuditNotificationsTestCase(unit.BaseTestCase): - def setUp(self): - super(AuditNotificationsTestCase, self).setUp() - self.config_fixture = self.useFixture(config_fixture.Config(CONF)) - self.addCleanup(notifications.clear_subscribers) - - def _test_notification_operation(self, notify_function, operation): - exp_resource_id = uuid.uuid4().hex - callback = register_callback(operation) - notify_function(EXP_RESOURCE_TYPE, exp_resource_id) - callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE, - operation, - {'resource_info': exp_resource_id}) - self.config_fixture.config(notification_format='cadf') - with mock.patch( - 'keystone.notifications._create_cadf_payload') as cadf_notify: - notify_function(EXP_RESOURCE_TYPE, exp_resource_id) - initiator = None - cadf_notify.assert_called_once_with( - operation, EXP_RESOURCE_TYPE, exp_resource_id, - notifications.taxonomy.OUTCOME_SUCCESS, initiator) - notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False) - cadf_notify.assert_called_once_with( - operation, EXP_RESOURCE_TYPE, exp_resource_id, - notifications.taxonomy.OUTCOME_SUCCESS, initiator) - - def test_resource_created_notification(self): - self._test_notification_operation(notifications.Audit.created, - CREATED_OPERATION) - - def test_resource_updated_notification(self): - self._test_notification_operation(notifications.Audit.updated, - UPDATED_OPERATION) - - def test_resource_deleted_notification(self): - self._test_notification_operation(notifications.Audit.deleted, - DELETED_OPERATION) - - def test_resource_disabled_notification(self): - self._test_notification_operation(notifications.Audit.disabled, - DISABLED_OPERATION) - - -class NotificationsTestCase(unit.BaseTestCase): - - def test_send_notification(self): - """Test _send_notification. - - Test the private method _send_notification to ensure event_type, - payload, and context are built and passed properly. - - """ - resource = uuid.uuid4().hex - resource_type = EXP_RESOURCE_TYPE - operation = CREATED_OPERATION - - # NOTE(ldbragst): Even though notifications._send_notification doesn't - # contain logic that creates cases, this is supposed to test that - # context is always empty and that we ensure the resource ID of the - # resource in the notification is contained in the payload. It was - # agreed that context should be empty in Keystone's case, which is - # also noted in the /keystone/notifications.py module. This test - # ensures and maintains these conditions. - expected_args = [ - {}, # empty context - 'identity.%s.created' % resource_type, # event_type - {'resource_info': resource}, # payload - 'INFO', # priority is always INFO... - ] - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - notifications._send_notification(operation, resource_type, - resource) - mocked.assert_called_once_with(*expected_args) - - def test_send_notification_with_opt_out(self): - """Test the private method _send_notification with opt-out. - - Test that _send_notification does not notify when a valid - notification_opt_out configuration is provided. - """ - resource = uuid.uuid4().hex - resource_type = EXP_RESOURCE_TYPE - operation = CREATED_OPERATION - event_type = 'identity.%s.created' % resource_type - - # NOTE(diazjf): Here we add notification_opt_out to the - # configuration so that we should return before _get_notifer is - # called. This is because we are opting out notifications for the - # passed resource_type and operation. - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=event_type) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_notification(operation, resource_type, - resource) - mocked.assert_not_called() - - def test_send_audit_notification_with_opt_out(self): - """Test the private method _send_audit_notification with opt-out. - - Test that _send_audit_notification does not notify when a valid - notification_opt_out configuration is provided. - """ - resource_type = EXP_RESOURCE_TYPE - - action = CREATED_OPERATION + '.' + resource_type - initiator = mock - target = mock - outcome = 'success' - event_type = 'identity.%s.created' % resource_type - - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=event_type) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_audit_notification(action, - initiator, - outcome, - target, - event_type) - mocked.assert_not_called() - - def test_opt_out_authenticate_event(self): - """Test that authenticate events are successfully opted out.""" - resource_type = EXP_RESOURCE_TYPE - - action = CREATED_OPERATION + '.' + resource_type - initiator = mock - target = mock - outcome = 'success' - event_type = 'identity.authenticate' - meter_name = '%s.%s' % (event_type, outcome) - - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=meter_name) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_audit_notification(action, - initiator, - outcome, - target, - event_type) - mocked.assert_not_called() - - -class BaseNotificationTest(test_v3.RestfulTestCase): - - def setUp(self): - super(BaseNotificationTest, self).setUp() - - self._notifications = [] - self._audits = [] - - def fake_notify(operation, resource_type, resource_id, - actor_dict=None, public=True): - note = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - if actor_dict: - note['actor_id'] = actor_dict.get('id') - note['actor_type'] = actor_dict.get('type') - note['actor_operation'] = actor_dict.get('actor_operation') - self._notifications.append(note) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_notification', fake_notify)) - - def fake_audit(action, initiator, outcome, target, - event_type, **kwargs): - service_security = cadftaxonomy.SERVICE_SECURITY - - event = eventfactory.EventFactory().new_event( - eventType=cadftype.EVENTTYPE_ACTIVITY, - outcome=outcome, - action=action, - initiator=initiator, - target=target, - observer=cadfresource.Resource(typeURI=service_security)) - - for key, value in kwargs.items(): - setattr(event, key, value) - - audit = { - 'payload': event.as_dict(), - 'event_type': event_type, - 'send_notification_called': True} - self._audits.append(audit) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_audit_notification', fake_audit)) - - def _assert_last_note(self, resource_id, operation, resource_type, - actor_id=None, actor_type=None, - actor_operation=None): - # NOTE(stevemar): If 'basic' format is not used, then simply - # return since this assertion is not valid. - if CONF.notification_format != 'basic': - return - self.assertTrue(len(self._notifications) > 0) - note = self._notifications[-1] - self.assertEqual(operation, note['operation']) - self.assertEqual(resource_id, note['resource_id']) - self.assertEqual(resource_type, note['resource_type']) - self.assertTrue(note['send_notification_called']) - if actor_id: - self.assertEqual(actor_id, note['actor_id']) - self.assertEqual(actor_type, note['actor_type']) - self.assertEqual(actor_operation, note['actor_operation']) - - def _assert_last_audit(self, resource_id, operation, resource_type, - target_uri): - # NOTE(stevemar): If 'cadf' format is not used, then simply - # return since this assertion is not valid. - if CONF.notification_format != 'cadf': - return - self.assertTrue(len(self._audits) > 0) - audit = self._audits[-1] - payload = audit['payload'] - self.assertEqual(resource_id, payload['resource_info']) - action = '%s.%s' % (operation, resource_type) - self.assertEqual(action, payload['action']) - self.assertEqual(target_uri, payload['target']['typeURI']) - self.assertEqual(resource_id, payload['target']['id']) - event_type = '%s.%s.%s' % ('identity', resource_type, operation) - self.assertEqual(event_type, audit['event_type']) - self.assertTrue(audit['send_notification_called']) - - def _assert_initiator_data_is_set(self, operation, resource_type, typeURI): - self.assertTrue(len(self._audits) > 0) - audit = self._audits[-1] - payload = audit['payload'] - self.assertEqual(self.user_id, payload['initiator']['id']) - self.assertEqual(self.project_id, payload['initiator']['project_id']) - self.assertEqual(typeURI, payload['target']['typeURI']) - action = '%s.%s' % (operation, resource_type) - self.assertEqual(action, payload['action']) - - def _assert_notify_not_sent(self, resource_id, operation, resource_type, - public=True): - unexpected = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - for note in self._notifications: - self.assertNotEqual(unexpected, note) - - def _assert_notify_sent(self, resource_id, operation, resource_type, - public=True): - expected = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - for note in self._notifications: - if expected == note: - break - else: - self.fail("Notification not sent.") - - -class NotificationsForEntities(BaseNotificationTest): - - def test_create_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_create_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self._assert_last_note( - project_ref['id'], CREATED_OPERATION, 'project') - self._assert_last_audit(project_ref['id'], CREATED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_create_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_create_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_create_trust(self): - trustor = unit.new_user_ref(domain_id=self.domain_id) - trustor = self.identity_api.create_user(trustor) - trustee = unit.new_user_ref(domain_id=self.domain_id) - trustee = self.identity_api.create_user(trustee) - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - trust_ref = unit.new_trust_ref(trustor['id'], - trustee['id']) - self.trust_api.create_trust(trust_ref['id'], - trust_ref, - [role_ref]) - self._assert_last_note( - trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust') - self._assert_last_audit(trust_ref['id'], CREATED_OPERATION, - 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) - - def test_delete_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.delete_group(group_ref['id']) - self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_delete_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.resource_api.delete_project(project_ref['id']) - self._assert_last_note( - project_ref['id'], DELETED_OPERATION, 'project') - self._assert_last_audit(project_ref['id'], DELETED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_delete_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self.role_api.delete_role(role_ref['id']) - self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_delete_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self.identity_api.delete_user(user_ref['id']) - self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_create_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_update_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['description'] = uuid.uuid4().hex - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_delete_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['enabled'] = False - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self.resource_api.delete_domain(domain_ref['id']) - self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_delete_trust(self): - trustor = unit.new_user_ref(domain_id=self.domain_id) - trustor = self.identity_api.create_user(trustor) - trustee = unit.new_user_ref(domain_id=self.domain_id) - trustee = self.identity_api.create_user(trustee) - role_ref = unit.new_role_ref() - trust_ref = unit.new_trust_ref(trustor['id'], trustee['id']) - self.trust_api.create_trust(trust_ref['id'], - trust_ref, - [role_ref]) - self.trust_api.delete_trust(trust_ref['id']) - self._assert_last_note( - trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust') - self._assert_last_audit(trust_ref['id'], DELETED_OPERATION, - 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) - - def test_create_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_update_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref) - self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_delete_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self.catalog_api.delete_endpoint(endpoint_ref['id']) - self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_create_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self._assert_notify_sent(service_ref['id'], CREATED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], CREATED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_update_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self.catalog_api.update_service(service_ref['id'], service_ref) - self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], UPDATED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_delete_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self.catalog_api.delete_service(service_ref['id']) - self._assert_notify_sent(service_ref['id'], DELETED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], DELETED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_create_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self._assert_notify_sent(region_ref['id'], CREATED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], CREATED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_update_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self.catalog_api.update_region(region_ref['id'], region_ref) - self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], UPDATED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_delete_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self.catalog_api.delete_region(region_ref['id']) - self._assert_notify_sent(region_ref['id'], DELETED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], DELETED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_create_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], CREATED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_update_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self.policy_api.update_policy(policy_ref['id'], policy_ref) - self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_delete_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self.policy_api.delete_policy(policy_ref['id']) - self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], DELETED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_disable_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['enabled'] = False - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain', - public=False) - - def test_disable_of_disabled_domain_does_not_notify(self): - domain_ref = unit.new_domain_ref(enabled=False) - self.resource_api.create_domain(domain_ref['id'], domain_ref) - # The domain_ref above is not changed during the create process. We - # can use the same ref to perform the update. - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain', - public=False) - - def test_update_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.update_group(group_ref['id'], group_ref) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_update_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_sent( - project_ref['id'], UPDATED_OPERATION, 'project', public=True) - self._assert_last_audit(project_ref['id'], UPDATED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_disable_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - project_ref['enabled'] = False - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_sent(project_ref['id'], 'disabled', 'project', - public=False) - - def test_disable_of_disabled_project_does_not_notify(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id, - enabled=False) - self.resource_api.create_project(project_ref['id'], project_ref) - # The project_ref above is not changed during the create process. We - # can use the same ref to perform the update. - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project', - public=False) - - def test_update_project_does_not_send_disable(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - project_ref['enabled'] = True - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_last_note( - project_ref['id'], UPDATED_OPERATION, 'project') - self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project') - - def test_update_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self.role_api.update_role(role_ref['id'], role_ref) - self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_update_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self.identity_api.update_user(user_ref['id'], user_ref) - self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_config_option_no_events(self): - self.config_fixture.config(notification_format='basic') - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - # The regular notifications will still be emitted, since they are - # used for callback handling. - self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') - # No audit event should have occurred - self.assertEqual(0, len(self._audits)) - - def test_add_user_to_group(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', - actor_id=user_ref['id'], actor_type='user', - actor_operation='added') - - def test_remove_user_from_group(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) - self.identity_api.remove_user_from_group(user_ref['id'], - group_ref['id']) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', - actor_id=user_ref['id'], actor_type='user', - actor_operation='removed') - - -class CADFNotificationsForEntities(NotificationsForEntities): - - def setUp(self): - super(CADFNotificationsForEntities, self).setUp() - self.config_fixture.config(notification_format='cadf') - - def test_initiator_data_is_set(self): - ref = unit.new_domain_ref() - resp = self.post('/domains', body={'domain': ref}) - resource_id = resp.result.get('domain').get('id') - self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - -class V2Notifications(BaseNotificationTest): - - def setUp(self): - super(V2Notifications, self).setUp() - self.config_fixture.config(notification_format='cadf') - - def test_user(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/users', - body={ - 'user': { - 'name': uuid.uuid4().hex, - 'password': uuid.uuid4().hex, - 'enabled': True, - }, - }, - token=token, - ) - user_id = resp.result.get('user').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - # test for delete user - self.admin_request( - method='DELETE', - path='/v2.0/users/%s' % user_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_role(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/OS-KSADM/roles', - body={ - 'role': { - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - }, - }, - token=token, - ) - role_id = resp.result.get('role').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'role', - cadftaxonomy.SECURITY_ROLE) - # test for delete role - self.admin_request( - method='DELETE', - path='/v2.0/OS-KSADM/roles/%s' % role_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_service_and_endpoint(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/OS-KSADM/services', - body={ - 'OS-KSADM:service': { - 'name': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - }, - }, - token=token, - ) - service_id = resp.result.get('OS-KSADM:service').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'service', - cadftaxonomy.SECURITY_SERVICE) - resp = self.admin_request( - method='POST', - path='/v2.0/endpoints', - body={ - 'endpoint': { - 'region': uuid.uuid4().hex, - 'service_id': service_id, - 'publicurl': uuid.uuid4().hex, - 'adminurl': uuid.uuid4().hex, - 'internalurl': uuid.uuid4().hex, - }, - }, - token=token, - ) - endpoint_id = resp.result.get('endpoint').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'endpoint', - cadftaxonomy.SECURITY_ENDPOINT) - # test for delete endpoint - self.admin_request( - method='DELETE', - path='/v2.0/endpoints/%s' % endpoint_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'endpoint', - cadftaxonomy.SECURITY_ENDPOINT) - # test for delete service - self.admin_request( - method='DELETE', - path='/v2.0/OS-KSADM/services/%s' % service_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'service', - cadftaxonomy.SECURITY_SERVICE) - - def test_project(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/tenants', - body={ - 'tenant': { - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True - }, - }, - token=token, - ) - project_id = resp.result.get('tenant').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'project', - cadftaxonomy.SECURITY_PROJECT) - # test for delete project - self.admin_request( - method='DELETE', - path='/v2.0/tenants/%s' % project_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'project', - cadftaxonomy.SECURITY_PROJECT) - - -class TestEventCallbacks(test_v3.RestfulTestCase): - - def setUp(self): - super(TestEventCallbacks, self).setUp() - self.has_been_called = False - - def _project_deleted_callback(self, service, resource_type, operation, - payload): - self.has_been_called = True - - def _project_created_callback(self, service, resource_type, operation, - payload): - self.has_been_called = True - - def test_notification_received(self): - callback = register_callback(CREATED_OPERATION, 'project') - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertTrue(callback.called) - - def test_notification_method_not_callable(self): - fake_method = None - self.assertRaises(TypeError, - notifications.register_event_callback, - UPDATED_OPERATION, - 'project', - [fake_method]) - - def test_notification_event_not_valid(self): - self.assertRaises(ValueError, - notifications.register_event_callback, - uuid.uuid4().hex, - 'project', - self._project_deleted_callback) - - def test_event_registration_for_unknown_resource_type(self): - # Registration for unknown resource types should succeed. If no event - # is issued for that resource type, the callback wont be triggered. - notifications.register_event_callback(DELETED_OPERATION, - uuid.uuid4().hex, - self._project_deleted_callback) - resource_type = uuid.uuid4().hex - notifications.register_event_callback(DELETED_OPERATION, - resource_type, - self._project_deleted_callback) - - def test_provider_event_callback_subscription(self): - callback_called = [] - - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = { - CREATED_OPERATION: {'project': self.foo_callback}} - - def foo_callback(self, service, resource_type, operation, - payload): - # uses callback_called from the closure - callback_called.append(True) - - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertEqual([True], callback_called) - - def test_provider_event_callbacks_subscription(self): - callback_called = [] - - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = { - CREATED_OPERATION: { - 'project': [self.callback_0, self.callback_1]}} - - def callback_0(self, service, resource_type, operation, payload): - # uses callback_called from the closure - callback_called.append('cb0') - - def callback_1(self, service, resource_type, operation, payload): - # uses callback_called from the closure - callback_called.append('cb1') - - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertItemsEqual(['cb1', 'cb0'], callback_called) - - def test_invalid_event_callbacks(self): - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = 'bogus' - - self.assertRaises(AttributeError, Foo) - - def test_invalid_event_callbacks_event(self): - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = {CREATED_OPERATION: 'bogus'} - - self.assertRaises(AttributeError, Foo) - - def test_using_an_unbound_method_as_a_callback_fails(self): - # NOTE(dstanek): An unbound method is when you reference a method - # from a class object. You'll get a method that isn't bound to a - # particular instance so there is no magic 'self'. You can call it, - # but you have to pass in the instance manually like: C.m(C()). - # If you reference the method from an instance then you get a method - # that effectively curries the self argument for you - # (think functools.partial). Obviously is we don't have an - # instance then we can't call the method. - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = {CREATED_OPERATION: - {'project': Foo.callback}} - - def callback(self, *args): - pass - - # TODO(dstanek): it would probably be nice to fail early using - # something like: - # self.assertRaises(TypeError, Foo) - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.assertRaises(TypeError, self.resource_api.create_project, - project_ref['id'], project_ref) - - -class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): - - LOCAL_HOST = 'localhost' - ACTION = 'authenticate' - ROLE_ASSIGNMENT = 'role_assignment' - - def setUp(self): - super(CadfNotificationsWrapperTestCase, self).setUp() - self._notifications = [] - - def fake_notify(action, initiator, outcome, target, - event_type, **kwargs): - service_security = cadftaxonomy.SERVICE_SECURITY - - event = eventfactory.EventFactory().new_event( - eventType=cadftype.EVENTTYPE_ACTIVITY, - outcome=outcome, - action=action, - initiator=initiator, - target=target, - observer=cadfresource.Resource(typeURI=service_security)) - - for key, value in kwargs.items(): - setattr(event, key, value) - - note = { - 'action': action, - 'initiator': initiator, - 'event': event, - 'event_type': event_type, - 'send_notification_called': True} - self._notifications.append(note) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_audit_notification', fake_notify)) - - def _assert_last_note(self, action, user_id, event_type=None): - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual(action, note['action']) - initiator = note['initiator'] - self.assertEqual(user_id, initiator.id) - self.assertEqual(self.LOCAL_HOST, initiator.host.address) - self.assertTrue(note['send_notification_called']) - if event_type: - self.assertEqual(event_type, note['event_type']) - - def _assert_event(self, role_id, project=None, domain=None, - user=None, group=None, inherit=False): - """Assert that the CADF event is valid. - - In the case of role assignments, the event will have extra data, - specifically, the role, target, actor, and if the role is inherited. - - An example event, as a dictionary is seen below: - { - 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event', - 'initiator': { - 'typeURI': 'service/security/account/user', - 'host': {'address': 'localhost'}, - 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341', - 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21' - }, - 'target': { - 'typeURI': 'service/security/account/user', - 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d' - }, - 'observer': { - 'typeURI': 'service/security', - 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95' - }, - 'eventType': 'activity', - 'eventTime': '2014-08-21T21:04:56.204536+0000', - 'role': u'0e6b990380154a2599ce6b6e91548a68', - 'domain': u'24bdcff1aab8474895dbaac509793de1', - 'inherited_to_projects': False, - 'group': u'c1e22dc67cbd469ea0e33bf428fe597a', - 'action': 'created.role_assignment', - 'outcome': 'success', - 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1' - } - """ - note = self._notifications[-1] - event = note['event'] - if project: - self.assertEqual(project, event.project) - if domain: - self.assertEqual(domain, event.domain) - if group: - self.assertEqual(group, event.group) - elif user: - self.assertEqual(user, event.user) - self.assertEqual(role_id, event.role) - self.assertEqual(inherit, event.inherited_to_projects) - - def test_v3_authenticate_user_name_and_domain_id(self): - user_id = self.user_id - user_name = self.user['name'] - password = self.user['password'] - domain_id = self.domain_id - data = self.build_authentication_request(username=user_name, - user_domain_id=domain_id, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def test_v3_authenticate_user_id(self): - user_id = self.user_id - password = self.user['password'] - data = self.build_authentication_request(user_id=user_id, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def test_v3_authenticate_user_name_and_domain_name(self): - user_id = self.user_id - user_name = self.user['name'] - password = self.user['password'] - domain_name = self.domain['name'] - data = self.build_authentication_request(username=user_name, - user_domain_name=domain_name, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def _test_role_assignment(self, url, role, project=None, domain=None, - user=None, group=None): - self.put(url) - action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT) - event_type = '%s.%s.%s' % (notifications.SERVICE, - self.ROLE_ASSIGNMENT, CREATED_OPERATION) - self._assert_last_note(action, self.user_id, event_type) - self._assert_event(role, project, domain, user, group) - self.delete(url) - action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT) - event_type = '%s.%s.%s' % (notifications.SERVICE, - self.ROLE_ASSIGNMENT, DELETED_OPERATION) - self._assert_last_note(action, self.user_id, event_type) - self._assert_event(role, project, domain, user, None) - - def test_user_project_grant(self): - url = ('/projects/%s/users/%s/roles/%s' % - (self.project_id, self.user_id, self.role_id)) - self._test_role_assignment(url, self.role_id, - project=self.project_id, - user=self.user_id) - - def test_group_domain_grant(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(self.user_id, group['id']) - url = ('/domains/%s/groups/%s/roles/%s' % - (self.domain_id, group['id'], self.role_id)) - self._test_role_assignment(url, self.role_id, - domain=self.domain_id, - user=self.user_id, - group=group['id']) - - def test_add_role_to_user_and_project(self): - # A notification is sent when add_role_to_user_and_project is called on - # the assignment manager. - - project_ref = unit.new_project_ref(self.domain_id) - project = self.resource_api.create_project( - project_ref['id'], project_ref) - tenant_id = project['id'] - - self.assignment_api.add_role_to_user_and_project( - self.user_id, tenant_id, self.role_id) - - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual('created.role_assignment', note['action']) - self.assertTrue(note['send_notification_called']) - - self._assert_event(self.role_id, project=tenant_id, user=self.user_id) - - def test_remove_role_from_user_and_project(self): - # A notification is sent when remove_role_from_user_and_project is - # called on the assignment manager. - - self.assignment_api.remove_role_from_user_and_project( - self.user_id, self.project_id, self.role_id) - - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual('deleted.role_assignment', note['action']) - self.assertTrue(note['send_notification_called']) - - self._assert_event(self.role_id, project=self.project_id, - user=self.user_id) - - -class TestCallbackRegistration(unit.BaseTestCase): - def setUp(self): - super(TestCallbackRegistration, self).setUp() - self.mock_log = mock.Mock() - # Force the callback logging to occur - self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG - - def verify_log_message(self, data): - """Verify log message. - - Tests that use this are a little brittle because adding more - logging can break them. - - TODO(dstanek): remove the need for this in a future refactoring - - """ - log_fn = self.mock_log.debug - self.assertEqual(len(data), log_fn.call_count) - for datum in data: - log_fn.assert_any_call(mock.ANY, datum) - - def test_a_function_callback(self): - def callback(*args, **kwargs): - pass - - resource_type = 'thing' - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, resource_type, callback) - - callback = 'keystone.tests.unit.common.test_notifications.callback' - expected_log_data = { - 'callback': callback, - 'event': 'identity.%s.created' % resource_type - } - self.verify_log_message([expected_log_data]) - - def test_a_method_callback(self): - class C(object): - def callback(self, *args, **kwargs): - pass - - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, 'thing', C().callback) - - callback = 'keystone.tests.unit.common.test_notifications.C.callback' - expected_log_data = { - 'callback': callback, - 'event': 'identity.thing.created' - } - self.verify_log_message([expected_log_data]) - - def test_a_list_of_callbacks(self): - def callback(*args, **kwargs): - pass - - class C(object): - def callback(self, *args, **kwargs): - pass - - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, 'thing', [callback, C().callback]) - - callback_1 = 'keystone.tests.unit.common.test_notifications.callback' - callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback' - expected_log_data = [ - { - 'callback': callback_1, - 'event': 'identity.thing.created' - }, - { - 'callback': callback_2, - 'event': 'identity.thing.created' - }, - ] - self.verify_log_message(expected_log_data) - - def test_an_invalid_callback(self): - self.assertRaises(TypeError, - notifications.register_event_callback, - (CREATED_OPERATION, 'thing', object())) - - def test_an_invalid_event(self): - def callback(*args, **kwargs): - pass - - self.assertRaises(ValueError, - notifications.register_event_callback, - uuid.uuid4().hex, - 'thing', - callback) diff --git a/keystone-moon/keystone/tests/unit/common/test_pemutils.py b/keystone-moon/keystone/tests/unit/common/test_pemutils.py deleted file mode 100644 index c2f58518..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_pemutils.py +++ /dev/null @@ -1,337 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 - -from six import moves - -from keystone.common import pemutils -from keystone.tests import unit as tests - - -# List of 2-tuples, (pem_type, pem_header) -headers = pemutils.PEM_TYPE_TO_HEADER.items() - - -def make_data(size, offset=0): - return ''.join([chr(x % 255) for x in moves.range(offset, size + offset)]) - - -def make_base64_from_data(data): - return base64.b64encode(data) - - -def wrap_base64(base64_text): - wrapped_text = '\n'.join([base64_text[x:x + 64] - for x in moves.range(0, len(base64_text), 64)]) - wrapped_text += '\n' - return wrapped_text - - -def make_pem(header, data): - base64_text = make_base64_from_data(data) - wrapped_text = wrap_base64(base64_text) - - result = '-----BEGIN %s-----\n' % header - result += wrapped_text - result += '-----END %s-----\n' % header - - return result - - -class PEM(object): - """PEM text and it's associated data broken out, used for testing. - - """ - def __init__(self, pem_header='CERTIFICATE', pem_type='cert', - data_size=70, data_offset=0): - self.pem_header = pem_header - self.pem_type = pem_type - self.data_size = data_size - self.data_offset = data_offset - self.data = make_data(self.data_size, self.data_offset) - self.base64_text = make_base64_from_data(self.data) - self.wrapped_base64 = wrap_base64(self.base64_text) - self.pem_text = make_pem(self.pem_header, self.data) - - -class TestPEMParseResult(tests.BaseTestCase): - - def test_pem_types(self): - for pem_type in pemutils.pem_types: - pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type] - r = pemutils.PEMParseResult(pem_type=pem_type) - self.assertEqual(pem_type, r.pem_type) - self.assertEqual(pem_header, r.pem_header) - - pem_type = 'xxx' - self.assertRaises(ValueError, - pemutils.PEMParseResult, pem_type=pem_type) - - def test_pem_headers(self): - for pem_header in pemutils.pem_headers: - pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header] - r = pemutils.PEMParseResult(pem_header=pem_header) - self.assertEqual(pem_type, r.pem_type) - self.assertEqual(pem_header, r.pem_header) - - pem_header = 'xxx' - self.assertRaises(ValueError, - pemutils.PEMParseResult, pem_header=pem_header) - - -class TestPEMParse(tests.BaseTestCase): - def test_parse_none(self): - text = '' - text += 'bla bla\n' - text += 'yada yada yada\n' - text += 'burfl blatz bingo\n' - - parse_results = pemutils.parse_pem(text) - self.assertEqual(0, len(parse_results)) - - self.assertEqual(False, pemutils.is_pem(text)) - - def test_parse_invalid(self): - p = PEM(pem_type='xxx', - pem_header='XXX') - text = p.pem_text - - self.assertRaises(ValueError, - pemutils.parse_pem, text) - - def test_parse_one(self): - data_size = 70 - count = len(headers) - pems = [] - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - p = pems[i] - text = p.pem_text - - parse_results = pemutils.parse_pem(text) - self.assertEqual(1, len(parse_results)) - - r = parse_results[0] - self.assertEqual(p.pem_type, r.pem_type) - self.assertEqual(p.pem_header, r.pem_header) - self.assertEqual(p.pem_text, - text[r.pem_start:r.pem_end]) - self.assertEqual(p.wrapped_base64, - text[r.base64_start:r.base64_end]) - self.assertEqual(p.data, r.binary_data) - - def test_parse_one_embedded(self): - p = PEM(data_offset=0) - text = '' - text += 'bla bla\n' - text += 'yada yada yada\n' - text += p.pem_text - text += 'burfl blatz bingo\n' - - parse_results = pemutils.parse_pem(text) - self.assertEqual(1, len(parse_results)) - - r = parse_results[0] - self.assertEqual(p.pem_type, r.pem_type) - self.assertEqual(p.pem_header, r.pem_header) - self.assertEqual(p.pem_text, - text[r.pem_start:r.pem_end]) - self.assertEqual(p.wrapped_base64, - text[r.base64_start: r.base64_end]) - self.assertEqual(p.data, r.binary_data) - - def test_parse_multple(self): - data_size = 70 - count = len(headers) - pems = [] - text = '' - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - text += pems[i].pem_text - - parse_results = pemutils.parse_pem(text) - self.assertEqual(count, len(parse_results)) - - for i in moves.range(count): - r = parse_results[i] - p = pems[i] - - self.assertEqual(p.pem_type, r.pem_type) - self.assertEqual(p.pem_header, r.pem_header) - self.assertEqual(p.pem_text, - text[r.pem_start:r.pem_end]) - self.assertEqual(p.wrapped_base64, - text[r.base64_start: r.base64_end]) - self.assertEqual(p.data, r.binary_data) - - def test_parse_multple_find_specific(self): - data_size = 70 - count = len(headers) - pems = [] - text = '' - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - text += pems[i].pem_text - - for i in moves.range(count): - parse_results = pemutils.parse_pem(text, pem_type=headers[i][0]) - self.assertEqual(1, len(parse_results)) - - r = parse_results[0] - p = pems[i] - - self.assertEqual(p.pem_type, r.pem_type) - self.assertEqual(p.pem_header, r.pem_header) - self.assertEqual(p.pem_text, - text[r.pem_start:r.pem_end]) - self.assertEqual(p.wrapped_base64, - text[r.base64_start:r.base64_end]) - self.assertEqual(p.data, r.binary_data) - - def test_parse_multple_embedded(self): - data_size = 75 - count = len(headers) - pems = [] - text = '' - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - text += 'bla bla\n' - text += 'yada yada yada\n' - text += pems[i].pem_text - text += 'burfl blatz bingo\n' - - parse_results = pemutils.parse_pem(text) - self.assertEqual(count, len(parse_results)) - - for i in moves.range(count): - r = parse_results[i] - p = pems[i] - - self.assertEqual(p.pem_type, r.pem_type) - self.assertEqual(p.pem_header, r.pem_header) - self.assertEqual(p.pem_text, - text[r.pem_start:r.pem_end]) - self.assertEqual(p.wrapped_base64, - text[r.base64_start:r.base64_end]) - self.assertEqual(p.data, r.binary_data) - - def test_get_pem_data_none(self): - text = '' - text += 'bla bla\n' - text += 'yada yada yada\n' - text += 'burfl blatz bingo\n' - - data = pemutils.get_pem_data(text) - self.assertIsNone(data) - - def test_get_pem_data_invalid(self): - p = PEM(pem_type='xxx', - pem_header='XXX') - text = p.pem_text - - self.assertRaises(ValueError, - pemutils.get_pem_data, text) - - def test_get_pem_data(self): - data_size = 70 - count = len(headers) - pems = [] - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - p = pems[i] - text = p.pem_text - - data = pemutils.get_pem_data(text, p.pem_type) - self.assertEqual(p.data, data) - - def test_is_pem(self): - data_size = 70 - count = len(headers) - pems = [] - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - p = pems[i] - text = p.pem_text - self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type)) - self.assertFalse(pemutils.is_pem(text, - pem_type=p.pem_type + 'xxx')) - - def test_base64_to_pem(self): - data_size = 70 - count = len(headers) - pems = [] - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - p = pems[i] - pem = pemutils.base64_to_pem(p.base64_text, p.pem_type) - self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data) - - def test_binary_to_pem(self): - data_size = 70 - count = len(headers) - pems = [] - - for i in moves.range(count): - pems.append(PEM(pem_type=headers[i][0], - pem_header=headers[i][1], - data_size=data_size + i, - data_offset=i)) - - for i in moves.range(count): - p = pems[i] - pem = pemutils.binary_to_pem(p.data, p.pem_type) - self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data) diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py deleted file mode 100644 index 7d20eb03..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_sql_core.py +++ /dev/null @@ -1,52 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from sqlalchemy.ext import declarative - -from keystone.common import sql -from keystone.tests import unit -from keystone.tests.unit import utils - - -ModelBase = declarative.declarative_base() - - -class TestModel(ModelBase, sql.ModelDictMixin): - __tablename__ = 'testmodel' - id = sql.Column(sql.String(64), primary_key=True) - text = sql.Column(sql.String(64), nullable=False) - - -class TestModelDictMixin(unit.BaseTestCase): - - def test_creating_a_model_instance_from_a_dict(self): - d = {'id': utils.new_uuid(), 'text': utils.new_uuid()} - m = TestModel.from_dict(d) - self.assertEqual(d['id'], m.id) - self.assertEqual(d['text'], m.text) - - def test_creating_a_dict_from_a_model_instance(self): - m = TestModel(id=utils.new_uuid(), text=utils.new_uuid()) - d = m.to_dict() - self.assertEqual(d['id'], m.id) - self.assertEqual(d['text'], m.text) - - def test_creating_a_model_instance_from_an_invalid_dict(self): - d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None} - self.assertRaises(TypeError, TestModel.from_dict, d) - - def test_creating_a_dict_from_a_model_instance_that_has_extra_attrs(self): - expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()} - m = TestModel(id=expected['id'], text=expected['text']) - m.extra = 'this should not be in the dictionary' - self.assertEqual(expected, m.to_dict()) diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py deleted file mode 100644 index 3641aacd..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_utils.py +++ /dev/null @@ -1,210 +0,0 @@ -# encoding: utf-8 -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import uuid - -from oslo_config import cfg -from oslo_config import fixture as config_fixture -from oslo_serialization import jsonutils -import six - -from keystone.common import utils as common_utils -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import utils -from keystone.version import service - - -CONF = cfg.CONF - -TZ = utils.TZ - - -class UtilsTestCase(unit.BaseTestCase): - OPTIONAL = object() - - def setUp(self): - super(UtilsTestCase, self).setUp() - self.config_fixture = self.useFixture(config_fixture.Config(CONF)) - - def test_resource_uuid(self): - uuid_str = '536e28c2017e405e89b25a1ed777b952' - self.assertEqual(uuid_str, common_utils.resource_uuid(uuid_str)) - - # Exact 64 length string. - uuid_str = ('536e28c2017e405e89b25a1ed777b952' - 'f13de678ac714bb1b7d1e9a007c10db5') - resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE - transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex - self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str)) - - # Non-ASCII character test. - non_ascii_ = 'ß' * 32 - transformed_id = uuid.uuid5(resource_id_namespace, non_ascii_).hex - self.assertEqual(transformed_id, - common_utils.resource_uuid(non_ascii_)) - - # This input is invalid because it's length is more than 64. - invalid_input = 'x' * 65 - self.assertRaises(ValueError, common_utils.resource_uuid, - invalid_input) - - # 64 length unicode string, to mimic what is returned from mapping_id - # backend. - uuid_str = six.text_type('536e28c2017e405e89b25a1ed777b952' - 'f13de678ac714bb1b7d1e9a007c10db5') - resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE - if six.PY2: - uuid_str = uuid_str.encode('utf-8') - transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex - self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str)) - - def test_hash(self): - password = 'right' - wrong = 'wrongwrong' # Two wrongs don't make a right - hashed = common_utils.hash_password(password) - self.assertTrue(common_utils.check_password(password, hashed)) - self.assertFalse(common_utils.check_password(wrong, hashed)) - - def test_verify_normal_password_strict(self): - self.config_fixture.config(strict_password_check=False) - password = uuid.uuid4().hex - verified = common_utils.verify_length_and_trunc_password(password) - self.assertEqual(password, verified) - - def test_that_a_hash_can_not_be_validated_against_a_hash(self): - # NOTE(dstanek): Bug 1279849 reported a problem where passwords - # were not being hashed if they already looked like a hash. This - # would allow someone to hash their password ahead of time - # (potentially getting around password requirements, like - # length) and then they could auth with their original password. - password = uuid.uuid4().hex - hashed_password = common_utils.hash_password(password) - new_hashed_password = common_utils.hash_password(hashed_password) - self.assertFalse(common_utils.check_password(password, - new_hashed_password)) - - def test_verify_long_password_strict(self): - self.config_fixture.config(strict_password_check=False) - self.config_fixture.config(group='identity', max_password_length=5) - max_length = CONF.identity.max_password_length - invalid_password = 'passw0rd' - trunc = common_utils.verify_length_and_trunc_password(invalid_password) - self.assertEqual(invalid_password[:max_length], trunc) - - def test_verify_long_password_strict_raises_exception(self): - self.config_fixture.config(strict_password_check=True) - self.config_fixture.config(group='identity', max_password_length=5) - invalid_password = 'passw0rd' - self.assertRaises(exception.PasswordVerificationError, - common_utils.verify_length_and_trunc_password, - invalid_password) - - def test_hash_long_password_truncation(self): - self.config_fixture.config(strict_password_check=False) - invalid_length_password = '0' * 9999999 - hashed = common_utils.hash_password(invalid_length_password) - self.assertTrue(common_utils.check_password(invalid_length_password, - hashed)) - - def test_hash_long_password_strict(self): - self.config_fixture.config(strict_password_check=True) - invalid_length_password = '0' * 9999999 - self.assertRaises(exception.PasswordVerificationError, - common_utils.hash_password, - invalid_length_password) - - def _create_test_user(self, password=OPTIONAL): - user = {"name": "hthtest"} - if password is not self.OPTIONAL: - user['password'] = password - - return user - - def test_hash_user_password_without_password(self): - user = self._create_test_user() - hashed = common_utils.hash_user_password(user) - self.assertEqual(user, hashed) - - def test_hash_user_password_with_null_password(self): - user = self._create_test_user(password=None) - hashed = common_utils.hash_user_password(user) - self.assertEqual(user, hashed) - - def test_hash_user_password_with_empty_password(self): - password = '' - user = self._create_test_user(password=password) - user_hashed = common_utils.hash_user_password(user) - password_hashed = user_hashed['password'] - self.assertTrue(common_utils.check_password(password, password_hashed)) - - def test_hash_edge_cases(self): - hashed = common_utils.hash_password('secret') - self.assertFalse(common_utils.check_password('', hashed)) - self.assertFalse(common_utils.check_password(None, hashed)) - - def test_hash_unicode(self): - password = u'Comment \xe7a va' - wrong = 'Comment ?a va' - hashed = common_utils.hash_password(password) - self.assertTrue(common_utils.check_password(password, hashed)) - self.assertFalse(common_utils.check_password(wrong, hashed)) - - def test_auth_str_equal(self): - self.assertTrue(common_utils.auth_str_equal('abc123', 'abc123')) - self.assertFalse(common_utils.auth_str_equal('a', 'aaaaa')) - self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a')) - self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123')) - - def test_unixtime(self): - global TZ - - @utils.timezone - def _test_unixtime(): - epoch = common_utils.unixtime(dt) - self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ) - - dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0) - epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400 - for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: - TZ = 'UTC' + d - _test_unixtime() - - def test_pki_encoder(self): - data = {'field': 'value'} - json = jsonutils.dumps(data, cls=common_utils.PKIEncoder) - expected_json = '{"field":"value"}' - self.assertEqual(expected_json, json) - - def test_url_safe_check(self): - base_str = 'i am safe' - self.assertFalse(common_utils.is_not_url_safe(base_str)) - for i in common_utils.URL_RESERVED_CHARS: - self.assertTrue(common_utils.is_not_url_safe(base_str + i)) - - def test_url_safe_with_unicode_check(self): - base_str = u'i am \xe7afe' - self.assertFalse(common_utils.is_not_url_safe(base_str)) - for i in common_utils.URL_RESERVED_CHARS: - self.assertTrue(common_utils.is_not_url_safe(base_str + i)) - - -class ServiceHelperTests(unit.BaseTestCase): - - @service.fail_gracefully - def _do_test(self): - raise Exception("Test Exc") - - def test_fail_gracefully(self): - self.assertRaises(unit.UnexpectedExit, self._do_test) |