aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/common
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
committerWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
commitb8c756ecdd7cced1db4300935484e8c83701c82e (patch)
tree87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/common
parentc304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff)
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
-rw-r--r--keystone-moon/keystone/tests/unit/common/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_base64utils.py208
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_connection_pool.py119
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_injection.py293
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_json_home.py91
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_ldap.py502
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_notifications.py974
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_pemutils.py337
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_sql_core.py52
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_utils.py164
10 files changed, 2740 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/__init__.py b/keystone-moon/keystone/tests/unit/common/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/__init__.py
diff --git a/keystone-moon/keystone/tests/unit/common/test_base64utils.py b/keystone-moon/keystone/tests/unit/common/test_base64utils.py
new file mode 100644
index 00000000..b0b75578
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_base64utils.py
@@ -0,0 +1,208 @@
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystone.common import base64utils
+from keystone.tests import unit as tests
+
+base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ 'abcdefghijklmnopqrstuvwxyz'
+ '0123456789'
+ '+/=') # includes pad char
+
+base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ 'abcdefghijklmnopqrstuvwxyz'
+ '0123456789'
+ '-_=') # includes pad char
+
+
+class TestValid(tests.BaseTestCase):
+ def test_valid_base64(self):
+ self.assertTrue(base64utils.is_valid_base64('+/=='))
+ self.assertTrue(base64utils.is_valid_base64('+/+='))
+ self.assertTrue(base64utils.is_valid_base64('+/+/'))
+
+ self.assertFalse(base64utils.is_valid_base64('-_=='))
+ self.assertFalse(base64utils.is_valid_base64('-_-='))
+ self.assertFalse(base64utils.is_valid_base64('-_-_'))
+
+ self.assertTrue(base64utils.is_valid_base64('abcd'))
+ self.assertFalse(base64utils.is_valid_base64('abcde'))
+ self.assertFalse(base64utils.is_valid_base64('abcde=='))
+ self.assertFalse(base64utils.is_valid_base64('abcdef'))
+ self.assertTrue(base64utils.is_valid_base64('abcdef=='))
+ self.assertFalse(base64utils.is_valid_base64('abcdefg'))
+ self.assertTrue(base64utils.is_valid_base64('abcdefg='))
+ self.assertTrue(base64utils.is_valid_base64('abcdefgh'))
+
+ self.assertFalse(base64utils.is_valid_base64('-_=='))
+
+ def test_valid_base64url(self):
+ self.assertFalse(base64utils.is_valid_base64url('+/=='))
+ self.assertFalse(base64utils.is_valid_base64url('+/+='))
+ self.assertFalse(base64utils.is_valid_base64url('+/+/'))
+
+ self.assertTrue(base64utils.is_valid_base64url('-_=='))
+ self.assertTrue(base64utils.is_valid_base64url('-_-='))
+ self.assertTrue(base64utils.is_valid_base64url('-_-_'))
+
+ self.assertTrue(base64utils.is_valid_base64url('abcd'))
+ self.assertFalse(base64utils.is_valid_base64url('abcde'))
+ self.assertFalse(base64utils.is_valid_base64url('abcde=='))
+ self.assertFalse(base64utils.is_valid_base64url('abcdef'))
+ self.assertTrue(base64utils.is_valid_base64url('abcdef=='))
+ self.assertFalse(base64utils.is_valid_base64url('abcdefg'))
+ self.assertTrue(base64utils.is_valid_base64url('abcdefg='))
+ self.assertTrue(base64utils.is_valid_base64url('abcdefgh'))
+
+ self.assertTrue(base64utils.is_valid_base64url('-_=='))
+
+
+class TestBase64Padding(tests.BaseTestCase):
+
+ def test_filter(self):
+ self.assertEqual('', base64utils.filter_formatting(''))
+ self.assertEqual('', base64utils.filter_formatting(' '))
+ self.assertEqual('a', base64utils.filter_formatting('a'))
+ self.assertEqual('a', base64utils.filter_formatting(' a'))
+ self.assertEqual('a', base64utils.filter_formatting('a '))
+ self.assertEqual('ab', base64utils.filter_formatting('ab'))
+ self.assertEqual('ab', base64utils.filter_formatting(' ab'))
+ self.assertEqual('ab', base64utils.filter_formatting('ab '))
+ self.assertEqual('ab', base64utils.filter_formatting('a b'))
+ self.assertEqual('ab', base64utils.filter_formatting(' a b'))
+ self.assertEqual('ab', base64utils.filter_formatting('a b '))
+ self.assertEqual('ab', base64utils.filter_formatting('a\nb\n '))
+
+ text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ 'abcdefghijklmnopqrstuvwxyz'
+ '0123456789'
+ '+/=')
+ self.assertEqual(base64_alphabet,
+ base64utils.filter_formatting(text))
+
+ text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
+ ' abcdefghijklmnopqrstuvwxyz\n'
+ '\t\f\r'
+ ' 0123456789\n'
+ ' +/=')
+ self.assertEqual(base64_alphabet,
+ base64utils.filter_formatting(text))
+ self.assertEqual(base64url_alphabet,
+ base64utils.base64_to_base64url(base64_alphabet))
+
+ text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ 'abcdefghijklmnopqrstuvwxyz'
+ '0123456789'
+ '-_=')
+ self.assertEqual(base64url_alphabet,
+ base64utils.filter_formatting(text))
+
+ text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
+ ' abcdefghijklmnopqrstuvwxyz\n'
+ '\t\f\r'
+ ' 0123456789\n'
+ '-_=')
+ self.assertEqual(base64url_alphabet,
+ base64utils.filter_formatting(text))
+
+ def test_alphabet_conversion(self):
+ self.assertEqual(base64url_alphabet,
+ base64utils.base64_to_base64url(base64_alphabet))
+
+ self.assertEqual(base64_alphabet,
+ base64utils.base64url_to_base64(base64url_alphabet))
+
+ def test_is_padded(self):
+ self.assertTrue(base64utils.base64_is_padded('ABCD'))
+ self.assertTrue(base64utils.base64_is_padded('ABC='))
+ self.assertTrue(base64utils.base64_is_padded('AB=='))
+
+ self.assertTrue(base64utils.base64_is_padded('1234ABCD'))
+ self.assertTrue(base64utils.base64_is_padded('1234ABC='))
+ self.assertTrue(base64utils.base64_is_padded('1234AB=='))
+
+ self.assertFalse(base64utils.base64_is_padded('ABC'))
+ self.assertFalse(base64utils.base64_is_padded('AB'))
+ self.assertFalse(base64utils.base64_is_padded('A'))
+ self.assertFalse(base64utils.base64_is_padded(''))
+
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64_is_padded, '=')
+
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64_is_padded, 'AB=C')
+
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64_is_padded, 'AB=')
+
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64_is_padded, 'ABCD=')
+
+ self.assertRaises(ValueError, base64utils.base64_is_padded,
+ 'ABC', pad='==')
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64_is_padded, 'A=BC')
+
+ def test_strip_padding(self):
+ self.assertEqual('ABCD', base64utils.base64_strip_padding('ABCD'))
+ self.assertEqual('ABC', base64utils.base64_strip_padding('ABC='))
+ self.assertEqual('AB', base64utils.base64_strip_padding('AB=='))
+ self.assertRaises(ValueError, base64utils.base64_strip_padding,
+ 'ABC=', pad='==')
+ self.assertEqual('ABC', base64utils.base64_strip_padding('ABC'))
+
+ def test_assure_padding(self):
+ self.assertEqual('ABCD', base64utils.base64_assure_padding('ABCD'))
+ self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC'))
+ self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC='))
+ self.assertEqual('AB==', base64utils.base64_assure_padding('AB'))
+ self.assertEqual('AB==', base64utils.base64_assure_padding('AB=='))
+ self.assertRaises(ValueError, base64utils.base64_assure_padding,
+ 'ABC', pad='==')
+
+ def test_base64_percent_encoding(self):
+ self.assertEqual('ABCD', base64utils.base64url_percent_encode('ABCD'))
+ self.assertEqual('ABC%3D',
+ base64utils.base64url_percent_encode('ABC='))
+ self.assertEqual('AB%3D%3D',
+ base64utils.base64url_percent_encode('AB=='))
+
+ self.assertEqual('ABCD', base64utils.base64url_percent_decode('ABCD'))
+ self.assertEqual('ABC=',
+ base64utils.base64url_percent_decode('ABC%3D'))
+ self.assertEqual('AB==',
+ base64utils.base64url_percent_decode('AB%3D%3D'))
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64url_percent_encode, 'chars')
+ self.assertRaises(base64utils.InvalidBase64Error,
+ base64utils.base64url_percent_decode, 'AB%3D%3')
+
+
+class TestTextWrap(tests.BaseTestCase):
+
+ def test_wrapping(self):
+ raw_text = 'abcdefgh'
+ wrapped_text = 'abc\ndef\ngh\n'
+
+ self.assertEqual(wrapped_text,
+ base64utils.base64_wrap(raw_text, width=3))
+
+ t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n'
+ self.assertEqual(wrapped_text, t)
+
+ raw_text = 'abcdefgh'
+ wrapped_text = 'abcd\nefgh\n'
+
+ self.assertEqual(wrapped_text,
+ base64utils.base64_wrap(raw_text, width=4))
diff --git a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py
new file mode 100644
index 00000000..74d0420c
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py
@@ -0,0 +1,119 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import mock
+from six.moves import queue
+import testtools
+from testtools import matchers
+
+from keystone.common.cache import _memcache_pool
+from keystone import exception
+from keystone.tests.unit import core
+
+
+class _TestConnectionPool(_memcache_pool.ConnectionPool):
+ destroyed_value = 'destroyed'
+
+ def _create_connection(self):
+ return mock.MagicMock()
+
+ def _destroy_connection(self, conn):
+ conn(self.destroyed_value)
+
+
+class TestConnectionPool(core.TestCase):
+ def setUp(self):
+ super(TestConnectionPool, self).setUp()
+ self.unused_timeout = 10
+ self.maxsize = 2
+ self.connection_pool = _TestConnectionPool(
+ maxsize=self.maxsize,
+ unused_timeout=self.unused_timeout)
+ self.addCleanup(self.cleanup_instance('connection_pool'))
+
+ def test_get_context_manager(self):
+ self.assertThat(self.connection_pool.queue, matchers.HasLength(0))
+ with self.connection_pool.acquire() as conn:
+ self.assertEqual(1, self.connection_pool._acquired)
+ self.assertEqual(0, self.connection_pool._acquired)
+ self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
+ self.assertEqual(conn, self.connection_pool.queue[0].connection)
+
+ def test_cleanup_pool(self):
+ self.test_get_context_manager()
+ newtime = time.time() + self.unused_timeout * 2
+ non_expired_connection = _memcache_pool._PoolItem(
+ ttl=(newtime * 2),
+ connection=mock.MagicMock())
+ self.connection_pool.queue.append(non_expired_connection)
+ self.assertThat(self.connection_pool.queue, matchers.HasLength(2))
+ with mock.patch.object(time, 'time', return_value=newtime):
+ conn = self.connection_pool.queue[0].connection
+ with self.connection_pool.acquire():
+ pass
+ conn.assert_has_calls(
+ [mock.call(self.connection_pool.destroyed_value)])
+ self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
+ self.assertEqual(0, non_expired_connection.connection.call_count)
+
+ def test_acquire_conn_exception_returns_acquired_count(self):
+ class TestException(Exception):
+ pass
+
+ with mock.patch.object(_TestConnectionPool, '_create_connection',
+ side_effect=TestException):
+ with testtools.ExpectedException(TestException):
+ with self.connection_pool.acquire():
+ pass
+ self.assertThat(self.connection_pool.queue,
+ matchers.HasLength(0))
+ self.assertEqual(0, self.connection_pool._acquired)
+
+ def test_connection_pool_limits_maximum_connections(self):
+ # NOTE(morganfainberg): To ensure we don't lockup tests until the
+ # job limit, explicitly call .get_nowait() and .put_nowait() in this
+ # case.
+ conn1 = self.connection_pool.get_nowait()
+ conn2 = self.connection_pool.get_nowait()
+
+ # Use a nowait version to raise an Empty exception indicating we would
+ # not get another connection until one is placed back into the queue.
+ self.assertRaises(queue.Empty, self.connection_pool.get_nowait)
+
+ # Place the connections back into the pool.
+ self.connection_pool.put_nowait(conn1)
+ self.connection_pool.put_nowait(conn2)
+
+ # Make sure we can get a connection out of the pool again.
+ self.connection_pool.get_nowait()
+
+ def test_connection_pool_maximum_connection_get_timeout(self):
+ connection_pool = _TestConnectionPool(
+ maxsize=1,
+ unused_timeout=self.unused_timeout,
+ conn_get_timeout=0)
+
+ def _acquire_connection():
+ with connection_pool.acquire():
+ pass
+
+ # Make sure we've consumed the only available connection from the pool
+ conn = connection_pool.get_nowait()
+
+ self.assertRaises(exception.UnexpectedError, _acquire_connection)
+
+ # Put the connection back and ensure we can acquire the connection
+ # after it is available.
+ connection_pool.put_nowait(conn)
+ _acquire_connection()
diff --git a/keystone-moon/keystone/tests/unit/common/test_injection.py b/keystone-moon/keystone/tests/unit/common/test_injection.py
new file mode 100644
index 00000000..86bb3c24
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_injection.py
@@ -0,0 +1,293 @@
+# Copyright 2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystone.common import dependency
+from keystone.tests import unit as tests
+
+
+class TestDependencyInjection(tests.BaseTestCase):
+ def setUp(self):
+ super(TestDependencyInjection, self).setUp()
+ self.addCleanup(dependency.reset)
+
+ def test_dependency_injection(self):
+ class Interface(object):
+ def do_work(self):
+ assert False
+
+ @dependency.provider('first_api')
+ class FirstImplementation(Interface):
+ def do_work(self):
+ return True
+
+ @dependency.provider('second_api')
+ class SecondImplementation(Interface):
+ def do_work(self):
+ return True
+
+ @dependency.requires('first_api', 'second_api')
+ class Consumer(object):
+ def do_work_with_dependencies(self):
+ assert self.first_api.do_work()
+ assert self.second_api.do_work()
+
+ # initialize dependency providers
+ first_api = FirstImplementation()
+ second_api = SecondImplementation()
+
+ # ... sometime later, initialize a dependency consumer
+ consumer = Consumer()
+
+ # the expected dependencies should be available to the consumer
+ self.assertIs(consumer.first_api, first_api)
+ self.assertIs(consumer.second_api, second_api)
+ self.assertIsInstance(consumer.first_api, Interface)
+ self.assertIsInstance(consumer.second_api, Interface)
+ consumer.do_work_with_dependencies()
+
+ def test_dependency_provider_configuration(self):
+ @dependency.provider('api')
+ class Configurable(object):
+ def __init__(self, value=None):
+ self.value = value
+
+ def get_value(self):
+ return self.value
+
+ @dependency.requires('api')
+ class Consumer(object):
+ def get_value(self):
+ return self.api.get_value()
+
+ # initialize dependency providers
+ api = Configurable(value=True)
+
+ # ... sometime later, initialize a dependency consumer
+ consumer = Consumer()
+
+ # the expected dependencies should be available to the consumer
+ self.assertIs(consumer.api, api)
+ self.assertIsInstance(consumer.api, Configurable)
+ self.assertTrue(consumer.get_value())
+
+ def test_dependency_consumer_configuration(self):
+ @dependency.provider('api')
+ class Provider(object):
+ def get_value(self):
+ return True
+
+ @dependency.requires('api')
+ class Configurable(object):
+ def __init__(self, value=None):
+ self.value = value
+
+ def get_value(self):
+ if self.value:
+ return self.api.get_value()
+
+ # initialize dependency providers
+ api = Provider()
+
+ # ... sometime later, initialize a dependency consumer
+ consumer = Configurable(value=True)
+
+ # the expected dependencies should be available to the consumer
+ self.assertIs(consumer.api, api)
+ self.assertIsInstance(consumer.api, Provider)
+ self.assertTrue(consumer.get_value())
+
+ def test_inherited_dependency(self):
+ class Interface(object):
+ def do_work(self):
+ assert False
+
+ @dependency.provider('first_api')
+ class FirstImplementation(Interface):
+ def do_work(self):
+ return True
+
+ @dependency.provider('second_api')
+ class SecondImplementation(Interface):
+ def do_work(self):
+ return True
+
+ @dependency.requires('first_api')
+ class ParentConsumer(object):
+ def do_work_with_dependencies(self):
+ assert self.first_api.do_work()
+
+ @dependency.requires('second_api')
+ class ChildConsumer(ParentConsumer):
+ def do_work_with_dependencies(self):
+ assert self.second_api.do_work()
+ super(ChildConsumer, self).do_work_with_dependencies()
+
+ # initialize dependency providers
+ first_api = FirstImplementation()
+ second_api = SecondImplementation()
+
+ # ... sometime later, initialize a dependency consumer
+ consumer = ChildConsumer()
+
+ # dependencies should be naturally inherited
+ self.assertEqual(
+ set(['first_api']),
+ ParentConsumer._dependencies)
+ self.assertEqual(
+ set(['first_api', 'second_api']),
+ ChildConsumer._dependencies)
+ self.assertEqual(
+ set(['first_api', 'second_api']),
+ consumer._dependencies)
+
+ # the expected dependencies should be available to the consumer
+ self.assertIs(consumer.first_api, first_api)
+ self.assertIs(consumer.second_api, second_api)
+ self.assertIsInstance(consumer.first_api, Interface)
+ self.assertIsInstance(consumer.second_api, Interface)
+ consumer.do_work_with_dependencies()
+
+ def test_unresolvable_dependency(self):
+ @dependency.requires(uuid.uuid4().hex)
+ class Consumer(object):
+ pass
+
+ def for_test():
+ Consumer()
+ dependency.resolve_future_dependencies()
+
+ self.assertRaises(dependency.UnresolvableDependencyException, for_test)
+
+ def test_circular_dependency(self):
+ p1_name = uuid.uuid4().hex
+ p2_name = uuid.uuid4().hex
+
+ @dependency.provider(p1_name)
+ @dependency.requires(p2_name)
+ class P1(object):
+ pass
+
+ @dependency.provider(p2_name)
+ @dependency.requires(p1_name)
+ class P2(object):
+ pass
+
+ p1 = P1()
+ p2 = P2()
+
+ dependency.resolve_future_dependencies()
+
+ self.assertIs(getattr(p1, p2_name), p2)
+ self.assertIs(getattr(p2, p1_name), p1)
+
+ def test_reset(self):
+ # Can reset the registry of providers.
+
+ p_id = uuid.uuid4().hex
+
+ @dependency.provider(p_id)
+ class P(object):
+ pass
+
+ p_inst = P()
+
+ self.assertIs(dependency.get_provider(p_id), p_inst)
+
+ dependency.reset()
+
+ self.assertFalse(dependency._REGISTRY)
+
+ def test_optional_dependency_not_provided(self):
+ requirement_name = uuid.uuid4().hex
+
+ @dependency.optional(requirement_name)
+ class C1(object):
+ pass
+
+ c1_inst = C1()
+
+ dependency.resolve_future_dependencies()
+
+ self.assertIsNone(getattr(c1_inst, requirement_name))
+
+ def test_optional_dependency_provided(self):
+ requirement_name = uuid.uuid4().hex
+
+ @dependency.optional(requirement_name)
+ class C1(object):
+ pass
+
+ @dependency.provider(requirement_name)
+ class P1(object):
+ pass
+
+ c1_inst = C1()
+ p1_inst = P1()
+
+ dependency.resolve_future_dependencies()
+
+ self.assertIs(getattr(c1_inst, requirement_name), p1_inst)
+
+ def test_optional_and_required(self):
+ p1_name = uuid.uuid4().hex
+ p2_name = uuid.uuid4().hex
+ optional_name = uuid.uuid4().hex
+
+ @dependency.provider(p1_name)
+ @dependency.requires(p2_name)
+ @dependency.optional(optional_name)
+ class P1(object):
+ pass
+
+ @dependency.provider(p2_name)
+ @dependency.requires(p1_name)
+ class P2(object):
+ pass
+
+ p1 = P1()
+ p2 = P2()
+
+ dependency.resolve_future_dependencies()
+
+ self.assertIs(getattr(p1, p2_name), p2)
+ self.assertIs(getattr(p2, p1_name), p1)
+ self.assertIsNone(getattr(p1, optional_name))
+
+ def test_get_provider(self):
+ # Can get the instance of a provider using get_provider
+
+ provider_name = uuid.uuid4().hex
+
+ @dependency.provider(provider_name)
+ class P(object):
+ pass
+
+ provider_instance = P()
+ retrieved_provider_instance = dependency.get_provider(provider_name)
+ self.assertIs(provider_instance, retrieved_provider_instance)
+
+ def test_get_provider_not_provided_error(self):
+ # If no provider and provider is required then fails.
+
+ provider_name = uuid.uuid4().hex
+ self.assertRaises(KeyError, dependency.get_provider, provider_name)
+
+ def test_get_provider_not_provided_optional(self):
+ # If no provider and provider is optional then returns None.
+
+ provider_name = uuid.uuid4().hex
+ self.assertIsNone(dependency.get_provider(provider_name,
+ dependency.GET_OPTIONAL))
diff --git a/keystone-moon/keystone/tests/unit/common/test_json_home.py b/keystone-moon/keystone/tests/unit/common/test_json_home.py
new file mode 100644
index 00000000..fb7f8448
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_json_home.py
@@ -0,0 +1,91 @@
+# Copyright 2014 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import copy
+
+from testtools import matchers
+
+from keystone.common import json_home
+from keystone.tests import unit as tests
+
+
+class JsonHomeTest(tests.BaseTestCase):
+ def test_build_v3_resource_relation(self):
+ resource_name = self.getUniqueString()
+ relation = json_home.build_v3_resource_relation(resource_name)
+ exp_relation = (
+ 'http://docs.openstack.org/api/openstack-identity/3/rel/%s' %
+ resource_name)
+ self.assertThat(relation, matchers.Equals(exp_relation))
+
+ def test_build_v3_extension_resource_relation(self):
+ extension_name = self.getUniqueString()
+ extension_version = self.getUniqueString()
+ resource_name = self.getUniqueString()
+ relation = json_home.build_v3_extension_resource_relation(
+ extension_name, extension_version, resource_name)
+ exp_relation = (
+ 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/rel/'
+ '%s' % (extension_name, extension_version, resource_name))
+ self.assertThat(relation, matchers.Equals(exp_relation))
+
+ def test_build_v3_parameter_relation(self):
+ parameter_name = self.getUniqueString()
+ relation = json_home.build_v3_parameter_relation(parameter_name)
+ exp_relation = (
+ 'http://docs.openstack.org/api/openstack-identity/3/param/%s' %
+ parameter_name)
+ self.assertThat(relation, matchers.Equals(exp_relation))
+
+ def test_build_v3_extension_parameter_relation(self):
+ extension_name = self.getUniqueString()
+ extension_version = self.getUniqueString()
+ parameter_name = self.getUniqueString()
+ relation = json_home.build_v3_extension_parameter_relation(
+ extension_name, extension_version, parameter_name)
+ exp_relation = (
+ 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/'
+ 'param/%s' % (extension_name, extension_version, parameter_name))
+ self.assertThat(relation, matchers.Equals(exp_relation))
+
+ def test_translate_urls(self):
+ href_rel = self.getUniqueString()
+ href = self.getUniqueString()
+ href_template_rel = self.getUniqueString()
+ href_template = self.getUniqueString()
+ href_vars = {self.getUniqueString(): self.getUniqueString()}
+ original_json_home = {
+ 'resources': {
+ href_rel: {'href': href},
+ href_template_rel: {
+ 'href-template': href_template,
+ 'href-vars': href_vars}
+ }
+ }
+
+ new_json_home = copy.deepcopy(original_json_home)
+ new_prefix = self.getUniqueString()
+ json_home.translate_urls(new_json_home, new_prefix)
+
+ exp_json_home = {
+ 'resources': {
+ href_rel: {'href': new_prefix + href},
+ href_template_rel: {
+ 'href-template': new_prefix + href_template,
+ 'href-vars': href_vars}
+ }
+ }
+
+ self.assertThat(new_json_home, matchers.Equals(exp_json_home))
diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py
new file mode 100644
index 00000000..41568890
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_ldap.py
@@ -0,0 +1,502 @@
+# -*- coding: utf-8 -*-
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import ldap.dn
+import mock
+from oslo_config import cfg
+from testtools import matchers
+
+import os
+import shutil
+import tempfile
+
+from keystone.common import ldap as ks_ldap
+from keystone.common.ldap import core as common_ldap_core
+from keystone.tests import unit as tests
+from keystone.tests.unit import default_fixtures
+from keystone.tests.unit import fakeldap
+
+CONF = cfg.CONF
+
+
+class DnCompareTest(tests.BaseTestCase):
+ """Tests for the DN comparison functions in keystone.common.ldap.core."""
+
+ def test_prep(self):
+ # prep_case_insensitive returns the string with spaces at the front and
+ # end if it's already lowercase and no insignificant characters.
+ value = 'lowercase value'
+ self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
+
+ def test_prep_lowercase(self):
+ # prep_case_insensitive returns the string with spaces at the front and
+ # end and lowercases the value.
+ value = 'UPPERCASE VALUE'
+ exp_value = value.lower()
+ self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
+
+ def test_prep_insignificant(self):
+ # prep_case_insensitive remove insignificant spaces.
+ value = 'before after'
+ exp_value = 'before after'
+ self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
+
+ def test_prep_insignificant_pre_post(self):
+ # prep_case_insensitive remove insignificant spaces.
+ value = ' value '
+ exp_value = 'value'
+ self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
+
+ def test_ava_equal_same(self):
+ # is_ava_value_equal returns True if the two values are the same.
+ value = 'val1'
+ self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
+
+ def test_ava_equal_complex(self):
+ # is_ava_value_equal returns True if the two values are the same using
+ # a value that's got different capitalization and insignificant chars.
+ val1 = 'before after'
+ val2 = ' BEFORE afTer '
+ self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
+
+ def test_ava_different(self):
+ # is_ava_value_equal returns False if the values aren't the same.
+ self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
+
+ def test_rdn_same(self):
+ # is_rdn_equal returns True if the two values are the same.
+ rdn = ldap.dn.str2dn('cn=val1')[0]
+ self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
+
+ def test_rdn_diff_length(self):
+ # is_rdn_equal returns False if the RDNs have a different number of
+ # AVAs.
+ rdn1 = ldap.dn.str2dn('cn=cn1')[0]
+ rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
+ self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_rdn_multi_ava_same_order(self):
+ # is_rdn_equal returns True if the RDNs have the same number of AVAs
+ # and the values are the same.
+ rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
+ rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
+ self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_rdn_multi_ava_diff_order(self):
+ # is_rdn_equal returns True if the RDNs have the same number of AVAs
+ # and the values are the same, even if in a different order
+ rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
+ rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
+ self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_rdn_multi_ava_diff_type(self):
+ # is_rdn_equal returns False if the RDNs have the same number of AVAs
+ # and the attribute types are different.
+ rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
+ rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
+ self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_rdn_attr_type_case_diff(self):
+ # is_rdn_equal returns True for same RDNs even when attr type case is
+ # different.
+ rdn1 = ldap.dn.str2dn('cn=cn1')[0]
+ rdn2 = ldap.dn.str2dn('CN=cn1')[0]
+ self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_rdn_attr_type_alias(self):
+ # is_rdn_equal returns False for same RDNs even when attr type alias is
+ # used. Note that this is a limitation since an LDAP server should
+ # consider them equal.
+ rdn1 = ldap.dn.str2dn('cn=cn1')[0]
+ rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
+ self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
+
+ def test_dn_same(self):
+ # is_dn_equal returns True if the DNs are the same.
+ dn = 'cn=Babs Jansen,ou=OpenStack'
+ self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
+
+ def test_dn_equal_unicode(self):
+ # is_dn_equal can accept unicode
+ dn = u'cn=fäké,ou=OpenStack'
+ self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
+
+ def test_dn_diff_length(self):
+ # is_dn_equal returns False if the DNs don't have the same number of
+ # RDNs
+ dn1 = 'cn=Babs Jansen,ou=OpenStack'
+ dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
+ self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
+
+ def test_dn_equal_rdns(self):
+ # is_dn_equal returns True if the DNs have the same number of RDNs
+ # and each RDN is the same.
+ dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
+ dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
+ self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
+
+ def test_dn_parsed_dns(self):
+ # is_dn_equal can also accept parsed DNs.
+ dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
+ dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
+ self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
+
+ def test_startswith_under_child(self):
+ # dn_startswith returns True if descendant_dn is a child of dn.
+ child = 'cn=Babs Jansen,ou=OpenStack'
+ parent = 'ou=OpenStack'
+ self.assertTrue(ks_ldap.dn_startswith(child, parent))
+
+ def test_startswith_parent(self):
+ # dn_startswith returns False if descendant_dn is a parent of dn.
+ child = 'cn=Babs Jansen,ou=OpenStack'
+ parent = 'ou=OpenStack'
+ self.assertFalse(ks_ldap.dn_startswith(parent, child))
+
+ def test_startswith_same(self):
+ # dn_startswith returns False if DNs are the same.
+ dn = 'cn=Babs Jansen,ou=OpenStack'
+ self.assertFalse(ks_ldap.dn_startswith(dn, dn))
+
+ def test_startswith_not_parent(self):
+ # dn_startswith returns False if descendant_dn is not under the dn
+ child = 'cn=Babs Jansen,ou=OpenStack'
+ parent = 'dc=example.com'
+ self.assertFalse(ks_ldap.dn_startswith(child, parent))
+
+ def test_startswith_descendant(self):
+ # dn_startswith returns True if descendant_dn is a descendant of dn.
+ descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
+ dn = 'ou=OpenStack,dc=example.com'
+ self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
+
+ descendant = 'uid=12345,ou=Users,dc=example,dc=com'
+ dn = 'ou=Users,dc=example,dc=com'
+ self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
+
+ def test_startswith_parsed_dns(self):
+ # dn_startswith also accepts parsed DNs.
+ descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
+ dn = ldap.dn.str2dn('ou=OpenStack')
+ self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
+
+ def test_startswith_unicode(self):
+ # dn_startswith accepts unicode.
+ child = u'cn=cn=fäké,ou=OpenStäck'
+ parent = 'ou=OpenStäck'
+ self.assertTrue(ks_ldap.dn_startswith(child, parent))
+
+
+class LDAPDeleteTreeTest(tests.TestCase):
+
+ def setUp(self):
+ super(LDAPDeleteTreeTest, self).setUp()
+
+ ks_ldap.register_handler('fake://',
+ fakeldap.FakeLdapNoSubtreeDelete)
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
+ self.addCleanup(self.clear_database)
+ self.addCleanup(common_ldap_core._HANDLERS.clear)
+
+ def clear_database(self):
+ for shelf in fakeldap.FakeShelves:
+ fakeldap.FakeShelves[shelf].clear()
+
+ def config_overrides(self):
+ super(LDAPDeleteTreeTest, self).config_overrides()
+ self.config_fixture.config(
+ group='identity',
+ driver='keystone.identity.backends.ldap.Identity')
+
+ def config_files(self):
+ config_files = super(LDAPDeleteTreeTest, self).config_files()
+ config_files.append(tests.dirs.tests_conf('backend_ldap.conf'))
+ return config_files
+
+ def test_deleteTree(self):
+ """Test manually deleting a tree.
+
+ Few LDAP servers support CONTROL_DELETETREE. This test
+ exercises the alternate code paths in BaseLdap.deleteTree.
+
+ """
+ conn = self.identity_api.user.get_connection()
+ id_attr = self.identity_api.user.id_attr
+ objclass = self.identity_api.user.object_class.lower()
+ tree_dn = self.identity_api.user.tree_dn
+
+ def create_entry(name, parent_dn=None):
+ if not parent_dn:
+ parent_dn = tree_dn
+ dn = '%s=%s,%s' % (id_attr, name, parent_dn)
+ attrs = [('objectclass', [objclass, 'ldapsubentry']),
+ (id_attr, [name])]
+ conn.add_s(dn, attrs)
+ return dn
+
+ # create 3 entries like this:
+ # cn=base
+ # cn=child,cn=base
+ # cn=grandchild,cn=child,cn=base
+ # then attempt to deleteTree(cn=base)
+ base_id = 'base'
+ base_dn = create_entry(base_id)
+ child_dn = create_entry('child', base_dn)
+ grandchild_dn = create_entry('grandchild', child_dn)
+
+ # verify that the three entries were created
+ scope = ldap.SCOPE_SUBTREE
+ filt = '(|(objectclass=*)(objectclass=ldapsubentry))'
+ entries = conn.search_s(base_dn, scope, filt,
+ attrlist=common_ldap_core.DN_ONLY)
+ self.assertThat(entries, matchers.HasLength(3))
+ sort_ents = sorted([e[0] for e in entries], key=len, reverse=True)
+ self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents)
+
+ # verify that a non-leaf node can't be deleted directly by the
+ # LDAP server
+ self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
+ conn.delete_s, base_dn)
+ self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
+ conn.delete_s, child_dn)
+
+ # call our deleteTree implementation
+ self.identity_api.user.deleteTree(base_id)
+ self.assertRaises(ldap.NO_SUCH_OBJECT,
+ conn.search_s, base_dn, ldap.SCOPE_BASE)
+ self.assertRaises(ldap.NO_SUCH_OBJECT,
+ conn.search_s, child_dn, ldap.SCOPE_BASE)
+ self.assertRaises(ldap.NO_SUCH_OBJECT,
+ conn.search_s, grandchild_dn, ldap.SCOPE_BASE)
+
+
+class SslTlsTest(tests.TestCase):
+ """Tests for the SSL/TLS functionality in keystone.common.ldap.core."""
+
+ @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s')
+ @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
+ def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
+ # Attempt to connect to initialize python-ldap.
+ base_ldap = ks_ldap.BaseLdap(config)
+ base_ldap.get_connection()
+
+ def test_certfile_trust_tls(self):
+ # We need this to actually exist, so we create a tempfile.
+ (handle, certfile) = tempfile.mkstemp()
+ self.addCleanup(os.unlink, certfile)
+ self.addCleanup(os.close, handle)
+ self.config_fixture.config(group='ldap',
+ url='ldap://localhost',
+ use_tls=True,
+ tls_cacertfile=certfile)
+
+ self._init_ldap_connection(CONF)
+
+ # Ensure the cert trust option is set.
+ self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
+
+ def test_certdir_trust_tls(self):
+ # We need this to actually exist, so we create a tempdir.
+ certdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, certdir)
+ self.config_fixture.config(group='ldap',
+ url='ldap://localhost',
+ use_tls=True,
+ tls_cacertdir=certdir)
+
+ self._init_ldap_connection(CONF)
+
+ # Ensure the cert trust option is set.
+ self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
+
+ def test_certfile_trust_ldaps(self):
+ # We need this to actually exist, so we create a tempfile.
+ (handle, certfile) = tempfile.mkstemp()
+ self.addCleanup(os.unlink, certfile)
+ self.addCleanup(os.close, handle)
+ self.config_fixture.config(group='ldap',
+ url='ldaps://localhost',
+ use_tls=False,
+ tls_cacertfile=certfile)
+
+ self._init_ldap_connection(CONF)
+
+ # Ensure the cert trust option is set.
+ self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
+
+ def test_certdir_trust_ldaps(self):
+ # We need this to actually exist, so we create a tempdir.
+ certdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, certdir)
+ self.config_fixture.config(group='ldap',
+ url='ldaps://localhost',
+ use_tls=False,
+ tls_cacertdir=certdir)
+
+ self._init_ldap_connection(CONF)
+
+ # Ensure the cert trust option is set.
+ self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
+
+
+class LDAPPagedResultsTest(tests.TestCase):
+ """Tests the paged results functionality in keystone.common.ldap.core."""
+
+ def setUp(self):
+ super(LDAPPagedResultsTest, self).setUp()
+ self.clear_database()
+
+ ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
+ self.addCleanup(common_ldap_core._HANDLERS.clear)
+
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
+ def clear_database(self):
+ for shelf in fakeldap.FakeShelves:
+ fakeldap.FakeShelves[shelf].clear()
+
+ def config_overrides(self):
+ super(LDAPPagedResultsTest, self).config_overrides()
+ self.config_fixture.config(
+ group='identity',
+ driver='keystone.identity.backends.ldap.Identity')
+
+ def config_files(self):
+ config_files = super(LDAPPagedResultsTest, self).config_files()
+ config_files.append(tests.dirs.tests_conf('backend_ldap.conf'))
+ return config_files
+
+ @mock.patch.object(fakeldap.FakeLdap, 'search_ext')
+ @mock.patch.object(fakeldap.FakeLdap, 'result3')
+ def test_paged_results_control_api(self, mock_result3, mock_search_ext):
+ mock_result3.return_value = ('', [], 1, [])
+
+ self.config_fixture.config(group='ldap',
+ page_size=1)
+
+ conn = self.identity_api.user.get_connection()
+ conn._paged_search_s('dc=example,dc=test',
+ ldap.SCOPE_SUBTREE,
+ 'objectclass=*')
+
+
+class CommonLdapTestCase(tests.BaseTestCase):
+ """These test cases call functions in keystone.common.ldap."""
+
+ def test_binary_attribute_values(self):
+ result = [(
+ 'cn=junk,dc=example,dc=com',
+ {
+ 'cn': ['junk'],
+ 'sn': [uuid.uuid4().hex],
+ 'mail': [uuid.uuid4().hex],
+ 'binary_attr': ['\x00\xFF\x00\xFF']
+ }
+ ), ]
+ py_result = ks_ldap.convert_ldap_result(result)
+ # The attribute containing the binary value should
+ # not be present in the converted result.
+ self.assertNotIn('binary_attr', py_result[0][1])
+
+ def test_utf8_conversion(self):
+ value_unicode = u'fäké1'
+ value_utf8 = value_unicode.encode('utf-8')
+
+ result_utf8 = ks_ldap.utf8_encode(value_unicode)
+ self.assertEqual(value_utf8, result_utf8)
+
+ result_utf8 = ks_ldap.utf8_encode(value_utf8)
+ self.assertEqual(value_utf8, result_utf8)
+
+ result_unicode = ks_ldap.utf8_decode(value_utf8)
+ self.assertEqual(value_unicode, result_unicode)
+
+ result_unicode = ks_ldap.utf8_decode(value_unicode)
+ self.assertEqual(value_unicode, result_unicode)
+
+ self.assertRaises(TypeError,
+ ks_ldap.utf8_encode,
+ 100)
+
+ result_unicode = ks_ldap.utf8_decode(100)
+ self.assertEqual(u'100', result_unicode)
+
+ def test_user_id_begins_with_0(self):
+ user_id = '0123456'
+ result = [(
+ 'cn=dummy,dc=example,dc=com',
+ {
+ 'user_id': [user_id],
+ 'enabled': ['TRUE']
+ }
+ ), ]
+ py_result = ks_ldap.convert_ldap_result(result)
+ # The user id should be 0123456, and the enabled
+ # flag should be True
+ self.assertIs(py_result[0][1]['enabled'][0], True)
+ self.assertEqual(user_id, py_result[0][1]['user_id'][0])
+
+ def test_user_id_begins_with_0_and_enabled_bit_mask(self):
+ user_id = '0123456'
+ bitmask = '225'
+ expected_bitmask = 225
+ result = [(
+ 'cn=dummy,dc=example,dc=com',
+ {
+ 'user_id': [user_id],
+ 'enabled': [bitmask]
+ }
+ ), ]
+ py_result = ks_ldap.convert_ldap_result(result)
+ # The user id should be 0123456, and the enabled
+ # flag should be 225
+ self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
+ self.assertEqual(user_id, py_result[0][1]['user_id'][0])
+
+ def test_user_id_and_bitmask_begins_with_0(self):
+ user_id = '0123456'
+ bitmask = '0225'
+ expected_bitmask = 225
+ result = [(
+ 'cn=dummy,dc=example,dc=com',
+ {
+ 'user_id': [user_id],
+ 'enabled': [bitmask]
+ }
+ ), ]
+ py_result = ks_ldap.convert_ldap_result(result)
+ # The user id should be 0123456, and the enabled
+ # flag should be 225, the 0 is dropped.
+ self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
+ self.assertEqual(user_id, py_result[0][1]['user_id'][0])
+
+ def test_user_id_and_user_name_with_boolean_string(self):
+ boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
+ 'TrUe' 'FaLse']
+ for user_name in boolean_strings:
+ user_id = uuid.uuid4().hex
+ result = [(
+ 'cn=dummy,dc=example,dc=com',
+ {
+ 'user_id': [user_id],
+ 'user_name': [user_name]
+ }
+ ), ]
+ py_result = ks_ldap.convert_ldap_result(result)
+ # The user name should still be a string value.
+ self.assertEqual(user_name, py_result[0][1]['user_name'][0])
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py
new file mode 100644
index 00000000..55dd556d
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py
@@ -0,0 +1,974 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import uuid
+
+import mock
+from oslo_config import cfg
+from oslo_config import fixture as config_fixture
+from oslotest import mockpatch
+from pycadf import cadftaxonomy
+from pycadf import cadftype
+from pycadf import eventfactory
+from pycadf import resource as cadfresource
+import testtools
+
+from keystone.common import dependency
+from keystone import notifications
+from keystone.tests.unit import test_v3
+
+
+CONF = cfg.CONF
+
+EXP_RESOURCE_TYPE = uuid.uuid4().hex
+CREATED_OPERATION = notifications.ACTIONS.created
+UPDATED_OPERATION = notifications.ACTIONS.updated
+DELETED_OPERATION = notifications.ACTIONS.deleted
+DISABLED_OPERATION = notifications.ACTIONS.disabled
+
+
+class ArbitraryException(Exception):
+ pass
+
+
+def register_callback(operation, resource_type=EXP_RESOURCE_TYPE):
+ """Helper for creating and registering a mock callback.
+
+ """
+ callback = mock.Mock(__name__='callback',
+ im_class=mock.Mock(__name__='class'))
+ notifications.register_event_callback(operation, resource_type, callback)
+ return callback
+
+
+class AuditNotificationsTestCase(testtools.TestCase):
+ def setUp(self):
+ super(AuditNotificationsTestCase, self).setUp()
+ self.config_fixture = self.useFixture(config_fixture.Config(CONF))
+ self.addCleanup(notifications.clear_subscribers)
+
+ def _test_notification_operation(self, notify_function, operation):
+ exp_resource_id = uuid.uuid4().hex
+ callback = register_callback(operation)
+ notify_function(EXP_RESOURCE_TYPE, exp_resource_id)
+ callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE,
+ operation,
+ {'resource_info': exp_resource_id})
+ self.config_fixture.config(notification_format='cadf')
+ with mock.patch(
+ 'keystone.notifications._create_cadf_payload') as cadf_notify:
+ notify_function(EXP_RESOURCE_TYPE, exp_resource_id)
+ initiator = None
+ cadf_notify.assert_called_once_with(
+ operation, EXP_RESOURCE_TYPE, exp_resource_id,
+ notifications.taxonomy.OUTCOME_SUCCESS, initiator)
+ notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False)
+ cadf_notify.assert_called_once_with(
+ operation, EXP_RESOURCE_TYPE, exp_resource_id,
+ notifications.taxonomy.OUTCOME_SUCCESS, initiator)
+
+ def test_resource_created_notification(self):
+ self._test_notification_operation(notifications.Audit.created,
+ CREATED_OPERATION)
+
+ def test_resource_updated_notification(self):
+ self._test_notification_operation(notifications.Audit.updated,
+ UPDATED_OPERATION)
+
+ def test_resource_deleted_notification(self):
+ self._test_notification_operation(notifications.Audit.deleted,
+ DELETED_OPERATION)
+
+ def test_resource_disabled_notification(self):
+ self._test_notification_operation(notifications.Audit.disabled,
+ DISABLED_OPERATION)
+
+
+class NotificationsWrapperTestCase(testtools.TestCase):
+ def create_fake_ref(self):
+ resource_id = uuid.uuid4().hex
+ return resource_id, {
+ 'id': resource_id,
+ 'key': uuid.uuid4().hex
+ }
+
+ @notifications.created(EXP_RESOURCE_TYPE)
+ def create_resource(self, resource_id, data):
+ return data
+
+ def test_resource_created_notification(self):
+ exp_resource_id, data = self.create_fake_ref()
+ callback = register_callback(CREATED_OPERATION)
+
+ self.create_resource(exp_resource_id, data)
+ callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
+ CREATED_OPERATION,
+ {'resource_info': exp_resource_id})
+
+ @notifications.updated(EXP_RESOURCE_TYPE)
+ def update_resource(self, resource_id, data):
+ return data
+
+ def test_resource_updated_notification(self):
+ exp_resource_id, data = self.create_fake_ref()
+ callback = register_callback(UPDATED_OPERATION)
+
+ self.update_resource(exp_resource_id, data)
+ callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
+ UPDATED_OPERATION,
+ {'resource_info': exp_resource_id})
+
+ @notifications.deleted(EXP_RESOURCE_TYPE)
+ def delete_resource(self, resource_id):
+ pass
+
+ def test_resource_deleted_notification(self):
+ exp_resource_id = uuid.uuid4().hex
+ callback = register_callback(DELETED_OPERATION)
+
+ self.delete_resource(exp_resource_id)
+ callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
+ DELETED_OPERATION,
+ {'resource_info': exp_resource_id})
+
+ @notifications.created(EXP_RESOURCE_TYPE)
+ def create_exception(self, resource_id):
+ raise ArbitraryException()
+
+ def test_create_exception_without_notification(self):
+ callback = register_callback(CREATED_OPERATION)
+ self.assertRaises(
+ ArbitraryException, self.create_exception, uuid.uuid4().hex)
+ self.assertFalse(callback.called)
+
+ @notifications.created(EXP_RESOURCE_TYPE)
+ def update_exception(self, resource_id):
+ raise ArbitraryException()
+
+ def test_update_exception_without_notification(self):
+ callback = register_callback(UPDATED_OPERATION)
+ self.assertRaises(
+ ArbitraryException, self.update_exception, uuid.uuid4().hex)
+ self.assertFalse(callback.called)
+
+ @notifications.deleted(EXP_RESOURCE_TYPE)
+ def delete_exception(self, resource_id):
+ raise ArbitraryException()
+
+ def test_delete_exception_without_notification(self):
+ callback = register_callback(DELETED_OPERATION)
+ self.assertRaises(
+ ArbitraryException, self.delete_exception, uuid.uuid4().hex)
+ self.assertFalse(callback.called)
+
+
+class NotificationsTestCase(testtools.TestCase):
+ def setUp(self):
+ super(NotificationsTestCase, self).setUp()
+
+ # these should use self.config_fixture.config(), but they haven't
+ # been registered yet
+ CONF.rpc_backend = 'fake'
+ CONF.notification_driver = ['fake']
+
+ def test_send_notification(self):
+ """Test the private method _send_notification to ensure event_type,
+ payload, and context are built and passed properly.
+ """
+ resource = uuid.uuid4().hex
+ resource_type = EXP_RESOURCE_TYPE
+ operation = CREATED_OPERATION
+
+ # NOTE(ldbragst): Even though notifications._send_notification doesn't
+ # contain logic that creates cases, this is supposed to test that
+ # context is always empty and that we ensure the resource ID of the
+ # resource in the notification is contained in the payload. It was
+ # agreed that context should be empty in Keystone's case, which is
+ # also noted in the /keystone/notifications.py module. This test
+ # ensures and maintains these conditions.
+ expected_args = [
+ {}, # empty context
+ 'identity.%s.created' % resource_type, # event_type
+ {'resource_info': resource}, # payload
+ 'INFO', # priority is always INFO...
+ ]
+
+ with mock.patch.object(notifications._get_notifier(),
+ '_notify') as mocked:
+ notifications._send_notification(operation, resource_type,
+ resource)
+ mocked.assert_called_once_with(*expected_args)
+
+
+class BaseNotificationTest(test_v3.RestfulTestCase):
+
+ def setUp(self):
+ super(BaseNotificationTest, self).setUp()
+
+ self._notifications = []
+ self._audits = []
+
+ def fake_notify(operation, resource_type, resource_id,
+ public=True):
+ note = {
+ 'resource_id': resource_id,
+ 'operation': operation,
+ 'resource_type': resource_type,
+ 'send_notification_called': True,
+ 'public': public}
+ self._notifications.append(note)
+
+ self.useFixture(mockpatch.PatchObject(
+ notifications, '_send_notification', fake_notify))
+
+ def fake_audit(action, initiator, outcome, target,
+ event_type, **kwargs):
+ service_security = cadftaxonomy.SERVICE_SECURITY
+
+ event = eventfactory.EventFactory().new_event(
+ eventType=cadftype.EVENTTYPE_ACTIVITY,
+ outcome=outcome,
+ action=action,
+ initiator=initiator,
+ target=target,
+ observer=cadfresource.Resource(typeURI=service_security))
+
+ for key, value in kwargs.items():
+ setattr(event, key, value)
+
+ audit = {
+ 'payload': event.as_dict(),
+ 'event_type': event_type,
+ 'send_notification_called': True}
+ self._audits.append(audit)
+
+ self.useFixture(mockpatch.PatchObject(
+ notifications, '_send_audit_notification', fake_audit))
+
+ def _assert_last_note(self, resource_id, operation, resource_type):
+ # NOTE(stevemar): If 'basic' format is not used, then simply
+ # return since this assertion is not valid.
+ if CONF.notification_format != 'basic':
+ return
+ self.assertTrue(len(self._notifications) > 0)
+ note = self._notifications[-1]
+ self.assertEqual(note['operation'], operation)
+ self.assertEqual(note['resource_id'], resource_id)
+ self.assertEqual(note['resource_type'], resource_type)
+ self.assertTrue(note['send_notification_called'])
+
+ def _assert_last_audit(self, resource_id, operation, resource_type,
+ target_uri):
+ # NOTE(stevemar): If 'cadf' format is not used, then simply
+ # return since this assertion is not valid.
+ if CONF.notification_format != 'cadf':
+ return
+ self.assertTrue(len(self._audits) > 0)
+ audit = self._audits[-1]
+ payload = audit['payload']
+ self.assertEqual(resource_id, payload['resource_info'])
+ action = '%s.%s' % (operation, resource_type)
+ self.assertEqual(action, payload['action'])
+ self.assertEqual(target_uri, payload['target']['typeURI'])
+ self.assertEqual(resource_id, payload['target']['id'])
+ event_type = '%s.%s.%s' % ('identity', resource_type, operation)
+ self.assertEqual(event_type, audit['event_type'])
+ self.assertTrue(audit['send_notification_called'])
+
+ def _assert_notify_not_sent(self, resource_id, operation, resource_type,
+ public=True):
+ unexpected = {
+ 'resource_id': resource_id,
+ 'operation': operation,
+ 'resource_type': resource_type,
+ 'send_notification_called': True,
+ 'public': public}
+ for note in self._notifications:
+ self.assertNotEqual(unexpected, note)
+
+ def _assert_notify_sent(self, resource_id, operation, resource_type,
+ public=True):
+ expected = {
+ 'resource_id': resource_id,
+ 'operation': operation,
+ 'resource_type': resource_type,
+ 'send_notification_called': True,
+ 'public': public}
+ for note in self._notifications:
+ if expected == note:
+ break
+ else:
+ self.fail("Notification not sent.")
+
+
+class NotificationsForEntities(BaseNotificationTest):
+
+ def test_create_group(self):
+ group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = self.identity_api.create_group(group_ref)
+ self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group')
+ self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group',
+ cadftaxonomy.SECURITY_GROUP)
+
+ def test_create_project(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ self._assert_last_note(
+ project_ref['id'], CREATED_OPERATION, 'project')
+ self._assert_last_audit(project_ref['id'], CREATED_OPERATION,
+ 'project', cadftaxonomy.SECURITY_PROJECT)
+
+ def test_create_role(self):
+ role_ref = self.new_role_ref()
+ self.role_api.create_role(role_ref['id'], role_ref)
+ self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role')
+ self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role',
+ cadftaxonomy.SECURITY_ROLE)
+
+ def test_create_user(self):
+ user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(user_ref)
+ self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user')
+ self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+
+ def test_create_trust(self):
+ trustor = self.new_user_ref(domain_id=self.domain_id)
+ trustor = self.identity_api.create_user(trustor)
+ trustee = self.new_user_ref(domain_id=self.domain_id)
+ trustee = self.identity_api.create_user(trustee)
+ role_ref = self.new_role_ref()
+ self.role_api.create_role(role_ref['id'], role_ref)
+ trust_ref = self.new_trust_ref(trustor['id'],
+ trustee['id'])
+ self.trust_api.create_trust(trust_ref['id'],
+ trust_ref,
+ [role_ref])
+ self._assert_last_note(
+ trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust')
+ self._assert_last_audit(trust_ref['id'], CREATED_OPERATION,
+ 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
+
+ def test_delete_group(self):
+ group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = self.identity_api.create_group(group_ref)
+ self.identity_api.delete_group(group_ref['id'])
+ self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group')
+ self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group',
+ cadftaxonomy.SECURITY_GROUP)
+
+ def test_delete_project(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ self.assignment_api.delete_project(project_ref['id'])
+ self._assert_last_note(
+ project_ref['id'], DELETED_OPERATION, 'project')
+ self._assert_last_audit(project_ref['id'], DELETED_OPERATION,
+ 'project', cadftaxonomy.SECURITY_PROJECT)
+
+ def test_delete_role(self):
+ role_ref = self.new_role_ref()
+ self.role_api.create_role(role_ref['id'], role_ref)
+ self.role_api.delete_role(role_ref['id'])
+ self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role')
+ self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role',
+ cadftaxonomy.SECURITY_ROLE)
+
+ def test_delete_user(self):
+ user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(user_ref)
+ self.identity_api.delete_user(user_ref['id'])
+ self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user')
+ self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+
+ def test_create_domain(self):
+ domain_ref = self.new_domain_ref()
+ self.resource_api.create_domain(domain_ref['id'], domain_ref)
+ self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain')
+ self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+
+ def test_update_domain(self):
+ domain_ref = self.new_domain_ref()
+ self.assignment_api.create_domain(domain_ref['id'], domain_ref)
+ domain_ref['description'] = uuid.uuid4().hex
+ self.assignment_api.update_domain(domain_ref['id'], domain_ref)
+ self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain')
+ self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+
+ def test_delete_domain(self):
+ domain_ref = self.new_domain_ref()
+ self.assignment_api.create_domain(domain_ref['id'], domain_ref)
+ domain_ref['enabled'] = False
+ self.assignment_api.update_domain(domain_ref['id'], domain_ref)
+ self.assignment_api.delete_domain(domain_ref['id'])
+ self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain')
+ self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+
+ def test_delete_trust(self):
+ trustor = self.new_user_ref(domain_id=self.domain_id)
+ trustor = self.identity_api.create_user(trustor)
+ trustee = self.new_user_ref(domain_id=self.domain_id)
+ trustee = self.identity_api.create_user(trustee)
+ role_ref = self.new_role_ref()
+ trust_ref = self.new_trust_ref(trustor['id'], trustee['id'])
+ self.trust_api.create_trust(trust_ref['id'],
+ trust_ref,
+ [role_ref])
+ self.trust_api.delete_trust(trust_ref['id'])
+ self._assert_last_note(
+ trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust')
+ self._assert_last_audit(trust_ref['id'], DELETED_OPERATION,
+ 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
+
+ def test_create_endpoint(self):
+ endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
+ self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION,
+ 'endpoint')
+ self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION,
+ 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
+
+ def test_update_endpoint(self):
+ endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
+ self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref)
+ self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION,
+ 'endpoint')
+ self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION,
+ 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
+
+ def test_delete_endpoint(self):
+ endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
+ self.catalog_api.delete_endpoint(endpoint_ref['id'])
+ self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION,
+ 'endpoint')
+ self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION,
+ 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
+
+ def test_create_service(self):
+ service_ref = self.new_service_ref()
+ self.catalog_api.create_service(service_ref['id'], service_ref)
+ self._assert_notify_sent(service_ref['id'], CREATED_OPERATION,
+ 'service')
+ self._assert_last_audit(service_ref['id'], CREATED_OPERATION,
+ 'service', cadftaxonomy.SECURITY_SERVICE)
+
+ def test_update_service(self):
+ service_ref = self.new_service_ref()
+ self.catalog_api.create_service(service_ref['id'], service_ref)
+ self.catalog_api.update_service(service_ref['id'], service_ref)
+ self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION,
+ 'service')
+ self._assert_last_audit(service_ref['id'], UPDATED_OPERATION,
+ 'service', cadftaxonomy.SECURITY_SERVICE)
+
+ def test_delete_service(self):
+ service_ref = self.new_service_ref()
+ self.catalog_api.create_service(service_ref['id'], service_ref)
+ self.catalog_api.delete_service(service_ref['id'])
+ self._assert_notify_sent(service_ref['id'], DELETED_OPERATION,
+ 'service')
+ self._assert_last_audit(service_ref['id'], DELETED_OPERATION,
+ 'service', cadftaxonomy.SECURITY_SERVICE)
+
+ def test_create_region(self):
+ region_ref = self.new_region_ref()
+ self.catalog_api.create_region(region_ref)
+ self._assert_notify_sent(region_ref['id'], CREATED_OPERATION,
+ 'region')
+ self._assert_last_audit(region_ref['id'], CREATED_OPERATION,
+ 'region', cadftaxonomy.SECURITY_REGION)
+
+ def test_update_region(self):
+ region_ref = self.new_region_ref()
+ self.catalog_api.create_region(region_ref)
+ self.catalog_api.update_region(region_ref['id'], region_ref)
+ self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION,
+ 'region')
+ self._assert_last_audit(region_ref['id'], UPDATED_OPERATION,
+ 'region', cadftaxonomy.SECURITY_REGION)
+
+ def test_delete_region(self):
+ region_ref = self.new_region_ref()
+ self.catalog_api.create_region(region_ref)
+ self.catalog_api.delete_region(region_ref['id'])
+ self._assert_notify_sent(region_ref['id'], DELETED_OPERATION,
+ 'region')
+ self._assert_last_audit(region_ref['id'], DELETED_OPERATION,
+ 'region', cadftaxonomy.SECURITY_REGION)
+
+ def test_create_policy(self):
+ policy_ref = self.new_policy_ref()
+ self.policy_api.create_policy(policy_ref['id'], policy_ref)
+ self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION,
+ 'policy')
+ self._assert_last_audit(policy_ref['id'], CREATED_OPERATION,
+ 'policy', cadftaxonomy.SECURITY_POLICY)
+
+ def test_update_policy(self):
+ policy_ref = self.new_policy_ref()
+ self.policy_api.create_policy(policy_ref['id'], policy_ref)
+ self.policy_api.update_policy(policy_ref['id'], policy_ref)
+ self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION,
+ 'policy')
+ self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION,
+ 'policy', cadftaxonomy.SECURITY_POLICY)
+
+ def test_delete_policy(self):
+ policy_ref = self.new_policy_ref()
+ self.policy_api.create_policy(policy_ref['id'], policy_ref)
+ self.policy_api.delete_policy(policy_ref['id'])
+ self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION,
+ 'policy')
+ self._assert_last_audit(policy_ref['id'], DELETED_OPERATION,
+ 'policy', cadftaxonomy.SECURITY_POLICY)
+
+ def test_disable_domain(self):
+ domain_ref = self.new_domain_ref()
+ self.assignment_api.create_domain(domain_ref['id'], domain_ref)
+ domain_ref['enabled'] = False
+ self.assignment_api.update_domain(domain_ref['id'], domain_ref)
+ self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain',
+ public=False)
+
+ def test_disable_of_disabled_domain_does_not_notify(self):
+ domain_ref = self.new_domain_ref()
+ domain_ref['enabled'] = False
+ self.assignment_api.create_domain(domain_ref['id'], domain_ref)
+ # The domain_ref above is not changed during the create process. We
+ # can use the same ref to perform the update.
+ self.assignment_api.update_domain(domain_ref['id'], domain_ref)
+ self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain',
+ public=False)
+
+ def test_update_group(self):
+ group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = self.identity_api.create_group(group_ref)
+ self.identity_api.update_group(group_ref['id'], group_ref)
+ self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group')
+ self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group',
+ cadftaxonomy.SECURITY_GROUP)
+
+ def test_update_project(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ self.assignment_api.update_project(project_ref['id'], project_ref)
+ self._assert_notify_sent(
+ project_ref['id'], UPDATED_OPERATION, 'project', public=True)
+ self._assert_last_audit(project_ref['id'], UPDATED_OPERATION,
+ 'project', cadftaxonomy.SECURITY_PROJECT)
+
+ def test_disable_project(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ project_ref['enabled'] = False
+ self.assignment_api.update_project(project_ref['id'], project_ref)
+ self._assert_notify_sent(project_ref['id'], 'disabled', 'project',
+ public=False)
+
+ def test_disable_of_disabled_project_does_not_notify(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref['enabled'] = False
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ # The project_ref above is not changed during the create process. We
+ # can use the same ref to perform the update.
+ self.assignment_api.update_project(project_ref['id'], project_ref)
+ self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project',
+ public=False)
+
+ def test_update_project_does_not_send_disable(self):
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ project_ref['enabled'] = True
+ self.assignment_api.update_project(project_ref['id'], project_ref)
+ self._assert_last_note(
+ project_ref['id'], UPDATED_OPERATION, 'project')
+ self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project')
+
+ def test_update_role(self):
+ role_ref = self.new_role_ref()
+ self.role_api.create_role(role_ref['id'], role_ref)
+ self.role_api.update_role(role_ref['id'], role_ref)
+ self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role')
+ self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role',
+ cadftaxonomy.SECURITY_ROLE)
+
+ def test_update_user(self):
+ user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(user_ref)
+ self.identity_api.update_user(user_ref['id'], user_ref)
+ self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user')
+ self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+
+ def test_config_option_no_events(self):
+ self.config_fixture.config(notification_format='basic')
+ role_ref = self.new_role_ref()
+ self.role_api.create_role(role_ref['id'], role_ref)
+ # The regular notifications will still be emitted, since they are
+ # used for callback handling.
+ self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role')
+ # No audit event should have occurred
+ self.assertEqual(0, len(self._audits))
+
+
+class CADFNotificationsForEntities(NotificationsForEntities):
+
+ def setUp(self):
+ super(CADFNotificationsForEntities, self).setUp()
+ self.config_fixture.config(notification_format='cadf')
+
+ def test_initiator_data_is_set(self):
+ ref = self.new_domain_ref()
+ resp = self.post('/domains', body={'domain': ref})
+ resource_id = resp.result.get('domain').get('id')
+ self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+ self.assertTrue(len(self._audits) > 0)
+ audit = self._audits[-1]
+ payload = audit['payload']
+ self.assertEqual(self.user_id, payload['initiator']['id'])
+ self.assertEqual(self.project_id, payload['initiator']['project_id'])
+
+
+class TestEventCallbacks(test_v3.RestfulTestCase):
+
+ def setUp(self):
+ super(TestEventCallbacks, self).setUp()
+ self.has_been_called = False
+
+ def _project_deleted_callback(self, service, resource_type, operation,
+ payload):
+ self.has_been_called = True
+
+ def _project_created_callback(self, service, resource_type, operation,
+ payload):
+ self.has_been_called = True
+
+ def test_notification_received(self):
+ callback = register_callback(CREATED_OPERATION, 'project')
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ self.assertTrue(callback.called)
+
+ def test_notification_method_not_callable(self):
+ fake_method = None
+ self.assertRaises(TypeError,
+ notifications.register_event_callback,
+ UPDATED_OPERATION,
+ 'project',
+ [fake_method])
+
+ def test_notification_event_not_valid(self):
+ self.assertRaises(ValueError,
+ notifications.register_event_callback,
+ uuid.uuid4().hex,
+ 'project',
+ self._project_deleted_callback)
+
+ def test_event_registration_for_unknown_resource_type(self):
+ # Registration for unknown resource types should succeed. If no event
+ # is issued for that resource type, the callback wont be triggered.
+ notifications.register_event_callback(DELETED_OPERATION,
+ uuid.uuid4().hex,
+ self._project_deleted_callback)
+ resource_type = uuid.uuid4().hex
+ notifications.register_event_callback(DELETED_OPERATION,
+ resource_type,
+ self._project_deleted_callback)
+
+ def test_provider_event_callbacks_subscription(self):
+ callback_called = []
+
+ @dependency.provider('foo_api')
+ class Foo(object):
+ def __init__(self):
+ self.event_callbacks = {
+ CREATED_OPERATION: {'project': [self.foo_callback]}}
+
+ def foo_callback(self, service, resource_type, operation,
+ payload):
+ # uses callback_called from the closure
+ callback_called.append(True)
+
+ Foo()
+ project_ref = self.new_project_ref(domain_id=self.domain_id)
+ self.assignment_api.create_project(project_ref['id'], project_ref)
+ self.assertEqual([True], callback_called)
+
+ def test_invalid_event_callbacks(self):
+ @dependency.provider('foo_api')
+ class Foo(object):
+ def __init__(self):
+ self.event_callbacks = 'bogus'
+
+ self.assertRaises(ValueError, Foo)
+
+ def test_invalid_event_callbacks_event(self):
+ @dependency.provider('foo_api')
+ class Foo(object):
+ def __init__(self):
+ self.event_callbacks = {CREATED_OPERATION: 'bogus'}
+
+ self.assertRaises(ValueError, Foo)
+
+
+class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
+
+ LOCAL_HOST = 'localhost'
+ ACTION = 'authenticate'
+ ROLE_ASSIGNMENT = 'role_assignment'
+
+ def setUp(self):
+ super(CadfNotificationsWrapperTestCase, self).setUp()
+ self._notifications = []
+
+ def fake_notify(action, initiator, outcome, target,
+ event_type, **kwargs):
+ service_security = cadftaxonomy.SERVICE_SECURITY
+
+ event = eventfactory.EventFactory().new_event(
+ eventType=cadftype.EVENTTYPE_ACTIVITY,
+ outcome=outcome,
+ action=action,
+ initiator=initiator,
+ target=target,
+ observer=cadfresource.Resource(typeURI=service_security))
+
+ for key, value in kwargs.items():
+ setattr(event, key, value)
+
+ note = {
+ 'action': action,
+ 'initiator': initiator,
+ 'event': event,
+ 'send_notification_called': True}
+ self._notifications.append(note)
+
+ self.useFixture(mockpatch.PatchObject(
+ notifications, '_send_audit_notification', fake_notify))
+
+ def _assert_last_note(self, action, user_id):
+ self.assertTrue(self._notifications)
+ note = self._notifications[-1]
+ self.assertEqual(note['action'], action)
+ initiator = note['initiator']
+ self.assertEqual(initiator.id, user_id)
+ self.assertEqual(initiator.host.address, self.LOCAL_HOST)
+ self.assertTrue(note['send_notification_called'])
+
+ def _assert_event(self, role_id, project=None, domain=None,
+ user=None, group=None, inherit=False):
+ """Assert that the CADF event is valid.
+
+ In the case of role assignments, the event will have extra data,
+ specifically, the role, target, actor, and if the role is inherited.
+
+ An example event, as a dictionary is seen below:
+ {
+ 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event',
+ 'initiator': {
+ 'typeURI': 'service/security/account/user',
+ 'host': {'address': 'localhost'},
+ 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341',
+ 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21'
+ },
+ 'target': {
+ 'typeURI': 'service/security/account/user',
+ 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d'
+ },
+ 'observer': {
+ 'typeURI': 'service/security',
+ 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95'
+ },
+ 'eventType': 'activity',
+ 'eventTime': '2014-08-21T21:04:56.204536+0000',
+ 'role': u'0e6b990380154a2599ce6b6e91548a68',
+ 'domain': u'24bdcff1aab8474895dbaac509793de1',
+ 'inherited_to_projects': False,
+ 'group': u'c1e22dc67cbd469ea0e33bf428fe597a',
+ 'action': 'created.role_assignment',
+ 'outcome': 'success',
+ 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1'
+ }
+ """
+
+ note = self._notifications[-1]
+ event = note['event']
+ if project:
+ self.assertEqual(project, event.project)
+ if domain:
+ self.assertEqual(domain, event.domain)
+ if user:
+ self.assertEqual(user, event.user)
+ if group:
+ self.assertEqual(group, event.group)
+ self.assertEqual(role_id, event.role)
+ self.assertEqual(inherit, event.inherited_to_projects)
+
+ def test_v3_authenticate_user_name_and_domain_id(self):
+ user_id = self.user_id
+ user_name = self.user['name']
+ password = self.user['password']
+ domain_id = self.domain_id
+ data = self.build_authentication_request(username=user_name,
+ user_domain_id=domain_id,
+ password=password)
+ self.post('/auth/tokens', body=data)
+ self._assert_last_note(self.ACTION, user_id)
+
+ def test_v3_authenticate_user_id(self):
+ user_id = self.user_id
+ password = self.user['password']
+ data = self.build_authentication_request(user_id=user_id,
+ password=password)
+ self.post('/auth/tokens', body=data)
+ self._assert_last_note(self.ACTION, user_id)
+
+ def test_v3_authenticate_user_name_and_domain_name(self):
+ user_id = self.user_id
+ user_name = self.user['name']
+ password = self.user['password']
+ domain_name = self.domain['name']
+ data = self.build_authentication_request(username=user_name,
+ user_domain_name=domain_name,
+ password=password)
+ self.post('/auth/tokens', body=data)
+ self._assert_last_note(self.ACTION, user_id)
+
+ def _test_role_assignment(self, url, role, project=None, domain=None,
+ user=None, group=None):
+ self.put(url)
+ action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT)
+ self._assert_last_note(action, self.user_id)
+ self._assert_event(role, project, domain, user, group)
+ self.delete(url)
+ action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT)
+ self._assert_last_note(action, self.user_id)
+ self._assert_event(role, project, domain, user, group)
+
+ def test_user_project_grant(self):
+ url = ('/projects/%s/users/%s/roles/%s' %
+ (self.project_id, self.user_id, self.role_id))
+ self._test_role_assignment(url, self.role_id,
+ project=self.project_id,
+ user=self.user_id)
+
+ def test_group_domain_grant(self):
+ group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group = self.identity_api.create_group(group_ref)
+ url = ('/domains/%s/groups/%s/roles/%s' %
+ (self.domain_id, group['id'], self.role_id))
+ self._test_role_assignment(url, self.role_id,
+ domain=self.domain_id,
+ group=group['id'])
+
+
+class TestCallbackRegistration(testtools.TestCase):
+ def setUp(self):
+ super(TestCallbackRegistration, self).setUp()
+ self.mock_log = mock.Mock()
+ # Force the callback logging to occur
+ self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG
+
+ def verify_log_message(self, data):
+ """Tests that use this are a little brittle because adding more
+ logging can break them.
+
+ TODO(dstanek): remove the need for this in a future refactoring
+
+ """
+ log_fn = self.mock_log.debug
+ self.assertEqual(len(data), log_fn.call_count)
+ for datum in data:
+ log_fn.assert_any_call(mock.ANY, datum)
+
+ def test_a_function_callback(self):
+ def callback(*args, **kwargs):
+ pass
+
+ resource_type = 'thing'
+ with mock.patch('keystone.notifications.LOG', self.mock_log):
+ notifications.register_event_callback(
+ CREATED_OPERATION, resource_type, callback)
+
+ callback = 'keystone.tests.unit.common.test_notifications.callback'
+ expected_log_data = {
+ 'callback': callback,
+ 'event': 'identity.%s.created' % resource_type
+ }
+ self.verify_log_message([expected_log_data])
+
+ def test_a_method_callback(self):
+ class C(object):
+ def callback(self, *args, **kwargs):
+ pass
+
+ with mock.patch('keystone.notifications.LOG', self.mock_log):
+ notifications.register_event_callback(
+ CREATED_OPERATION, 'thing', C.callback)
+
+ callback = 'keystone.tests.unit.common.test_notifications.C.callback'
+ expected_log_data = {
+ 'callback': callback,
+ 'event': 'identity.thing.created'
+ }
+ self.verify_log_message([expected_log_data])
+
+ def test_a_list_of_callbacks(self):
+ def callback(*args, **kwargs):
+ pass
+
+ class C(object):
+ def callback(self, *args, **kwargs):
+ pass
+
+ with mock.patch('keystone.notifications.LOG', self.mock_log):
+ notifications.register_event_callback(
+ CREATED_OPERATION, 'thing', [callback, C.callback])
+
+ callback_1 = 'keystone.tests.unit.common.test_notifications.callback'
+ callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback'
+ expected_log_data = [
+ {
+ 'callback': callback_1,
+ 'event': 'identity.thing.created'
+ },
+ {
+ 'callback': callback_2,
+ 'event': 'identity.thing.created'
+ },
+ ]
+ self.verify_log_message(expected_log_data)
+
+ def test_an_invalid_callback(self):
+ self.assertRaises(TypeError,
+ notifications.register_event_callback,
+ (CREATED_OPERATION, 'thing', object()))
+
+ def test_an_invalid_event(self):
+ def callback(*args, **kwargs):
+ pass
+
+ self.assertRaises(ValueError,
+ notifications.register_event_callback,
+ uuid.uuid4().hex,
+ 'thing',
+ callback)
diff --git a/keystone-moon/keystone/tests/unit/common/test_pemutils.py b/keystone-moon/keystone/tests/unit/common/test_pemutils.py
new file mode 100644
index 00000000..c2f58518
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_pemutils.py
@@ -0,0 +1,337 @@
+# Copyright 2013 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import base64
+
+from six import moves
+
+from keystone.common import pemutils
+from keystone.tests import unit as tests
+
+
+# List of 2-tuples, (pem_type, pem_header)
+headers = pemutils.PEM_TYPE_TO_HEADER.items()
+
+
+def make_data(size, offset=0):
+ return ''.join([chr(x % 255) for x in moves.range(offset, size + offset)])
+
+
+def make_base64_from_data(data):
+ return base64.b64encode(data)
+
+
+def wrap_base64(base64_text):
+ wrapped_text = '\n'.join([base64_text[x:x + 64]
+ for x in moves.range(0, len(base64_text), 64)])
+ wrapped_text += '\n'
+ return wrapped_text
+
+
+def make_pem(header, data):
+ base64_text = make_base64_from_data(data)
+ wrapped_text = wrap_base64(base64_text)
+
+ result = '-----BEGIN %s-----\n' % header
+ result += wrapped_text
+ result += '-----END %s-----\n' % header
+
+ return result
+
+
+class PEM(object):
+ """PEM text and it's associated data broken out, used for testing.
+
+ """
+ def __init__(self, pem_header='CERTIFICATE', pem_type='cert',
+ data_size=70, data_offset=0):
+ self.pem_header = pem_header
+ self.pem_type = pem_type
+ self.data_size = data_size
+ self.data_offset = data_offset
+ self.data = make_data(self.data_size, self.data_offset)
+ self.base64_text = make_base64_from_data(self.data)
+ self.wrapped_base64 = wrap_base64(self.base64_text)
+ self.pem_text = make_pem(self.pem_header, self.data)
+
+
+class TestPEMParseResult(tests.BaseTestCase):
+
+ def test_pem_types(self):
+ for pem_type in pemutils.pem_types:
+ pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type]
+ r = pemutils.PEMParseResult(pem_type=pem_type)
+ self.assertEqual(pem_type, r.pem_type)
+ self.assertEqual(pem_header, r.pem_header)
+
+ pem_type = 'xxx'
+ self.assertRaises(ValueError,
+ pemutils.PEMParseResult, pem_type=pem_type)
+
+ def test_pem_headers(self):
+ for pem_header in pemutils.pem_headers:
+ pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header]
+ r = pemutils.PEMParseResult(pem_header=pem_header)
+ self.assertEqual(pem_type, r.pem_type)
+ self.assertEqual(pem_header, r.pem_header)
+
+ pem_header = 'xxx'
+ self.assertRaises(ValueError,
+ pemutils.PEMParseResult, pem_header=pem_header)
+
+
+class TestPEMParse(tests.BaseTestCase):
+ def test_parse_none(self):
+ text = ''
+ text += 'bla bla\n'
+ text += 'yada yada yada\n'
+ text += 'burfl blatz bingo\n'
+
+ parse_results = pemutils.parse_pem(text)
+ self.assertEqual(0, len(parse_results))
+
+ self.assertEqual(False, pemutils.is_pem(text))
+
+ def test_parse_invalid(self):
+ p = PEM(pem_type='xxx',
+ pem_header='XXX')
+ text = p.pem_text
+
+ self.assertRaises(ValueError,
+ pemutils.parse_pem, text)
+
+ def test_parse_one(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ p = pems[i]
+ text = p.pem_text
+
+ parse_results = pemutils.parse_pem(text)
+ self.assertEqual(1, len(parse_results))
+
+ r = parse_results[0]
+ self.assertEqual(p.pem_type, r.pem_type)
+ self.assertEqual(p.pem_header, r.pem_header)
+ self.assertEqual(p.pem_text,
+ text[r.pem_start:r.pem_end])
+ self.assertEqual(p.wrapped_base64,
+ text[r.base64_start:r.base64_end])
+ self.assertEqual(p.data, r.binary_data)
+
+ def test_parse_one_embedded(self):
+ p = PEM(data_offset=0)
+ text = ''
+ text += 'bla bla\n'
+ text += 'yada yada yada\n'
+ text += p.pem_text
+ text += 'burfl blatz bingo\n'
+
+ parse_results = pemutils.parse_pem(text)
+ self.assertEqual(1, len(parse_results))
+
+ r = parse_results[0]
+ self.assertEqual(p.pem_type, r.pem_type)
+ self.assertEqual(p.pem_header, r.pem_header)
+ self.assertEqual(p.pem_text,
+ text[r.pem_start:r.pem_end])
+ self.assertEqual(p.wrapped_base64,
+ text[r.base64_start: r.base64_end])
+ self.assertEqual(p.data, r.binary_data)
+
+ def test_parse_multple(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+ text = ''
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ text += pems[i].pem_text
+
+ parse_results = pemutils.parse_pem(text)
+ self.assertEqual(count, len(parse_results))
+
+ for i in moves.range(count):
+ r = parse_results[i]
+ p = pems[i]
+
+ self.assertEqual(p.pem_type, r.pem_type)
+ self.assertEqual(p.pem_header, r.pem_header)
+ self.assertEqual(p.pem_text,
+ text[r.pem_start:r.pem_end])
+ self.assertEqual(p.wrapped_base64,
+ text[r.base64_start: r.base64_end])
+ self.assertEqual(p.data, r.binary_data)
+
+ def test_parse_multple_find_specific(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+ text = ''
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ text += pems[i].pem_text
+
+ for i in moves.range(count):
+ parse_results = pemutils.parse_pem(text, pem_type=headers[i][0])
+ self.assertEqual(1, len(parse_results))
+
+ r = parse_results[0]
+ p = pems[i]
+
+ self.assertEqual(p.pem_type, r.pem_type)
+ self.assertEqual(p.pem_header, r.pem_header)
+ self.assertEqual(p.pem_text,
+ text[r.pem_start:r.pem_end])
+ self.assertEqual(p.wrapped_base64,
+ text[r.base64_start:r.base64_end])
+ self.assertEqual(p.data, r.binary_data)
+
+ def test_parse_multple_embedded(self):
+ data_size = 75
+ count = len(headers)
+ pems = []
+ text = ''
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ text += 'bla bla\n'
+ text += 'yada yada yada\n'
+ text += pems[i].pem_text
+ text += 'burfl blatz bingo\n'
+
+ parse_results = pemutils.parse_pem(text)
+ self.assertEqual(count, len(parse_results))
+
+ for i in moves.range(count):
+ r = parse_results[i]
+ p = pems[i]
+
+ self.assertEqual(p.pem_type, r.pem_type)
+ self.assertEqual(p.pem_header, r.pem_header)
+ self.assertEqual(p.pem_text,
+ text[r.pem_start:r.pem_end])
+ self.assertEqual(p.wrapped_base64,
+ text[r.base64_start:r.base64_end])
+ self.assertEqual(p.data, r.binary_data)
+
+ def test_get_pem_data_none(self):
+ text = ''
+ text += 'bla bla\n'
+ text += 'yada yada yada\n'
+ text += 'burfl blatz bingo\n'
+
+ data = pemutils.get_pem_data(text)
+ self.assertIsNone(data)
+
+ def test_get_pem_data_invalid(self):
+ p = PEM(pem_type='xxx',
+ pem_header='XXX')
+ text = p.pem_text
+
+ self.assertRaises(ValueError,
+ pemutils.get_pem_data, text)
+
+ def test_get_pem_data(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ p = pems[i]
+ text = p.pem_text
+
+ data = pemutils.get_pem_data(text, p.pem_type)
+ self.assertEqual(p.data, data)
+
+ def test_is_pem(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ p = pems[i]
+ text = p.pem_text
+ self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type))
+ self.assertFalse(pemutils.is_pem(text,
+ pem_type=p.pem_type + 'xxx'))
+
+ def test_base64_to_pem(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ p = pems[i]
+ pem = pemutils.base64_to_pem(p.base64_text, p.pem_type)
+ self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
+
+ def test_binary_to_pem(self):
+ data_size = 70
+ count = len(headers)
+ pems = []
+
+ for i in moves.range(count):
+ pems.append(PEM(pem_type=headers[i][0],
+ pem_header=headers[i][1],
+ data_size=data_size + i,
+ data_offset=i))
+
+ for i in moves.range(count):
+ p = pems[i]
+ pem = pemutils.binary_to_pem(p.data, p.pem_type)
+ self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py
new file mode 100644
index 00000000..1f33cfc3
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_sql_core.py
@@ -0,0 +1,52 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy.ext import declarative
+
+from keystone.common import sql
+from keystone.tests import unit as tests
+from keystone.tests.unit import utils
+
+
+ModelBase = declarative.declarative_base()
+
+
+class TestModel(ModelBase, sql.ModelDictMixin):
+ __tablename__ = 'testmodel'
+ id = sql.Column(sql.String(64), primary_key=True)
+ text = sql.Column(sql.String(64), nullable=False)
+
+
+class TestModelDictMixin(tests.BaseTestCase):
+
+ def test_creating_a_model_instance_from_a_dict(self):
+ d = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
+ m = TestModel.from_dict(d)
+ self.assertEqual(m.id, d['id'])
+ self.assertEqual(m.text, d['text'])
+
+ def test_creating_a_dict_from_a_model_instance(self):
+ m = TestModel(id=utils.new_uuid(), text=utils.new_uuid())
+ d = m.to_dict()
+ self.assertEqual(m.id, d['id'])
+ self.assertEqual(m.text, d['text'])
+
+ def test_creating_a_model_instance_from_an_invalid_dict(self):
+ d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None}
+ self.assertRaises(TypeError, TestModel.from_dict, d)
+
+ def test_creating_a_dict_from_a_model_instance_that_has_extra_attrs(self):
+ expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
+ m = TestModel(id=expected['id'], text=expected['text'])
+ m.extra = 'this should not be in the dictionary'
+ self.assertEqual(m.to_dict(), expected)
diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py
new file mode 100644
index 00000000..184c8141
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_utils.py
@@ -0,0 +1,164 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import uuid
+
+from oslo_config import cfg
+from oslo_config import fixture as config_fixture
+from oslo_serialization import jsonutils
+
+from keystone.common import utils as common_utils
+from keystone import exception
+from keystone import service
+from keystone.tests import unit as tests
+from keystone.tests.unit import utils
+
+
+CONF = cfg.CONF
+
+TZ = utils.TZ
+
+
+class UtilsTestCase(tests.BaseTestCase):
+ OPTIONAL = object()
+
+ def setUp(self):
+ super(UtilsTestCase, self).setUp()
+ self.config_fixture = self.useFixture(config_fixture.Config(CONF))
+
+ def test_hash(self):
+ password = 'right'
+ wrong = 'wrongwrong' # Two wrongs don't make a right
+ hashed = common_utils.hash_password(password)
+ self.assertTrue(common_utils.check_password(password, hashed))
+ self.assertFalse(common_utils.check_password(wrong, hashed))
+
+ def test_verify_normal_password_strict(self):
+ self.config_fixture.config(strict_password_check=False)
+ password = uuid.uuid4().hex
+ verified = common_utils.verify_length_and_trunc_password(password)
+ self.assertEqual(password, verified)
+
+ def test_that_a_hash_can_not_be_validated_against_a_hash(self):
+ # NOTE(dstanek): Bug 1279849 reported a problem where passwords
+ # were not being hashed if they already looked like a hash. This
+ # would allow someone to hash their password ahead of time
+ # (potentially getting around password requirements, like
+ # length) and then they could auth with their original password.
+ password = uuid.uuid4().hex
+ hashed_password = common_utils.hash_password(password)
+ new_hashed_password = common_utils.hash_password(hashed_password)
+ self.assertFalse(common_utils.check_password(password,
+ new_hashed_password))
+
+ def test_verify_long_password_strict(self):
+ self.config_fixture.config(strict_password_check=False)
+ self.config_fixture.config(group='identity', max_password_length=5)
+ max_length = CONF.identity.max_password_length
+ invalid_password = 'passw0rd'
+ trunc = common_utils.verify_length_and_trunc_password(invalid_password)
+ self.assertEqual(invalid_password[:max_length], trunc)
+
+ def test_verify_long_password_strict_raises_exception(self):
+ self.config_fixture.config(strict_password_check=True)
+ self.config_fixture.config(group='identity', max_password_length=5)
+ invalid_password = 'passw0rd'
+ self.assertRaises(exception.PasswordVerificationError,
+ common_utils.verify_length_and_trunc_password,
+ invalid_password)
+
+ def test_hash_long_password_truncation(self):
+ self.config_fixture.config(strict_password_check=False)
+ invalid_length_password = '0' * 9999999
+ hashed = common_utils.hash_password(invalid_length_password)
+ self.assertTrue(common_utils.check_password(invalid_length_password,
+ hashed))
+
+ def test_hash_long_password_strict(self):
+ self.config_fixture.config(strict_password_check=True)
+ invalid_length_password = '0' * 9999999
+ self.assertRaises(exception.PasswordVerificationError,
+ common_utils.hash_password,
+ invalid_length_password)
+
+ def _create_test_user(self, password=OPTIONAL):
+ user = {"name": "hthtest"}
+ if password is not self.OPTIONAL:
+ user['password'] = password
+
+ return user
+
+ def test_hash_user_password_without_password(self):
+ user = self._create_test_user()
+ hashed = common_utils.hash_user_password(user)
+ self.assertEqual(user, hashed)
+
+ def test_hash_user_password_with_null_password(self):
+ user = self._create_test_user(password=None)
+ hashed = common_utils.hash_user_password(user)
+ self.assertEqual(user, hashed)
+
+ def test_hash_user_password_with_empty_password(self):
+ password = ''
+ user = self._create_test_user(password=password)
+ user_hashed = common_utils.hash_user_password(user)
+ password_hashed = user_hashed['password']
+ self.assertTrue(common_utils.check_password(password, password_hashed))
+
+ def test_hash_edge_cases(self):
+ hashed = common_utils.hash_password('secret')
+ self.assertFalse(common_utils.check_password('', hashed))
+ self.assertFalse(common_utils.check_password(None, hashed))
+
+ def test_hash_unicode(self):
+ password = u'Comment \xe7a va'
+ wrong = 'Comment ?a va'
+ hashed = common_utils.hash_password(password)
+ self.assertTrue(common_utils.check_password(password, hashed))
+ self.assertFalse(common_utils.check_password(wrong, hashed))
+
+ def test_auth_str_equal(self):
+ self.assertTrue(common_utils.auth_str_equal('abc123', 'abc123'))
+ self.assertFalse(common_utils.auth_str_equal('a', 'aaaaa'))
+ self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a'))
+ self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123'))
+
+ def test_unixtime(self):
+ global TZ
+
+ @utils.timezone
+ def _test_unixtime():
+ epoch = common_utils.unixtime(dt)
+ self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ)
+
+ dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0)
+ epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400
+ for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:
+ TZ = 'UTC' + d
+ _test_unixtime()
+
+ def test_pki_encoder(self):
+ data = {'field': 'value'}
+ json = jsonutils.dumps(data, cls=common_utils.PKIEncoder)
+ expected_json = b'{"field":"value"}'
+ self.assertEqual(expected_json, json)
+
+
+class ServiceHelperTests(tests.BaseTestCase):
+
+ @service.fail_gracefully
+ def _do_test(self):
+ raise Exception("Test Exc")
+
+ def test_fail_gracefully(self):
+ self.assertRaises(tests.UnexpectedExit, self._do_test)