diff options
author | WuKong <rebirthmonkey@gmail.com> | 2015-06-30 18:47:29 +0200 |
---|---|---|
committer | WuKong <rebirthmonkey@gmail.com> | 2015-06-30 18:47:29 +0200 |
commit | b8c756ecdd7cced1db4300935484e8c83701c82e (patch) | |
tree | 87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/common | |
parent | c304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff) |
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604
Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
10 files changed, 2740 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/__init__.py b/keystone-moon/keystone/tests/unit/common/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/__init__.py diff --git a/keystone-moon/keystone/tests/unit/common/test_base64utils.py b/keystone-moon/keystone/tests/unit/common/test_base64utils.py new file mode 100644 index 00000000..b0b75578 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_base64utils.py @@ -0,0 +1,208 @@ +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystone.common import base64utils +from keystone.tests import unit as tests + +base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' + 'abcdefghijklmnopqrstuvwxyz' + '0123456789' + '+/=') # includes pad char + +base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' + 'abcdefghijklmnopqrstuvwxyz' + '0123456789' + '-_=') # includes pad char + + +class TestValid(tests.BaseTestCase): + def test_valid_base64(self): + self.assertTrue(base64utils.is_valid_base64('+/==')) + self.assertTrue(base64utils.is_valid_base64('+/+=')) + self.assertTrue(base64utils.is_valid_base64('+/+/')) + + self.assertFalse(base64utils.is_valid_base64('-_==')) + self.assertFalse(base64utils.is_valid_base64('-_-=')) + self.assertFalse(base64utils.is_valid_base64('-_-_')) + + self.assertTrue(base64utils.is_valid_base64('abcd')) + self.assertFalse(base64utils.is_valid_base64('abcde')) + self.assertFalse(base64utils.is_valid_base64('abcde==')) + self.assertFalse(base64utils.is_valid_base64('abcdef')) + self.assertTrue(base64utils.is_valid_base64('abcdef==')) + self.assertFalse(base64utils.is_valid_base64('abcdefg')) + self.assertTrue(base64utils.is_valid_base64('abcdefg=')) + self.assertTrue(base64utils.is_valid_base64('abcdefgh')) + + self.assertFalse(base64utils.is_valid_base64('-_==')) + + def test_valid_base64url(self): + self.assertFalse(base64utils.is_valid_base64url('+/==')) + self.assertFalse(base64utils.is_valid_base64url('+/+=')) + self.assertFalse(base64utils.is_valid_base64url('+/+/')) + + self.assertTrue(base64utils.is_valid_base64url('-_==')) + self.assertTrue(base64utils.is_valid_base64url('-_-=')) + self.assertTrue(base64utils.is_valid_base64url('-_-_')) + + self.assertTrue(base64utils.is_valid_base64url('abcd')) + self.assertFalse(base64utils.is_valid_base64url('abcde')) + self.assertFalse(base64utils.is_valid_base64url('abcde==')) + self.assertFalse(base64utils.is_valid_base64url('abcdef')) + self.assertTrue(base64utils.is_valid_base64url('abcdef==')) + self.assertFalse(base64utils.is_valid_base64url('abcdefg')) + self.assertTrue(base64utils.is_valid_base64url('abcdefg=')) + self.assertTrue(base64utils.is_valid_base64url('abcdefgh')) + + self.assertTrue(base64utils.is_valid_base64url('-_==')) + + +class TestBase64Padding(tests.BaseTestCase): + + def test_filter(self): + self.assertEqual('', base64utils.filter_formatting('')) + self.assertEqual('', base64utils.filter_formatting(' ')) + self.assertEqual('a', base64utils.filter_formatting('a')) + self.assertEqual('a', base64utils.filter_formatting(' a')) + self.assertEqual('a', base64utils.filter_formatting('a ')) + self.assertEqual('ab', base64utils.filter_formatting('ab')) + self.assertEqual('ab', base64utils.filter_formatting(' ab')) + self.assertEqual('ab', base64utils.filter_formatting('ab ')) + self.assertEqual('ab', base64utils.filter_formatting('a b')) + self.assertEqual('ab', base64utils.filter_formatting(' a b')) + self.assertEqual('ab', base64utils.filter_formatting('a b ')) + self.assertEqual('ab', base64utils.filter_formatting('a\nb\n ')) + + text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' + 'abcdefghijklmnopqrstuvwxyz' + '0123456789' + '+/=') + self.assertEqual(base64_alphabet, + base64utils.filter_formatting(text)) + + text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n' + ' abcdefghijklmnopqrstuvwxyz\n' + '\t\f\r' + ' 0123456789\n' + ' +/=') + self.assertEqual(base64_alphabet, + base64utils.filter_formatting(text)) + self.assertEqual(base64url_alphabet, + base64utils.base64_to_base64url(base64_alphabet)) + + text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' + 'abcdefghijklmnopqrstuvwxyz' + '0123456789' + '-_=') + self.assertEqual(base64url_alphabet, + base64utils.filter_formatting(text)) + + text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n' + ' abcdefghijklmnopqrstuvwxyz\n' + '\t\f\r' + ' 0123456789\n' + '-_=') + self.assertEqual(base64url_alphabet, + base64utils.filter_formatting(text)) + + def test_alphabet_conversion(self): + self.assertEqual(base64url_alphabet, + base64utils.base64_to_base64url(base64_alphabet)) + + self.assertEqual(base64_alphabet, + base64utils.base64url_to_base64(base64url_alphabet)) + + def test_is_padded(self): + self.assertTrue(base64utils.base64_is_padded('ABCD')) + self.assertTrue(base64utils.base64_is_padded('ABC=')) + self.assertTrue(base64utils.base64_is_padded('AB==')) + + self.assertTrue(base64utils.base64_is_padded('1234ABCD')) + self.assertTrue(base64utils.base64_is_padded('1234ABC=')) + self.assertTrue(base64utils.base64_is_padded('1234AB==')) + + self.assertFalse(base64utils.base64_is_padded('ABC')) + self.assertFalse(base64utils.base64_is_padded('AB')) + self.assertFalse(base64utils.base64_is_padded('A')) + self.assertFalse(base64utils.base64_is_padded('')) + + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64_is_padded, '=') + + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64_is_padded, 'AB=C') + + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64_is_padded, 'AB=') + + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64_is_padded, 'ABCD=') + + self.assertRaises(ValueError, base64utils.base64_is_padded, + 'ABC', pad='==') + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64_is_padded, 'A=BC') + + def test_strip_padding(self): + self.assertEqual('ABCD', base64utils.base64_strip_padding('ABCD')) + self.assertEqual('ABC', base64utils.base64_strip_padding('ABC=')) + self.assertEqual('AB', base64utils.base64_strip_padding('AB==')) + self.assertRaises(ValueError, base64utils.base64_strip_padding, + 'ABC=', pad='==') + self.assertEqual('ABC', base64utils.base64_strip_padding('ABC')) + + def test_assure_padding(self): + self.assertEqual('ABCD', base64utils.base64_assure_padding('ABCD')) + self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC')) + self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC=')) + self.assertEqual('AB==', base64utils.base64_assure_padding('AB')) + self.assertEqual('AB==', base64utils.base64_assure_padding('AB==')) + self.assertRaises(ValueError, base64utils.base64_assure_padding, + 'ABC', pad='==') + + def test_base64_percent_encoding(self): + self.assertEqual('ABCD', base64utils.base64url_percent_encode('ABCD')) + self.assertEqual('ABC%3D', + base64utils.base64url_percent_encode('ABC=')) + self.assertEqual('AB%3D%3D', + base64utils.base64url_percent_encode('AB==')) + + self.assertEqual('ABCD', base64utils.base64url_percent_decode('ABCD')) + self.assertEqual('ABC=', + base64utils.base64url_percent_decode('ABC%3D')) + self.assertEqual('AB==', + base64utils.base64url_percent_decode('AB%3D%3D')) + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64url_percent_encode, 'chars') + self.assertRaises(base64utils.InvalidBase64Error, + base64utils.base64url_percent_decode, 'AB%3D%3') + + +class TestTextWrap(tests.BaseTestCase): + + def test_wrapping(self): + raw_text = 'abcdefgh' + wrapped_text = 'abc\ndef\ngh\n' + + self.assertEqual(wrapped_text, + base64utils.base64_wrap(raw_text, width=3)) + + t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n' + self.assertEqual(wrapped_text, t) + + raw_text = 'abcdefgh' + wrapped_text = 'abcd\nefgh\n' + + self.assertEqual(wrapped_text, + base64utils.base64_wrap(raw_text, width=4)) diff --git a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py new file mode 100644 index 00000000..74d0420c --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py @@ -0,0 +1,119 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import mock +from six.moves import queue +import testtools +from testtools import matchers + +from keystone.common.cache import _memcache_pool +from keystone import exception +from keystone.tests.unit import core + + +class _TestConnectionPool(_memcache_pool.ConnectionPool): + destroyed_value = 'destroyed' + + def _create_connection(self): + return mock.MagicMock() + + def _destroy_connection(self, conn): + conn(self.destroyed_value) + + +class TestConnectionPool(core.TestCase): + def setUp(self): + super(TestConnectionPool, self).setUp() + self.unused_timeout = 10 + self.maxsize = 2 + self.connection_pool = _TestConnectionPool( + maxsize=self.maxsize, + unused_timeout=self.unused_timeout) + self.addCleanup(self.cleanup_instance('connection_pool')) + + def test_get_context_manager(self): + self.assertThat(self.connection_pool.queue, matchers.HasLength(0)) + with self.connection_pool.acquire() as conn: + self.assertEqual(1, self.connection_pool._acquired) + self.assertEqual(0, self.connection_pool._acquired) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(conn, self.connection_pool.queue[0].connection) + + def test_cleanup_pool(self): + self.test_get_context_manager() + newtime = time.time() + self.unused_timeout * 2 + non_expired_connection = _memcache_pool._PoolItem( + ttl=(newtime * 2), + connection=mock.MagicMock()) + self.connection_pool.queue.append(non_expired_connection) + self.assertThat(self.connection_pool.queue, matchers.HasLength(2)) + with mock.patch.object(time, 'time', return_value=newtime): + conn = self.connection_pool.queue[0].connection + with self.connection_pool.acquire(): + pass + conn.assert_has_calls( + [mock.call(self.connection_pool.destroyed_value)]) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(0, non_expired_connection.connection.call_count) + + def test_acquire_conn_exception_returns_acquired_count(self): + class TestException(Exception): + pass + + with mock.patch.object(_TestConnectionPool, '_create_connection', + side_effect=TestException): + with testtools.ExpectedException(TestException): + with self.connection_pool.acquire(): + pass + self.assertThat(self.connection_pool.queue, + matchers.HasLength(0)) + self.assertEqual(0, self.connection_pool._acquired) + + def test_connection_pool_limits_maximum_connections(self): + # NOTE(morganfainberg): To ensure we don't lockup tests until the + # job limit, explicitly call .get_nowait() and .put_nowait() in this + # case. + conn1 = self.connection_pool.get_nowait() + conn2 = self.connection_pool.get_nowait() + + # Use a nowait version to raise an Empty exception indicating we would + # not get another connection until one is placed back into the queue. + self.assertRaises(queue.Empty, self.connection_pool.get_nowait) + + # Place the connections back into the pool. + self.connection_pool.put_nowait(conn1) + self.connection_pool.put_nowait(conn2) + + # Make sure we can get a connection out of the pool again. + self.connection_pool.get_nowait() + + def test_connection_pool_maximum_connection_get_timeout(self): + connection_pool = _TestConnectionPool( + maxsize=1, + unused_timeout=self.unused_timeout, + conn_get_timeout=0) + + def _acquire_connection(): + with connection_pool.acquire(): + pass + + # Make sure we've consumed the only available connection from the pool + conn = connection_pool.get_nowait() + + self.assertRaises(exception.UnexpectedError, _acquire_connection) + + # Put the connection back and ensure we can acquire the connection + # after it is available. + connection_pool.put_nowait(conn) + _acquire_connection() diff --git a/keystone-moon/keystone/tests/unit/common/test_injection.py b/keystone-moon/keystone/tests/unit/common/test_injection.py new file mode 100644 index 00000000..86bb3c24 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_injection.py @@ -0,0 +1,293 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystone.common import dependency +from keystone.tests import unit as tests + + +class TestDependencyInjection(tests.BaseTestCase): + def setUp(self): + super(TestDependencyInjection, self).setUp() + self.addCleanup(dependency.reset) + + def test_dependency_injection(self): + class Interface(object): + def do_work(self): + assert False + + @dependency.provider('first_api') + class FirstImplementation(Interface): + def do_work(self): + return True + + @dependency.provider('second_api') + class SecondImplementation(Interface): + def do_work(self): + return True + + @dependency.requires('first_api', 'second_api') + class Consumer(object): + def do_work_with_dependencies(self): + assert self.first_api.do_work() + assert self.second_api.do_work() + + # initialize dependency providers + first_api = FirstImplementation() + second_api = SecondImplementation() + + # ... sometime later, initialize a dependency consumer + consumer = Consumer() + + # the expected dependencies should be available to the consumer + self.assertIs(consumer.first_api, first_api) + self.assertIs(consumer.second_api, second_api) + self.assertIsInstance(consumer.first_api, Interface) + self.assertIsInstance(consumer.second_api, Interface) + consumer.do_work_with_dependencies() + + def test_dependency_provider_configuration(self): + @dependency.provider('api') + class Configurable(object): + def __init__(self, value=None): + self.value = value + + def get_value(self): + return self.value + + @dependency.requires('api') + class Consumer(object): + def get_value(self): + return self.api.get_value() + + # initialize dependency providers + api = Configurable(value=True) + + # ... sometime later, initialize a dependency consumer + consumer = Consumer() + + # the expected dependencies should be available to the consumer + self.assertIs(consumer.api, api) + self.assertIsInstance(consumer.api, Configurable) + self.assertTrue(consumer.get_value()) + + def test_dependency_consumer_configuration(self): + @dependency.provider('api') + class Provider(object): + def get_value(self): + return True + + @dependency.requires('api') + class Configurable(object): + def __init__(self, value=None): + self.value = value + + def get_value(self): + if self.value: + return self.api.get_value() + + # initialize dependency providers + api = Provider() + + # ... sometime later, initialize a dependency consumer + consumer = Configurable(value=True) + + # the expected dependencies should be available to the consumer + self.assertIs(consumer.api, api) + self.assertIsInstance(consumer.api, Provider) + self.assertTrue(consumer.get_value()) + + def test_inherited_dependency(self): + class Interface(object): + def do_work(self): + assert False + + @dependency.provider('first_api') + class FirstImplementation(Interface): + def do_work(self): + return True + + @dependency.provider('second_api') + class SecondImplementation(Interface): + def do_work(self): + return True + + @dependency.requires('first_api') + class ParentConsumer(object): + def do_work_with_dependencies(self): + assert self.first_api.do_work() + + @dependency.requires('second_api') + class ChildConsumer(ParentConsumer): + def do_work_with_dependencies(self): + assert self.second_api.do_work() + super(ChildConsumer, self).do_work_with_dependencies() + + # initialize dependency providers + first_api = FirstImplementation() + second_api = SecondImplementation() + + # ... sometime later, initialize a dependency consumer + consumer = ChildConsumer() + + # dependencies should be naturally inherited + self.assertEqual( + set(['first_api']), + ParentConsumer._dependencies) + self.assertEqual( + set(['first_api', 'second_api']), + ChildConsumer._dependencies) + self.assertEqual( + set(['first_api', 'second_api']), + consumer._dependencies) + + # the expected dependencies should be available to the consumer + self.assertIs(consumer.first_api, first_api) + self.assertIs(consumer.second_api, second_api) + self.assertIsInstance(consumer.first_api, Interface) + self.assertIsInstance(consumer.second_api, Interface) + consumer.do_work_with_dependencies() + + def test_unresolvable_dependency(self): + @dependency.requires(uuid.uuid4().hex) + class Consumer(object): + pass + + def for_test(): + Consumer() + dependency.resolve_future_dependencies() + + self.assertRaises(dependency.UnresolvableDependencyException, for_test) + + def test_circular_dependency(self): + p1_name = uuid.uuid4().hex + p2_name = uuid.uuid4().hex + + @dependency.provider(p1_name) + @dependency.requires(p2_name) + class P1(object): + pass + + @dependency.provider(p2_name) + @dependency.requires(p1_name) + class P2(object): + pass + + p1 = P1() + p2 = P2() + + dependency.resolve_future_dependencies() + + self.assertIs(getattr(p1, p2_name), p2) + self.assertIs(getattr(p2, p1_name), p1) + + def test_reset(self): + # Can reset the registry of providers. + + p_id = uuid.uuid4().hex + + @dependency.provider(p_id) + class P(object): + pass + + p_inst = P() + + self.assertIs(dependency.get_provider(p_id), p_inst) + + dependency.reset() + + self.assertFalse(dependency._REGISTRY) + + def test_optional_dependency_not_provided(self): + requirement_name = uuid.uuid4().hex + + @dependency.optional(requirement_name) + class C1(object): + pass + + c1_inst = C1() + + dependency.resolve_future_dependencies() + + self.assertIsNone(getattr(c1_inst, requirement_name)) + + def test_optional_dependency_provided(self): + requirement_name = uuid.uuid4().hex + + @dependency.optional(requirement_name) + class C1(object): + pass + + @dependency.provider(requirement_name) + class P1(object): + pass + + c1_inst = C1() + p1_inst = P1() + + dependency.resolve_future_dependencies() + + self.assertIs(getattr(c1_inst, requirement_name), p1_inst) + + def test_optional_and_required(self): + p1_name = uuid.uuid4().hex + p2_name = uuid.uuid4().hex + optional_name = uuid.uuid4().hex + + @dependency.provider(p1_name) + @dependency.requires(p2_name) + @dependency.optional(optional_name) + class P1(object): + pass + + @dependency.provider(p2_name) + @dependency.requires(p1_name) + class P2(object): + pass + + p1 = P1() + p2 = P2() + + dependency.resolve_future_dependencies() + + self.assertIs(getattr(p1, p2_name), p2) + self.assertIs(getattr(p2, p1_name), p1) + self.assertIsNone(getattr(p1, optional_name)) + + def test_get_provider(self): + # Can get the instance of a provider using get_provider + + provider_name = uuid.uuid4().hex + + @dependency.provider(provider_name) + class P(object): + pass + + provider_instance = P() + retrieved_provider_instance = dependency.get_provider(provider_name) + self.assertIs(provider_instance, retrieved_provider_instance) + + def test_get_provider_not_provided_error(self): + # If no provider and provider is required then fails. + + provider_name = uuid.uuid4().hex + self.assertRaises(KeyError, dependency.get_provider, provider_name) + + def test_get_provider_not_provided_optional(self): + # If no provider and provider is optional then returns None. + + provider_name = uuid.uuid4().hex + self.assertIsNone(dependency.get_provider(provider_name, + dependency.GET_OPTIONAL)) diff --git a/keystone-moon/keystone/tests/unit/common/test_json_home.py b/keystone-moon/keystone/tests/unit/common/test_json_home.py new file mode 100644 index 00000000..fb7f8448 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_json_home.py @@ -0,0 +1,91 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import copy + +from testtools import matchers + +from keystone.common import json_home +from keystone.tests import unit as tests + + +class JsonHomeTest(tests.BaseTestCase): + def test_build_v3_resource_relation(self): + resource_name = self.getUniqueString() + relation = json_home.build_v3_resource_relation(resource_name) + exp_relation = ( + 'http://docs.openstack.org/api/openstack-identity/3/rel/%s' % + resource_name) + self.assertThat(relation, matchers.Equals(exp_relation)) + + def test_build_v3_extension_resource_relation(self): + extension_name = self.getUniqueString() + extension_version = self.getUniqueString() + resource_name = self.getUniqueString() + relation = json_home.build_v3_extension_resource_relation( + extension_name, extension_version, resource_name) + exp_relation = ( + 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/rel/' + '%s' % (extension_name, extension_version, resource_name)) + self.assertThat(relation, matchers.Equals(exp_relation)) + + def test_build_v3_parameter_relation(self): + parameter_name = self.getUniqueString() + relation = json_home.build_v3_parameter_relation(parameter_name) + exp_relation = ( + 'http://docs.openstack.org/api/openstack-identity/3/param/%s' % + parameter_name) + self.assertThat(relation, matchers.Equals(exp_relation)) + + def test_build_v3_extension_parameter_relation(self): + extension_name = self.getUniqueString() + extension_version = self.getUniqueString() + parameter_name = self.getUniqueString() + relation = json_home.build_v3_extension_parameter_relation( + extension_name, extension_version, parameter_name) + exp_relation = ( + 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/' + 'param/%s' % (extension_name, extension_version, parameter_name)) + self.assertThat(relation, matchers.Equals(exp_relation)) + + def test_translate_urls(self): + href_rel = self.getUniqueString() + href = self.getUniqueString() + href_template_rel = self.getUniqueString() + href_template = self.getUniqueString() + href_vars = {self.getUniqueString(): self.getUniqueString()} + original_json_home = { + 'resources': { + href_rel: {'href': href}, + href_template_rel: { + 'href-template': href_template, + 'href-vars': href_vars} + } + } + + new_json_home = copy.deepcopy(original_json_home) + new_prefix = self.getUniqueString() + json_home.translate_urls(new_json_home, new_prefix) + + exp_json_home = { + 'resources': { + href_rel: {'href': new_prefix + href}, + href_template_rel: { + 'href-template': new_prefix + href_template, + 'href-vars': href_vars} + } + } + + self.assertThat(new_json_home, matchers.Equals(exp_json_home)) diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py new file mode 100644 index 00000000..41568890 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_ldap.py @@ -0,0 +1,502 @@ +# -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import ldap.dn +import mock +from oslo_config import cfg +from testtools import matchers + +import os +import shutil +import tempfile + +from keystone.common import ldap as ks_ldap +from keystone.common.ldap import core as common_ldap_core +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit import fakeldap + +CONF = cfg.CONF + + +class DnCompareTest(tests.BaseTestCase): + """Tests for the DN comparison functions in keystone.common.ldap.core.""" + + def test_prep(self): + # prep_case_insensitive returns the string with spaces at the front and + # end if it's already lowercase and no insignificant characters. + value = 'lowercase value' + self.assertEqual(value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_lowercase(self): + # prep_case_insensitive returns the string with spaces at the front and + # end and lowercases the value. + value = 'UPPERCASE VALUE' + exp_value = value.lower() + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_insignificant(self): + # prep_case_insensitive remove insignificant spaces. + value = 'before after' + exp_value = 'before after' + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_prep_insignificant_pre_post(self): + # prep_case_insensitive remove insignificant spaces. + value = ' value ' + exp_value = 'value' + self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value)) + + def test_ava_equal_same(self): + # is_ava_value_equal returns True if the two values are the same. + value = 'val1' + self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value)) + + def test_ava_equal_complex(self): + # is_ava_value_equal returns True if the two values are the same using + # a value that's got different capitalization and insignificant chars. + val1 = 'before after' + val2 = ' BEFORE afTer ' + self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2)) + + def test_ava_different(self): + # is_ava_value_equal returns False if the values aren't the same. + self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2')) + + def test_rdn_same(self): + # is_rdn_equal returns True if the two values are the same. + rdn = ldap.dn.str2dn('cn=val1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn)) + + def test_rdn_diff_length(self): + # is_rdn_equal returns False if the RDNs have a different number of + # AVAs. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_same_order(self): + # is_rdn_equal returns True if the RDNs have the same number of AVAs + # and the values are the same. + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_diff_order(self): + # is_rdn_equal returns True if the RDNs have the same number of AVAs + # and the values are the same, even if in a different order + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_multi_ava_diff_type(self): + # is_rdn_equal returns False if the RDNs have the same number of AVAs + # and the attribute types are different. + rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0] + rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_attr_type_case_diff(self): + # is_rdn_equal returns True for same RDNs even when attr type case is + # different. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('CN=cn1')[0] + self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_rdn_attr_type_alias(self): + # is_rdn_equal returns False for same RDNs even when attr type alias is + # used. Note that this is a limitation since an LDAP server should + # consider them equal. + rdn1 = ldap.dn.str2dn('cn=cn1')[0] + rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0] + self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2)) + + def test_dn_same(self): + # is_dn_equal returns True if the DNs are the same. + dn = 'cn=Babs Jansen,ou=OpenStack' + self.assertTrue(ks_ldap.is_dn_equal(dn, dn)) + + def test_dn_equal_unicode(self): + # is_dn_equal can accept unicode + dn = u'cn=fäké,ou=OpenStack' + self.assertTrue(ks_ldap.is_dn_equal(dn, dn)) + + def test_dn_diff_length(self): + # is_dn_equal returns False if the DNs don't have the same number of + # RDNs + dn1 = 'cn=Babs Jansen,ou=OpenStack' + dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com' + self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2)) + + def test_dn_equal_rdns(self): + # is_dn_equal returns True if the DNs have the same number of RDNs + # and each RDN is the same. + dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource' + dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack' + self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2)) + + def test_dn_parsed_dns(self): + # is_dn_equal can also accept parsed DNs. + dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource') + dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack') + self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2)) + + def test_startswith_under_child(self): + # dn_startswith returns True if descendant_dn is a child of dn. + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'ou=OpenStack' + self.assertTrue(ks_ldap.dn_startswith(child, parent)) + + def test_startswith_parent(self): + # dn_startswith returns False if descendant_dn is a parent of dn. + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'ou=OpenStack' + self.assertFalse(ks_ldap.dn_startswith(parent, child)) + + def test_startswith_same(self): + # dn_startswith returns False if DNs are the same. + dn = 'cn=Babs Jansen,ou=OpenStack' + self.assertFalse(ks_ldap.dn_startswith(dn, dn)) + + def test_startswith_not_parent(self): + # dn_startswith returns False if descendant_dn is not under the dn + child = 'cn=Babs Jansen,ou=OpenStack' + parent = 'dc=example.com' + self.assertFalse(ks_ldap.dn_startswith(child, parent)) + + def test_startswith_descendant(self): + # dn_startswith returns True if descendant_dn is a descendant of dn. + descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com' + dn = 'ou=OpenStack,dc=example.com' + self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) + + descendant = 'uid=12345,ou=Users,dc=example,dc=com' + dn = 'ou=Users,dc=example,dc=com' + self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) + + def test_startswith_parsed_dns(self): + # dn_startswith also accepts parsed DNs. + descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack') + dn = ldap.dn.str2dn('ou=OpenStack') + self.assertTrue(ks_ldap.dn_startswith(descendant, dn)) + + def test_startswith_unicode(self): + # dn_startswith accepts unicode. + child = u'cn=cn=fäké,ou=OpenStäck' + parent = 'ou=OpenStäck' + self.assertTrue(ks_ldap.dn_startswith(child, parent)) + + +class LDAPDeleteTreeTest(tests.TestCase): + + def setUp(self): + super(LDAPDeleteTreeTest, self).setUp() + + ks_ldap.register_handler('fake://', + fakeldap.FakeLdapNoSubtreeDelete) + self.load_backends() + self.load_fixtures(default_fixtures) + + self.addCleanup(self.clear_database) + self.addCleanup(common_ldap_core._HANDLERS.clear) + + def clear_database(self): + for shelf in fakeldap.FakeShelves: + fakeldap.FakeShelves[shelf].clear() + + def config_overrides(self): + super(LDAPDeleteTreeTest, self).config_overrides() + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.ldap.Identity') + + def config_files(self): + config_files = super(LDAPDeleteTreeTest, self).config_files() + config_files.append(tests.dirs.tests_conf('backend_ldap.conf')) + return config_files + + def test_deleteTree(self): + """Test manually deleting a tree. + + Few LDAP servers support CONTROL_DELETETREE. This test + exercises the alternate code paths in BaseLdap.deleteTree. + + """ + conn = self.identity_api.user.get_connection() + id_attr = self.identity_api.user.id_attr + objclass = self.identity_api.user.object_class.lower() + tree_dn = self.identity_api.user.tree_dn + + def create_entry(name, parent_dn=None): + if not parent_dn: + parent_dn = tree_dn + dn = '%s=%s,%s' % (id_attr, name, parent_dn) + attrs = [('objectclass', [objclass, 'ldapsubentry']), + (id_attr, [name])] + conn.add_s(dn, attrs) + return dn + + # create 3 entries like this: + # cn=base + # cn=child,cn=base + # cn=grandchild,cn=child,cn=base + # then attempt to deleteTree(cn=base) + base_id = 'base' + base_dn = create_entry(base_id) + child_dn = create_entry('child', base_dn) + grandchild_dn = create_entry('grandchild', child_dn) + + # verify that the three entries were created + scope = ldap.SCOPE_SUBTREE + filt = '(|(objectclass=*)(objectclass=ldapsubentry))' + entries = conn.search_s(base_dn, scope, filt, + attrlist=common_ldap_core.DN_ONLY) + self.assertThat(entries, matchers.HasLength(3)) + sort_ents = sorted([e[0] for e in entries], key=len, reverse=True) + self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents) + + # verify that a non-leaf node can't be deleted directly by the + # LDAP server + self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF, + conn.delete_s, base_dn) + self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF, + conn.delete_s, child_dn) + + # call our deleteTree implementation + self.identity_api.user.deleteTree(base_id) + self.assertRaises(ldap.NO_SUCH_OBJECT, + conn.search_s, base_dn, ldap.SCOPE_BASE) + self.assertRaises(ldap.NO_SUCH_OBJECT, + conn.search_s, child_dn, ldap.SCOPE_BASE) + self.assertRaises(ldap.NO_SUCH_OBJECT, + conn.search_s, grandchild_dn, ldap.SCOPE_BASE) + + +class SslTlsTest(tests.TestCase): + """Tests for the SSL/TLS functionality in keystone.common.ldap.core.""" + + @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s') + @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s') + def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two): + # Attempt to connect to initialize python-ldap. + base_ldap = ks_ldap.BaseLdap(config) + base_ldap.get_connection() + + def test_certfile_trust_tls(self): + # We need this to actually exist, so we create a tempfile. + (handle, certfile) = tempfile.mkstemp() + self.addCleanup(os.unlink, certfile) + self.addCleanup(os.close, handle) + self.config_fixture.config(group='ldap', + url='ldap://localhost', + use_tls=True, + tls_cacertfile=certfile) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) + + def test_certdir_trust_tls(self): + # We need this to actually exist, so we create a tempdir. + certdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, certdir) + self.config_fixture.config(group='ldap', + url='ldap://localhost', + use_tls=True, + tls_cacertdir=certdir) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)) + + def test_certfile_trust_ldaps(self): + # We need this to actually exist, so we create a tempfile. + (handle, certfile) = tempfile.mkstemp() + self.addCleanup(os.unlink, certfile) + self.addCleanup(os.close, handle) + self.config_fixture.config(group='ldap', + url='ldaps://localhost', + use_tls=False, + tls_cacertfile=certfile) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)) + + def test_certdir_trust_ldaps(self): + # We need this to actually exist, so we create a tempdir. + certdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, certdir) + self.config_fixture.config(group='ldap', + url='ldaps://localhost', + use_tls=False, + tls_cacertdir=certdir) + + self._init_ldap_connection(CONF) + + # Ensure the cert trust option is set. + self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)) + + +class LDAPPagedResultsTest(tests.TestCase): + """Tests the paged results functionality in keystone.common.ldap.core.""" + + def setUp(self): + super(LDAPPagedResultsTest, self).setUp() + self.clear_database() + + ks_ldap.register_handler('fake://', fakeldap.FakeLdap) + self.addCleanup(common_ldap_core._HANDLERS.clear) + + self.load_backends() + self.load_fixtures(default_fixtures) + + def clear_database(self): + for shelf in fakeldap.FakeShelves: + fakeldap.FakeShelves[shelf].clear() + + def config_overrides(self): + super(LDAPPagedResultsTest, self).config_overrides() + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.ldap.Identity') + + def config_files(self): + config_files = super(LDAPPagedResultsTest, self).config_files() + config_files.append(tests.dirs.tests_conf('backend_ldap.conf')) + return config_files + + @mock.patch.object(fakeldap.FakeLdap, 'search_ext') + @mock.patch.object(fakeldap.FakeLdap, 'result3') + def test_paged_results_control_api(self, mock_result3, mock_search_ext): + mock_result3.return_value = ('', [], 1, []) + + self.config_fixture.config(group='ldap', + page_size=1) + + conn = self.identity_api.user.get_connection() + conn._paged_search_s('dc=example,dc=test', + ldap.SCOPE_SUBTREE, + 'objectclass=*') + + +class CommonLdapTestCase(tests.BaseTestCase): + """These test cases call functions in keystone.common.ldap.""" + + def test_binary_attribute_values(self): + result = [( + 'cn=junk,dc=example,dc=com', + { + 'cn': ['junk'], + 'sn': [uuid.uuid4().hex], + 'mail': [uuid.uuid4().hex], + 'binary_attr': ['\x00\xFF\x00\xFF'] + } + ), ] + py_result = ks_ldap.convert_ldap_result(result) + # The attribute containing the binary value should + # not be present in the converted result. + self.assertNotIn('binary_attr', py_result[0][1]) + + def test_utf8_conversion(self): + value_unicode = u'fäké1' + value_utf8 = value_unicode.encode('utf-8') + + result_utf8 = ks_ldap.utf8_encode(value_unicode) + self.assertEqual(value_utf8, result_utf8) + + result_utf8 = ks_ldap.utf8_encode(value_utf8) + self.assertEqual(value_utf8, result_utf8) + + result_unicode = ks_ldap.utf8_decode(value_utf8) + self.assertEqual(value_unicode, result_unicode) + + result_unicode = ks_ldap.utf8_decode(value_unicode) + self.assertEqual(value_unicode, result_unicode) + + self.assertRaises(TypeError, + ks_ldap.utf8_encode, + 100) + + result_unicode = ks_ldap.utf8_decode(100) + self.assertEqual(u'100', result_unicode) + + def test_user_id_begins_with_0(self): + user_id = '0123456' + result = [( + 'cn=dummy,dc=example,dc=com', + { + 'user_id': [user_id], + 'enabled': ['TRUE'] + } + ), ] + py_result = ks_ldap.convert_ldap_result(result) + # The user id should be 0123456, and the enabled + # flag should be True + self.assertIs(py_result[0][1]['enabled'][0], True) + self.assertEqual(user_id, py_result[0][1]['user_id'][0]) + + def test_user_id_begins_with_0_and_enabled_bit_mask(self): + user_id = '0123456' + bitmask = '225' + expected_bitmask = 225 + result = [( + 'cn=dummy,dc=example,dc=com', + { + 'user_id': [user_id], + 'enabled': [bitmask] + } + ), ] + py_result = ks_ldap.convert_ldap_result(result) + # The user id should be 0123456, and the enabled + # flag should be 225 + self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0]) + self.assertEqual(user_id, py_result[0][1]['user_id'][0]) + + def test_user_id_and_bitmask_begins_with_0(self): + user_id = '0123456' + bitmask = '0225' + expected_bitmask = 225 + result = [( + 'cn=dummy,dc=example,dc=com', + { + 'user_id': [user_id], + 'enabled': [bitmask] + } + ), ] + py_result = ks_ldap.convert_ldap_result(result) + # The user id should be 0123456, and the enabled + # flag should be 225, the 0 is dropped. + self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0]) + self.assertEqual(user_id, py_result[0][1]['user_id'][0]) + + def test_user_id_and_user_name_with_boolean_string(self): + boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False', + 'TrUe' 'FaLse'] + for user_name in boolean_strings: + user_id = uuid.uuid4().hex + result = [( + 'cn=dummy,dc=example,dc=com', + { + 'user_id': [user_id], + 'user_name': [user_name] + } + ), ] + py_result = ks_ldap.convert_ldap_result(result) + # The user name should still be a string value. + self.assertEqual(user_name, py_result[0][1]['user_name'][0]) diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py new file mode 100644 index 00000000..55dd556d --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py @@ -0,0 +1,974 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config_fixture +from oslotest import mockpatch +from pycadf import cadftaxonomy +from pycadf import cadftype +from pycadf import eventfactory +from pycadf import resource as cadfresource +import testtools + +from keystone.common import dependency +from keystone import notifications +from keystone.tests.unit import test_v3 + + +CONF = cfg.CONF + +EXP_RESOURCE_TYPE = uuid.uuid4().hex +CREATED_OPERATION = notifications.ACTIONS.created +UPDATED_OPERATION = notifications.ACTIONS.updated +DELETED_OPERATION = notifications.ACTIONS.deleted +DISABLED_OPERATION = notifications.ACTIONS.disabled + + +class ArbitraryException(Exception): + pass + + +def register_callback(operation, resource_type=EXP_RESOURCE_TYPE): + """Helper for creating and registering a mock callback. + + """ + callback = mock.Mock(__name__='callback', + im_class=mock.Mock(__name__='class')) + notifications.register_event_callback(operation, resource_type, callback) + return callback + + +class AuditNotificationsTestCase(testtools.TestCase): + def setUp(self): + super(AuditNotificationsTestCase, self).setUp() + self.config_fixture = self.useFixture(config_fixture.Config(CONF)) + self.addCleanup(notifications.clear_subscribers) + + def _test_notification_operation(self, notify_function, operation): + exp_resource_id = uuid.uuid4().hex + callback = register_callback(operation) + notify_function(EXP_RESOURCE_TYPE, exp_resource_id) + callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE, + operation, + {'resource_info': exp_resource_id}) + self.config_fixture.config(notification_format='cadf') + with mock.patch( + 'keystone.notifications._create_cadf_payload') as cadf_notify: + notify_function(EXP_RESOURCE_TYPE, exp_resource_id) + initiator = None + cadf_notify.assert_called_once_with( + operation, EXP_RESOURCE_TYPE, exp_resource_id, + notifications.taxonomy.OUTCOME_SUCCESS, initiator) + notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False) + cadf_notify.assert_called_once_with( + operation, EXP_RESOURCE_TYPE, exp_resource_id, + notifications.taxonomy.OUTCOME_SUCCESS, initiator) + + def test_resource_created_notification(self): + self._test_notification_operation(notifications.Audit.created, + CREATED_OPERATION) + + def test_resource_updated_notification(self): + self._test_notification_operation(notifications.Audit.updated, + UPDATED_OPERATION) + + def test_resource_deleted_notification(self): + self._test_notification_operation(notifications.Audit.deleted, + DELETED_OPERATION) + + def test_resource_disabled_notification(self): + self._test_notification_operation(notifications.Audit.disabled, + DISABLED_OPERATION) + + +class NotificationsWrapperTestCase(testtools.TestCase): + def create_fake_ref(self): + resource_id = uuid.uuid4().hex + return resource_id, { + 'id': resource_id, + 'key': uuid.uuid4().hex + } + + @notifications.created(EXP_RESOURCE_TYPE) + def create_resource(self, resource_id, data): + return data + + def test_resource_created_notification(self): + exp_resource_id, data = self.create_fake_ref() + callback = register_callback(CREATED_OPERATION) + + self.create_resource(exp_resource_id, data) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + CREATED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.updated(EXP_RESOURCE_TYPE) + def update_resource(self, resource_id, data): + return data + + def test_resource_updated_notification(self): + exp_resource_id, data = self.create_fake_ref() + callback = register_callback(UPDATED_OPERATION) + + self.update_resource(exp_resource_id, data) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + UPDATED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.deleted(EXP_RESOURCE_TYPE) + def delete_resource(self, resource_id): + pass + + def test_resource_deleted_notification(self): + exp_resource_id = uuid.uuid4().hex + callback = register_callback(DELETED_OPERATION) + + self.delete_resource(exp_resource_id) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + DELETED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.created(EXP_RESOURCE_TYPE) + def create_exception(self, resource_id): + raise ArbitraryException() + + def test_create_exception_without_notification(self): + callback = register_callback(CREATED_OPERATION) + self.assertRaises( + ArbitraryException, self.create_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + @notifications.created(EXP_RESOURCE_TYPE) + def update_exception(self, resource_id): + raise ArbitraryException() + + def test_update_exception_without_notification(self): + callback = register_callback(UPDATED_OPERATION) + self.assertRaises( + ArbitraryException, self.update_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + @notifications.deleted(EXP_RESOURCE_TYPE) + def delete_exception(self, resource_id): + raise ArbitraryException() + + def test_delete_exception_without_notification(self): + callback = register_callback(DELETED_OPERATION) + self.assertRaises( + ArbitraryException, self.delete_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + +class NotificationsTestCase(testtools.TestCase): + def setUp(self): + super(NotificationsTestCase, self).setUp() + + # these should use self.config_fixture.config(), but they haven't + # been registered yet + CONF.rpc_backend = 'fake' + CONF.notification_driver = ['fake'] + + def test_send_notification(self): + """Test the private method _send_notification to ensure event_type, + payload, and context are built and passed properly. + """ + resource = uuid.uuid4().hex + resource_type = EXP_RESOURCE_TYPE + operation = CREATED_OPERATION + + # NOTE(ldbragst): Even though notifications._send_notification doesn't + # contain logic that creates cases, this is supposed to test that + # context is always empty and that we ensure the resource ID of the + # resource in the notification is contained in the payload. It was + # agreed that context should be empty in Keystone's case, which is + # also noted in the /keystone/notifications.py module. This test + # ensures and maintains these conditions. + expected_args = [ + {}, # empty context + 'identity.%s.created' % resource_type, # event_type + {'resource_info': resource}, # payload + 'INFO', # priority is always INFO... + ] + + with mock.patch.object(notifications._get_notifier(), + '_notify') as mocked: + notifications._send_notification(operation, resource_type, + resource) + mocked.assert_called_once_with(*expected_args) + + +class BaseNotificationTest(test_v3.RestfulTestCase): + + def setUp(self): + super(BaseNotificationTest, self).setUp() + + self._notifications = [] + self._audits = [] + + def fake_notify(operation, resource_type, resource_id, + public=True): + note = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + self._notifications.append(note) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_notification', fake_notify)) + + def fake_audit(action, initiator, outcome, target, + event_type, **kwargs): + service_security = cadftaxonomy.SERVICE_SECURITY + + event = eventfactory.EventFactory().new_event( + eventType=cadftype.EVENTTYPE_ACTIVITY, + outcome=outcome, + action=action, + initiator=initiator, + target=target, + observer=cadfresource.Resource(typeURI=service_security)) + + for key, value in kwargs.items(): + setattr(event, key, value) + + audit = { + 'payload': event.as_dict(), + 'event_type': event_type, + 'send_notification_called': True} + self._audits.append(audit) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_audit_notification', fake_audit)) + + def _assert_last_note(self, resource_id, operation, resource_type): + # NOTE(stevemar): If 'basic' format is not used, then simply + # return since this assertion is not valid. + if CONF.notification_format != 'basic': + return + self.assertTrue(len(self._notifications) > 0) + note = self._notifications[-1] + self.assertEqual(note['operation'], operation) + self.assertEqual(note['resource_id'], resource_id) + self.assertEqual(note['resource_type'], resource_type) + self.assertTrue(note['send_notification_called']) + + def _assert_last_audit(self, resource_id, operation, resource_type, + target_uri): + # NOTE(stevemar): If 'cadf' format is not used, then simply + # return since this assertion is not valid. + if CONF.notification_format != 'cadf': + return + self.assertTrue(len(self._audits) > 0) + audit = self._audits[-1] + payload = audit['payload'] + self.assertEqual(resource_id, payload['resource_info']) + action = '%s.%s' % (operation, resource_type) + self.assertEqual(action, payload['action']) + self.assertEqual(target_uri, payload['target']['typeURI']) + self.assertEqual(resource_id, payload['target']['id']) + event_type = '%s.%s.%s' % ('identity', resource_type, operation) + self.assertEqual(event_type, audit['event_type']) + self.assertTrue(audit['send_notification_called']) + + def _assert_notify_not_sent(self, resource_id, operation, resource_type, + public=True): + unexpected = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + for note in self._notifications: + self.assertNotEqual(unexpected, note) + + def _assert_notify_sent(self, resource_id, operation, resource_type, + public=True): + expected = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + for note in self._notifications: + if expected == note: + break + else: + self.fail("Notification not sent.") + + +class NotificationsForEntities(BaseNotificationTest): + + def test_create_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_create_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self._assert_last_note( + project_ref['id'], CREATED_OPERATION, 'project') + self._assert_last_audit(project_ref['id'], CREATED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_create_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_create_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_create_trust(self): + trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = self.identity_api.create_user(trustor) + trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = self.identity_api.create_user(trustee) + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + trust_ref = self.new_trust_ref(trustor['id'], + trustee['id']) + self.trust_api.create_trust(trust_ref['id'], + trust_ref, + [role_ref]) + self._assert_last_note( + trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust') + self._assert_last_audit(trust_ref['id'], CREATED_OPERATION, + 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) + + def test_delete_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.delete_group(group_ref['id']) + self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_delete_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assignment_api.delete_project(project_ref['id']) + self._assert_last_note( + project_ref['id'], DELETED_OPERATION, 'project') + self._assert_last_audit(project_ref['id'], DELETED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_delete_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self.role_api.delete_role(role_ref['id']) + self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_delete_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self.identity_api.delete_user(user_ref['id']) + self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_create_domain(self): + domain_ref = self.new_domain_ref() + self.resource_api.create_domain(domain_ref['id'], domain_ref) + self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_update_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['description'] = uuid.uuid4().hex + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_delete_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['enabled'] = False + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self.assignment_api.delete_domain(domain_ref['id']) + self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_delete_trust(self): + trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = self.identity_api.create_user(trustor) + trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = self.identity_api.create_user(trustee) + role_ref = self.new_role_ref() + trust_ref = self.new_trust_ref(trustor['id'], trustee['id']) + self.trust_api.create_trust(trust_ref['id'], + trust_ref, + [role_ref]) + self.trust_api.delete_trust(trust_ref['id']) + self._assert_last_note( + trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust') + self._assert_last_audit(trust_ref['id'], DELETED_OPERATION, + 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) + + def test_create_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_update_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref) + self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_delete_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self.catalog_api.delete_endpoint(endpoint_ref['id']) + self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_create_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self._assert_notify_sent(service_ref['id'], CREATED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], CREATED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_update_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self.catalog_api.update_service(service_ref['id'], service_ref) + self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], UPDATED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_delete_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self.catalog_api.delete_service(service_ref['id']) + self._assert_notify_sent(service_ref['id'], DELETED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], DELETED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_create_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self._assert_notify_sent(region_ref['id'], CREATED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], CREATED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_update_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self.catalog_api.update_region(region_ref['id'], region_ref) + self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], UPDATED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_delete_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self.catalog_api.delete_region(region_ref['id']) + self._assert_notify_sent(region_ref['id'], DELETED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], DELETED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_create_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], CREATED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_update_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self.policy_api.update_policy(policy_ref['id'], policy_ref) + self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_delete_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self.policy_api.delete_policy(policy_ref['id']) + self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], DELETED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_disable_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['enabled'] = False + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain', + public=False) + + def test_disable_of_disabled_domain_does_not_notify(self): + domain_ref = self.new_domain_ref() + domain_ref['enabled'] = False + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + # The domain_ref above is not changed during the create process. We + # can use the same ref to perform the update. + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain', + public=False) + + def test_update_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.update_group(group_ref['id'], group_ref) + self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_update_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_sent( + project_ref['id'], UPDATED_OPERATION, 'project', public=True) + self._assert_last_audit(project_ref['id'], UPDATED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_disable_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + project_ref['enabled'] = False + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_sent(project_ref['id'], 'disabled', 'project', + public=False) + + def test_disable_of_disabled_project_does_not_notify(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref['enabled'] = False + self.assignment_api.create_project(project_ref['id'], project_ref) + # The project_ref above is not changed during the create process. We + # can use the same ref to perform the update. + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project', + public=False) + + def test_update_project_does_not_send_disable(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + project_ref['enabled'] = True + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_last_note( + project_ref['id'], UPDATED_OPERATION, 'project') + self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project') + + def test_update_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self.role_api.update_role(role_ref['id'], role_ref) + self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_update_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self.identity_api.update_user(user_ref['id'], user_ref) + self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_config_option_no_events(self): + self.config_fixture.config(notification_format='basic') + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + # The regular notifications will still be emitted, since they are + # used for callback handling. + self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') + # No audit event should have occurred + self.assertEqual(0, len(self._audits)) + + +class CADFNotificationsForEntities(NotificationsForEntities): + + def setUp(self): + super(CADFNotificationsForEntities, self).setUp() + self.config_fixture.config(notification_format='cadf') + + def test_initiator_data_is_set(self): + ref = self.new_domain_ref() + resp = self.post('/domains', body={'domain': ref}) + resource_id = resp.result.get('domain').get('id') + self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + self.assertTrue(len(self._audits) > 0) + audit = self._audits[-1] + payload = audit['payload'] + self.assertEqual(self.user_id, payload['initiator']['id']) + self.assertEqual(self.project_id, payload['initiator']['project_id']) + + +class TestEventCallbacks(test_v3.RestfulTestCase): + + def setUp(self): + super(TestEventCallbacks, self).setUp() + self.has_been_called = False + + def _project_deleted_callback(self, service, resource_type, operation, + payload): + self.has_been_called = True + + def _project_created_callback(self, service, resource_type, operation, + payload): + self.has_been_called = True + + def test_notification_received(self): + callback = register_callback(CREATED_OPERATION, 'project') + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assertTrue(callback.called) + + def test_notification_method_not_callable(self): + fake_method = None + self.assertRaises(TypeError, + notifications.register_event_callback, + UPDATED_OPERATION, + 'project', + [fake_method]) + + def test_notification_event_not_valid(self): + self.assertRaises(ValueError, + notifications.register_event_callback, + uuid.uuid4().hex, + 'project', + self._project_deleted_callback) + + def test_event_registration_for_unknown_resource_type(self): + # Registration for unknown resource types should succeed. If no event + # is issued for that resource type, the callback wont be triggered. + notifications.register_event_callback(DELETED_OPERATION, + uuid.uuid4().hex, + self._project_deleted_callback) + resource_type = uuid.uuid4().hex + notifications.register_event_callback(DELETED_OPERATION, + resource_type, + self._project_deleted_callback) + + def test_provider_event_callbacks_subscription(self): + callback_called = [] + + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = { + CREATED_OPERATION: {'project': [self.foo_callback]}} + + def foo_callback(self, service, resource_type, operation, + payload): + # uses callback_called from the closure + callback_called.append(True) + + Foo() + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assertEqual([True], callback_called) + + def test_invalid_event_callbacks(self): + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = 'bogus' + + self.assertRaises(ValueError, Foo) + + def test_invalid_event_callbacks_event(self): + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = {CREATED_OPERATION: 'bogus'} + + self.assertRaises(ValueError, Foo) + + +class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): + + LOCAL_HOST = 'localhost' + ACTION = 'authenticate' + ROLE_ASSIGNMENT = 'role_assignment' + + def setUp(self): + super(CadfNotificationsWrapperTestCase, self).setUp() + self._notifications = [] + + def fake_notify(action, initiator, outcome, target, + event_type, **kwargs): + service_security = cadftaxonomy.SERVICE_SECURITY + + event = eventfactory.EventFactory().new_event( + eventType=cadftype.EVENTTYPE_ACTIVITY, + outcome=outcome, + action=action, + initiator=initiator, + target=target, + observer=cadfresource.Resource(typeURI=service_security)) + + for key, value in kwargs.items(): + setattr(event, key, value) + + note = { + 'action': action, + 'initiator': initiator, + 'event': event, + 'send_notification_called': True} + self._notifications.append(note) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_audit_notification', fake_notify)) + + def _assert_last_note(self, action, user_id): + self.assertTrue(self._notifications) + note = self._notifications[-1] + self.assertEqual(note['action'], action) + initiator = note['initiator'] + self.assertEqual(initiator.id, user_id) + self.assertEqual(initiator.host.address, self.LOCAL_HOST) + self.assertTrue(note['send_notification_called']) + + def _assert_event(self, role_id, project=None, domain=None, + user=None, group=None, inherit=False): + """Assert that the CADF event is valid. + + In the case of role assignments, the event will have extra data, + specifically, the role, target, actor, and if the role is inherited. + + An example event, as a dictionary is seen below: + { + 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event', + 'initiator': { + 'typeURI': 'service/security/account/user', + 'host': {'address': 'localhost'}, + 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341', + 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21' + }, + 'target': { + 'typeURI': 'service/security/account/user', + 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d' + }, + 'observer': { + 'typeURI': 'service/security', + 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95' + }, + 'eventType': 'activity', + 'eventTime': '2014-08-21T21:04:56.204536+0000', + 'role': u'0e6b990380154a2599ce6b6e91548a68', + 'domain': u'24bdcff1aab8474895dbaac509793de1', + 'inherited_to_projects': False, + 'group': u'c1e22dc67cbd469ea0e33bf428fe597a', + 'action': 'created.role_assignment', + 'outcome': 'success', + 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1' + } + """ + + note = self._notifications[-1] + event = note['event'] + if project: + self.assertEqual(project, event.project) + if domain: + self.assertEqual(domain, event.domain) + if user: + self.assertEqual(user, event.user) + if group: + self.assertEqual(group, event.group) + self.assertEqual(role_id, event.role) + self.assertEqual(inherit, event.inherited_to_projects) + + def test_v3_authenticate_user_name_and_domain_id(self): + user_id = self.user_id + user_name = self.user['name'] + password = self.user['password'] + domain_id = self.domain_id + data = self.build_authentication_request(username=user_name, + user_domain_id=domain_id, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def test_v3_authenticate_user_id(self): + user_id = self.user_id + password = self.user['password'] + data = self.build_authentication_request(user_id=user_id, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def test_v3_authenticate_user_name_and_domain_name(self): + user_id = self.user_id + user_name = self.user['name'] + password = self.user['password'] + domain_name = self.domain['name'] + data = self.build_authentication_request(username=user_name, + user_domain_name=domain_name, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def _test_role_assignment(self, url, role, project=None, domain=None, + user=None, group=None): + self.put(url) + action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT) + self._assert_last_note(action, self.user_id) + self._assert_event(role, project, domain, user, group) + self.delete(url) + action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT) + self._assert_last_note(action, self.user_id) + self._assert_event(role, project, domain, user, group) + + def test_user_project_grant(self): + url = ('/projects/%s/users/%s/roles/%s' % + (self.project_id, self.user_id, self.role_id)) + self._test_role_assignment(url, self.role_id, + project=self.project_id, + user=self.user_id) + + def test_group_domain_grant(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group = self.identity_api.create_group(group_ref) + url = ('/domains/%s/groups/%s/roles/%s' % + (self.domain_id, group['id'], self.role_id)) + self._test_role_assignment(url, self.role_id, + domain=self.domain_id, + group=group['id']) + + +class TestCallbackRegistration(testtools.TestCase): + def setUp(self): + super(TestCallbackRegistration, self).setUp() + self.mock_log = mock.Mock() + # Force the callback logging to occur + self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG + + def verify_log_message(self, data): + """Tests that use this are a little brittle because adding more + logging can break them. + + TODO(dstanek): remove the need for this in a future refactoring + + """ + log_fn = self.mock_log.debug + self.assertEqual(len(data), log_fn.call_count) + for datum in data: + log_fn.assert_any_call(mock.ANY, datum) + + def test_a_function_callback(self): + def callback(*args, **kwargs): + pass + + resource_type = 'thing' + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, resource_type, callback) + + callback = 'keystone.tests.unit.common.test_notifications.callback' + expected_log_data = { + 'callback': callback, + 'event': 'identity.%s.created' % resource_type + } + self.verify_log_message([expected_log_data]) + + def test_a_method_callback(self): + class C(object): + def callback(self, *args, **kwargs): + pass + + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, 'thing', C.callback) + + callback = 'keystone.tests.unit.common.test_notifications.C.callback' + expected_log_data = { + 'callback': callback, + 'event': 'identity.thing.created' + } + self.verify_log_message([expected_log_data]) + + def test_a_list_of_callbacks(self): + def callback(*args, **kwargs): + pass + + class C(object): + def callback(self, *args, **kwargs): + pass + + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, 'thing', [callback, C.callback]) + + callback_1 = 'keystone.tests.unit.common.test_notifications.callback' + callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback' + expected_log_data = [ + { + 'callback': callback_1, + 'event': 'identity.thing.created' + }, + { + 'callback': callback_2, + 'event': 'identity.thing.created' + }, + ] + self.verify_log_message(expected_log_data) + + def test_an_invalid_callback(self): + self.assertRaises(TypeError, + notifications.register_event_callback, + (CREATED_OPERATION, 'thing', object())) + + def test_an_invalid_event(self): + def callback(*args, **kwargs): + pass + + self.assertRaises(ValueError, + notifications.register_event_callback, + uuid.uuid4().hex, + 'thing', + callback) diff --git a/keystone-moon/keystone/tests/unit/common/test_pemutils.py b/keystone-moon/keystone/tests/unit/common/test_pemutils.py new file mode 100644 index 00000000..c2f58518 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_pemutils.py @@ -0,0 +1,337 @@ +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 + +from six import moves + +from keystone.common import pemutils +from keystone.tests import unit as tests + + +# List of 2-tuples, (pem_type, pem_header) +headers = pemutils.PEM_TYPE_TO_HEADER.items() + + +def make_data(size, offset=0): + return ''.join([chr(x % 255) for x in moves.range(offset, size + offset)]) + + +def make_base64_from_data(data): + return base64.b64encode(data) + + +def wrap_base64(base64_text): + wrapped_text = '\n'.join([base64_text[x:x + 64] + for x in moves.range(0, len(base64_text), 64)]) + wrapped_text += '\n' + return wrapped_text + + +def make_pem(header, data): + base64_text = make_base64_from_data(data) + wrapped_text = wrap_base64(base64_text) + + result = '-----BEGIN %s-----\n' % header + result += wrapped_text + result += '-----END %s-----\n' % header + + return result + + +class PEM(object): + """PEM text and it's associated data broken out, used for testing. + + """ + def __init__(self, pem_header='CERTIFICATE', pem_type='cert', + data_size=70, data_offset=0): + self.pem_header = pem_header + self.pem_type = pem_type + self.data_size = data_size + self.data_offset = data_offset + self.data = make_data(self.data_size, self.data_offset) + self.base64_text = make_base64_from_data(self.data) + self.wrapped_base64 = wrap_base64(self.base64_text) + self.pem_text = make_pem(self.pem_header, self.data) + + +class TestPEMParseResult(tests.BaseTestCase): + + def test_pem_types(self): + for pem_type in pemutils.pem_types: + pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type] + r = pemutils.PEMParseResult(pem_type=pem_type) + self.assertEqual(pem_type, r.pem_type) + self.assertEqual(pem_header, r.pem_header) + + pem_type = 'xxx' + self.assertRaises(ValueError, + pemutils.PEMParseResult, pem_type=pem_type) + + def test_pem_headers(self): + for pem_header in pemutils.pem_headers: + pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header] + r = pemutils.PEMParseResult(pem_header=pem_header) + self.assertEqual(pem_type, r.pem_type) + self.assertEqual(pem_header, r.pem_header) + + pem_header = 'xxx' + self.assertRaises(ValueError, + pemutils.PEMParseResult, pem_header=pem_header) + + +class TestPEMParse(tests.BaseTestCase): + def test_parse_none(self): + text = '' + text += 'bla bla\n' + text += 'yada yada yada\n' + text += 'burfl blatz bingo\n' + + parse_results = pemutils.parse_pem(text) + self.assertEqual(0, len(parse_results)) + + self.assertEqual(False, pemutils.is_pem(text)) + + def test_parse_invalid(self): + p = PEM(pem_type='xxx', + pem_header='XXX') + text = p.pem_text + + self.assertRaises(ValueError, + pemutils.parse_pem, text) + + def test_parse_one(self): + data_size = 70 + count = len(headers) + pems = [] + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + p = pems[i] + text = p.pem_text + + parse_results = pemutils.parse_pem(text) + self.assertEqual(1, len(parse_results)) + + r = parse_results[0] + self.assertEqual(p.pem_type, r.pem_type) + self.assertEqual(p.pem_header, r.pem_header) + self.assertEqual(p.pem_text, + text[r.pem_start:r.pem_end]) + self.assertEqual(p.wrapped_base64, + text[r.base64_start:r.base64_end]) + self.assertEqual(p.data, r.binary_data) + + def test_parse_one_embedded(self): + p = PEM(data_offset=0) + text = '' + text += 'bla bla\n' + text += 'yada yada yada\n' + text += p.pem_text + text += 'burfl blatz bingo\n' + + parse_results = pemutils.parse_pem(text) + self.assertEqual(1, len(parse_results)) + + r = parse_results[0] + self.assertEqual(p.pem_type, r.pem_type) + self.assertEqual(p.pem_header, r.pem_header) + self.assertEqual(p.pem_text, + text[r.pem_start:r.pem_end]) + self.assertEqual(p.wrapped_base64, + text[r.base64_start: r.base64_end]) + self.assertEqual(p.data, r.binary_data) + + def test_parse_multple(self): + data_size = 70 + count = len(headers) + pems = [] + text = '' + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + text += pems[i].pem_text + + parse_results = pemutils.parse_pem(text) + self.assertEqual(count, len(parse_results)) + + for i in moves.range(count): + r = parse_results[i] + p = pems[i] + + self.assertEqual(p.pem_type, r.pem_type) + self.assertEqual(p.pem_header, r.pem_header) + self.assertEqual(p.pem_text, + text[r.pem_start:r.pem_end]) + self.assertEqual(p.wrapped_base64, + text[r.base64_start: r.base64_end]) + self.assertEqual(p.data, r.binary_data) + + def test_parse_multple_find_specific(self): + data_size = 70 + count = len(headers) + pems = [] + text = '' + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + text += pems[i].pem_text + + for i in moves.range(count): + parse_results = pemutils.parse_pem(text, pem_type=headers[i][0]) + self.assertEqual(1, len(parse_results)) + + r = parse_results[0] + p = pems[i] + + self.assertEqual(p.pem_type, r.pem_type) + self.assertEqual(p.pem_header, r.pem_header) + self.assertEqual(p.pem_text, + text[r.pem_start:r.pem_end]) + self.assertEqual(p.wrapped_base64, + text[r.base64_start:r.base64_end]) + self.assertEqual(p.data, r.binary_data) + + def test_parse_multple_embedded(self): + data_size = 75 + count = len(headers) + pems = [] + text = '' + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + text += 'bla bla\n' + text += 'yada yada yada\n' + text += pems[i].pem_text + text += 'burfl blatz bingo\n' + + parse_results = pemutils.parse_pem(text) + self.assertEqual(count, len(parse_results)) + + for i in moves.range(count): + r = parse_results[i] + p = pems[i] + + self.assertEqual(p.pem_type, r.pem_type) + self.assertEqual(p.pem_header, r.pem_header) + self.assertEqual(p.pem_text, + text[r.pem_start:r.pem_end]) + self.assertEqual(p.wrapped_base64, + text[r.base64_start:r.base64_end]) + self.assertEqual(p.data, r.binary_data) + + def test_get_pem_data_none(self): + text = '' + text += 'bla bla\n' + text += 'yada yada yada\n' + text += 'burfl blatz bingo\n' + + data = pemutils.get_pem_data(text) + self.assertIsNone(data) + + def test_get_pem_data_invalid(self): + p = PEM(pem_type='xxx', + pem_header='XXX') + text = p.pem_text + + self.assertRaises(ValueError, + pemutils.get_pem_data, text) + + def test_get_pem_data(self): + data_size = 70 + count = len(headers) + pems = [] + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + p = pems[i] + text = p.pem_text + + data = pemutils.get_pem_data(text, p.pem_type) + self.assertEqual(p.data, data) + + def test_is_pem(self): + data_size = 70 + count = len(headers) + pems = [] + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + p = pems[i] + text = p.pem_text + self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type)) + self.assertFalse(pemutils.is_pem(text, + pem_type=p.pem_type + 'xxx')) + + def test_base64_to_pem(self): + data_size = 70 + count = len(headers) + pems = [] + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + p = pems[i] + pem = pemutils.base64_to_pem(p.base64_text, p.pem_type) + self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data) + + def test_binary_to_pem(self): + data_size = 70 + count = len(headers) + pems = [] + + for i in moves.range(count): + pems.append(PEM(pem_type=headers[i][0], + pem_header=headers[i][1], + data_size=data_size + i, + data_offset=i)) + + for i in moves.range(count): + p = pems[i] + pem = pemutils.binary_to_pem(p.data, p.pem_type) + self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data) diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py new file mode 100644 index 00000000..1f33cfc3 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_sql_core.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from sqlalchemy.ext import declarative + +from keystone.common import sql +from keystone.tests import unit as tests +from keystone.tests.unit import utils + + +ModelBase = declarative.declarative_base() + + +class TestModel(ModelBase, sql.ModelDictMixin): + __tablename__ = 'testmodel' + id = sql.Column(sql.String(64), primary_key=True) + text = sql.Column(sql.String(64), nullable=False) + + +class TestModelDictMixin(tests.BaseTestCase): + + def test_creating_a_model_instance_from_a_dict(self): + d = {'id': utils.new_uuid(), 'text': utils.new_uuid()} + m = TestModel.from_dict(d) + self.assertEqual(m.id, d['id']) + self.assertEqual(m.text, d['text']) + + def test_creating_a_dict_from_a_model_instance(self): + m = TestModel(id=utils.new_uuid(), text=utils.new_uuid()) + d = m.to_dict() + self.assertEqual(m.id, d['id']) + self.assertEqual(m.text, d['text']) + + def test_creating_a_model_instance_from_an_invalid_dict(self): + d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None} + self.assertRaises(TypeError, TestModel.from_dict, d) + + def test_creating_a_dict_from_a_model_instance_that_has_extra_attrs(self): + expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()} + m = TestModel(id=expected['id'], text=expected['text']) + m.extra = 'this should not be in the dictionary' + self.assertEqual(m.to_dict(), expected) diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py new file mode 100644 index 00000000..184c8141 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_utils.py @@ -0,0 +1,164 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from oslo_config import cfg +from oslo_config import fixture as config_fixture +from oslo_serialization import jsonutils + +from keystone.common import utils as common_utils +from keystone import exception +from keystone import service +from keystone.tests import unit as tests +from keystone.tests.unit import utils + + +CONF = cfg.CONF + +TZ = utils.TZ + + +class UtilsTestCase(tests.BaseTestCase): + OPTIONAL = object() + + def setUp(self): + super(UtilsTestCase, self).setUp() + self.config_fixture = self.useFixture(config_fixture.Config(CONF)) + + def test_hash(self): + password = 'right' + wrong = 'wrongwrong' # Two wrongs don't make a right + hashed = common_utils.hash_password(password) + self.assertTrue(common_utils.check_password(password, hashed)) + self.assertFalse(common_utils.check_password(wrong, hashed)) + + def test_verify_normal_password_strict(self): + self.config_fixture.config(strict_password_check=False) + password = uuid.uuid4().hex + verified = common_utils.verify_length_and_trunc_password(password) + self.assertEqual(password, verified) + + def test_that_a_hash_can_not_be_validated_against_a_hash(self): + # NOTE(dstanek): Bug 1279849 reported a problem where passwords + # were not being hashed if they already looked like a hash. This + # would allow someone to hash their password ahead of time + # (potentially getting around password requirements, like + # length) and then they could auth with their original password. + password = uuid.uuid4().hex + hashed_password = common_utils.hash_password(password) + new_hashed_password = common_utils.hash_password(hashed_password) + self.assertFalse(common_utils.check_password(password, + new_hashed_password)) + + def test_verify_long_password_strict(self): + self.config_fixture.config(strict_password_check=False) + self.config_fixture.config(group='identity', max_password_length=5) + max_length = CONF.identity.max_password_length + invalid_password = 'passw0rd' + trunc = common_utils.verify_length_and_trunc_password(invalid_password) + self.assertEqual(invalid_password[:max_length], trunc) + + def test_verify_long_password_strict_raises_exception(self): + self.config_fixture.config(strict_password_check=True) + self.config_fixture.config(group='identity', max_password_length=5) + invalid_password = 'passw0rd' + self.assertRaises(exception.PasswordVerificationError, + common_utils.verify_length_and_trunc_password, + invalid_password) + + def test_hash_long_password_truncation(self): + self.config_fixture.config(strict_password_check=False) + invalid_length_password = '0' * 9999999 + hashed = common_utils.hash_password(invalid_length_password) + self.assertTrue(common_utils.check_password(invalid_length_password, + hashed)) + + def test_hash_long_password_strict(self): + self.config_fixture.config(strict_password_check=True) + invalid_length_password = '0' * 9999999 + self.assertRaises(exception.PasswordVerificationError, + common_utils.hash_password, + invalid_length_password) + + def _create_test_user(self, password=OPTIONAL): + user = {"name": "hthtest"} + if password is not self.OPTIONAL: + user['password'] = password + + return user + + def test_hash_user_password_without_password(self): + user = self._create_test_user() + hashed = common_utils.hash_user_password(user) + self.assertEqual(user, hashed) + + def test_hash_user_password_with_null_password(self): + user = self._create_test_user(password=None) + hashed = common_utils.hash_user_password(user) + self.assertEqual(user, hashed) + + def test_hash_user_password_with_empty_password(self): + password = '' + user = self._create_test_user(password=password) + user_hashed = common_utils.hash_user_password(user) + password_hashed = user_hashed['password'] + self.assertTrue(common_utils.check_password(password, password_hashed)) + + def test_hash_edge_cases(self): + hashed = common_utils.hash_password('secret') + self.assertFalse(common_utils.check_password('', hashed)) + self.assertFalse(common_utils.check_password(None, hashed)) + + def test_hash_unicode(self): + password = u'Comment \xe7a va' + wrong = 'Comment ?a va' + hashed = common_utils.hash_password(password) + self.assertTrue(common_utils.check_password(password, hashed)) + self.assertFalse(common_utils.check_password(wrong, hashed)) + + def test_auth_str_equal(self): + self.assertTrue(common_utils.auth_str_equal('abc123', 'abc123')) + self.assertFalse(common_utils.auth_str_equal('a', 'aaaaa')) + self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a')) + self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123')) + + def test_unixtime(self): + global TZ + + @utils.timezone + def _test_unixtime(): + epoch = common_utils.unixtime(dt) + self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ) + + dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0) + epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400 + for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: + TZ = 'UTC' + d + _test_unixtime() + + def test_pki_encoder(self): + data = {'field': 'value'} + json = jsonutils.dumps(data, cls=common_utils.PKIEncoder) + expected_json = b'{"field":"value"}' + self.assertEqual(expected_json, json) + + +class ServiceHelperTests(tests.BaseTestCase): + + @service.fail_gracefully + def _do_test(self): + raise Exception("Test Exc") + + def test_fail_gracefully(self): + self.assertRaises(tests.UnexpectedExit, self._do_test) |