aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/common
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
-rw-r--r--keystone-moon/keystone/tests/unit/common/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_authorization.py161
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_base64utils.py208
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_connection_pool.py135
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_injection.py238
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_json_home.py91
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_ldap.py584
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_manager.py40
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_notifications.py1248
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_pemutils.py337
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_sql_core.py52
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_utils.py210
12 files changed, 0 insertions, 3304 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/__init__.py b/keystone-moon/keystone/tests/unit/common/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/common/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/common/test_authorization.py b/keystone-moon/keystone/tests/unit/common/test_authorization.py
deleted file mode 100644
index 73ddbc61..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_authorization.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright 2015 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import copy
-import uuid
-
-from keystone.common import authorization
-from keystone import exception
-from keystone.federation import constants as federation_constants
-from keystone.models import token_model
-from keystone.tests import unit
-from keystone.tests.unit import test_token_provider
-
-
-class TestTokenToAuthContext(unit.BaseTestCase):
- def test_token_is_project_scoped_with_trust(self):
- # Check auth_context result when the token is project-scoped and has
- # trust info.
-
- # SAMPLE_V3_TOKEN has OS-TRUST:trust in it.
- token_data = test_token_provider.SAMPLE_V3_TOKEN
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertEqual(token, auth_context['token'])
- self.assertTrue(auth_context['is_delegated_auth'])
- self.assertEqual(token_data['token']['user']['id'],
- auth_context['user_id'])
- self.assertEqual(token_data['token']['user']['domain']['id'],
- auth_context['user_domain_id'])
- self.assertEqual(token_data['token']['project']['id'],
- auth_context['project_id'])
- self.assertEqual(token_data['token']['project']['domain']['id'],
- auth_context['project_domain_id'])
- self.assertNotIn('domain_id', auth_context)
- self.assertNotIn('domain_name', auth_context)
- self.assertEqual(token_data['token']['OS-TRUST:trust']['id'],
- auth_context['trust_id'])
- self.assertEqual(
- token_data['token']['OS-TRUST:trust']['trustor_user_id'],
- auth_context['trustor_id'])
- self.assertEqual(
- token_data['token']['OS-TRUST:trust']['trustee_user_id'],
- auth_context['trustee_id'])
- self.assertItemsEqual(
- [r['name'] for r in token_data['token']['roles']],
- auth_context['roles'])
- self.assertIsNone(auth_context['consumer_id'])
- self.assertIsNone(auth_context['access_token_id'])
- self.assertNotIn('group_ids', auth_context)
-
- def test_token_is_domain_scoped(self):
- # Check contents of auth_context when token is domain-scoped.
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
- del token_data['token']['project']
-
- domain_id = uuid.uuid4().hex
- domain_name = uuid.uuid4().hex
- token_data['token']['domain'] = {'id': domain_id, 'name': domain_name}
-
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertNotIn('project_id', auth_context)
- self.assertNotIn('project_domain_id', auth_context)
-
- self.assertEqual(domain_id, auth_context['domain_id'])
- self.assertEqual(domain_name, auth_context['domain_name'])
-
- def test_token_is_unscoped(self):
- # Check contents of auth_context when the token is unscoped.
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
- del token_data['token']['project']
-
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertNotIn('project_id', auth_context)
- self.assertNotIn('project_domain_id', auth_context)
- self.assertNotIn('domain_id', auth_context)
- self.assertNotIn('domain_name', auth_context)
-
- def test_token_is_for_federated_user(self):
- # When the token is for a federated user then group_ids is in
- # auth_context.
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
-
- group_ids = [uuid.uuid4().hex for x in range(1, 5)]
-
- federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
- 'protocol': {'id': 'saml2'},
- 'groups': [{'id': gid} for gid in group_ids]}
- token_data['token']['user'][federation_constants.FEDERATION] = (
- federation_data)
-
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertItemsEqual(group_ids, auth_context['group_ids'])
-
- def test_oauth_variables_set_for_oauth_token(self):
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
- access_token_id = uuid.uuid4().hex
- consumer_id = uuid.uuid4().hex
- token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id,
- 'consumer_id': consumer_id}
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertEqual(access_token_id, auth_context['access_token_id'])
- self.assertEqual(consumer_id, auth_context['consumer_id'])
-
- def test_oauth_variables_not_set(self):
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- auth_context = authorization.token_to_auth_context(token)
-
- self.assertIsNone(auth_context['access_token_id'])
- self.assertIsNone(auth_context['consumer_id'])
-
- def test_token_is_not_KeystoneToken_raises_exception(self):
- # If the token isn't a KeystoneToken then an UnexpectedError exception
- # is raised.
- self.assertRaises(exception.UnexpectedError,
- authorization.token_to_auth_context, {})
-
- def test_user_id_missing_in_token_raises_exception(self):
- # If there's no user ID in the token then an Unauthorized
- # exception is raised.
- token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
- del token_data['token']['user']['id']
-
- token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=token_data)
-
- self.assertRaises(exception.Unauthorized,
- authorization.token_to_auth_context, token)
diff --git a/keystone-moon/keystone/tests/unit/common/test_base64utils.py b/keystone-moon/keystone/tests/unit/common/test_base64utils.py
deleted file mode 100644
index 355a2e03..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_base64utils.py
+++ /dev/null
@@ -1,208 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from keystone.common import base64utils
-from keystone.tests import unit
-
-base64_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- 'abcdefghijklmnopqrstuvwxyz'
- '0123456789'
- '+/=') # includes pad char
-
-base64url_alphabet = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- 'abcdefghijklmnopqrstuvwxyz'
- '0123456789'
- '-_=') # includes pad char
-
-
-class TestValid(unit.BaseTestCase):
- def test_valid_base64(self):
- self.assertTrue(base64utils.is_valid_base64('+/=='))
- self.assertTrue(base64utils.is_valid_base64('+/+='))
- self.assertTrue(base64utils.is_valid_base64('+/+/'))
-
- self.assertFalse(base64utils.is_valid_base64('-_=='))
- self.assertFalse(base64utils.is_valid_base64('-_-='))
- self.assertFalse(base64utils.is_valid_base64('-_-_'))
-
- self.assertTrue(base64utils.is_valid_base64('abcd'))
- self.assertFalse(base64utils.is_valid_base64('abcde'))
- self.assertFalse(base64utils.is_valid_base64('abcde=='))
- self.assertFalse(base64utils.is_valid_base64('abcdef'))
- self.assertTrue(base64utils.is_valid_base64('abcdef=='))
- self.assertFalse(base64utils.is_valid_base64('abcdefg'))
- self.assertTrue(base64utils.is_valid_base64('abcdefg='))
- self.assertTrue(base64utils.is_valid_base64('abcdefgh'))
-
- self.assertFalse(base64utils.is_valid_base64('-_=='))
-
- def test_valid_base64url(self):
- self.assertFalse(base64utils.is_valid_base64url('+/=='))
- self.assertFalse(base64utils.is_valid_base64url('+/+='))
- self.assertFalse(base64utils.is_valid_base64url('+/+/'))
-
- self.assertTrue(base64utils.is_valid_base64url('-_=='))
- self.assertTrue(base64utils.is_valid_base64url('-_-='))
- self.assertTrue(base64utils.is_valid_base64url('-_-_'))
-
- self.assertTrue(base64utils.is_valid_base64url('abcd'))
- self.assertFalse(base64utils.is_valid_base64url('abcde'))
- self.assertFalse(base64utils.is_valid_base64url('abcde=='))
- self.assertFalse(base64utils.is_valid_base64url('abcdef'))
- self.assertTrue(base64utils.is_valid_base64url('abcdef=='))
- self.assertFalse(base64utils.is_valid_base64url('abcdefg'))
- self.assertTrue(base64utils.is_valid_base64url('abcdefg='))
- self.assertTrue(base64utils.is_valid_base64url('abcdefgh'))
-
- self.assertTrue(base64utils.is_valid_base64url('-_=='))
-
-
-class TestBase64Padding(unit.BaseTestCase):
-
- def test_filter(self):
- self.assertEqual('', base64utils.filter_formatting(''))
- self.assertEqual('', base64utils.filter_formatting(' '))
- self.assertEqual('a', base64utils.filter_formatting('a'))
- self.assertEqual('a', base64utils.filter_formatting(' a'))
- self.assertEqual('a', base64utils.filter_formatting('a '))
- self.assertEqual('ab', base64utils.filter_formatting('ab'))
- self.assertEqual('ab', base64utils.filter_formatting(' ab'))
- self.assertEqual('ab', base64utils.filter_formatting('ab '))
- self.assertEqual('ab', base64utils.filter_formatting('a b'))
- self.assertEqual('ab', base64utils.filter_formatting(' a b'))
- self.assertEqual('ab', base64utils.filter_formatting('a b '))
- self.assertEqual('ab', base64utils.filter_formatting('a\nb\n '))
-
- text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- 'abcdefghijklmnopqrstuvwxyz'
- '0123456789'
- '+/=')
- self.assertEqual(base64_alphabet,
- base64utils.filter_formatting(text))
-
- text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
- ' abcdefghijklmnopqrstuvwxyz\n'
- '\t\f\r'
- ' 0123456789\n'
- ' +/=')
- self.assertEqual(base64_alphabet,
- base64utils.filter_formatting(text))
- self.assertEqual(base64url_alphabet,
- base64utils.base64_to_base64url(base64_alphabet))
-
- text = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- 'abcdefghijklmnopqrstuvwxyz'
- '0123456789'
- '-_=')
- self.assertEqual(base64url_alphabet,
- base64utils.filter_formatting(text))
-
- text = (' ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
- ' abcdefghijklmnopqrstuvwxyz\n'
- '\t\f\r'
- ' 0123456789\n'
- '-_=')
- self.assertEqual(base64url_alphabet,
- base64utils.filter_formatting(text))
-
- def test_alphabet_conversion(self):
- self.assertEqual(base64url_alphabet,
- base64utils.base64_to_base64url(base64_alphabet))
-
- self.assertEqual(base64_alphabet,
- base64utils.base64url_to_base64(base64url_alphabet))
-
- def test_is_padded(self):
- self.assertTrue(base64utils.base64_is_padded('ABCD'))
- self.assertTrue(base64utils.base64_is_padded('ABC='))
- self.assertTrue(base64utils.base64_is_padded('AB=='))
-
- self.assertTrue(base64utils.base64_is_padded('1234ABCD'))
- self.assertTrue(base64utils.base64_is_padded('1234ABC='))
- self.assertTrue(base64utils.base64_is_padded('1234AB=='))
-
- self.assertFalse(base64utils.base64_is_padded('ABC'))
- self.assertFalse(base64utils.base64_is_padded('AB'))
- self.assertFalse(base64utils.base64_is_padded('A'))
- self.assertFalse(base64utils.base64_is_padded(''))
-
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64_is_padded, '=')
-
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64_is_padded, 'AB=C')
-
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64_is_padded, 'AB=')
-
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64_is_padded, 'ABCD=')
-
- self.assertRaises(ValueError, base64utils.base64_is_padded,
- 'ABC', pad='==')
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64_is_padded, 'A=BC')
-
- def test_strip_padding(self):
- self.assertEqual('ABCD', base64utils.base64_strip_padding('ABCD'))
- self.assertEqual('ABC', base64utils.base64_strip_padding('ABC='))
- self.assertEqual('AB', base64utils.base64_strip_padding('AB=='))
- self.assertRaises(ValueError, base64utils.base64_strip_padding,
- 'ABC=', pad='==')
- self.assertEqual('ABC', base64utils.base64_strip_padding('ABC'))
-
- def test_assure_padding(self):
- self.assertEqual('ABCD', base64utils.base64_assure_padding('ABCD'))
- self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC'))
- self.assertEqual('ABC=', base64utils.base64_assure_padding('ABC='))
- self.assertEqual('AB==', base64utils.base64_assure_padding('AB'))
- self.assertEqual('AB==', base64utils.base64_assure_padding('AB=='))
- self.assertRaises(ValueError, base64utils.base64_assure_padding,
- 'ABC', pad='==')
-
- def test_base64_percent_encoding(self):
- self.assertEqual('ABCD', base64utils.base64url_percent_encode('ABCD'))
- self.assertEqual('ABC%3D',
- base64utils.base64url_percent_encode('ABC='))
- self.assertEqual('AB%3D%3D',
- base64utils.base64url_percent_encode('AB=='))
-
- self.assertEqual('ABCD', base64utils.base64url_percent_decode('ABCD'))
- self.assertEqual('ABC=',
- base64utils.base64url_percent_decode('ABC%3D'))
- self.assertEqual('AB==',
- base64utils.base64url_percent_decode('AB%3D%3D'))
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64url_percent_encode, 'chars')
- self.assertRaises(base64utils.InvalidBase64Error,
- base64utils.base64url_percent_decode, 'AB%3D%3')
-
-
-class TestTextWrap(unit.BaseTestCase):
-
- def test_wrapping(self):
- raw_text = 'abcdefgh'
- wrapped_text = 'abc\ndef\ngh\n'
-
- self.assertEqual(wrapped_text,
- base64utils.base64_wrap(raw_text, width=3))
-
- t = '\n'.join(base64utils.base64_wrap_iter(raw_text, width=3)) + '\n'
- self.assertEqual(wrapped_text, t)
-
- raw_text = 'abcdefgh'
- wrapped_text = 'abcd\nefgh\n'
-
- self.assertEqual(wrapped_text,
- base64utils.base64_wrap(raw_text, width=4))
diff --git a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py b/keystone-moon/keystone/tests/unit/common/test_connection_pool.py
deleted file mode 100644
index 3813e033..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_connection_pool.py
+++ /dev/null
@@ -1,135 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-
-import mock
-import six
-from six.moves import queue
-import testtools
-from testtools import matchers
-
-from keystone.common.cache import _memcache_pool
-from keystone import exception
-from keystone.tests.unit import core
-
-
-class _TestConnectionPool(_memcache_pool.ConnectionPool):
- destroyed_value = 'destroyed'
-
- def _create_connection(self):
- return mock.MagicMock()
-
- def _destroy_connection(self, conn):
- conn(self.destroyed_value)
-
-
-class TestConnectionPool(core.TestCase):
- def setUp(self):
- super(TestConnectionPool, self).setUp()
- self.unused_timeout = 10
- self.maxsize = 2
- self.connection_pool = _TestConnectionPool(
- maxsize=self.maxsize,
- unused_timeout=self.unused_timeout)
- self.addCleanup(self.cleanup_instance('connection_pool'))
-
- def test_get_context_manager(self):
- self.assertThat(self.connection_pool.queue, matchers.HasLength(0))
- with self.connection_pool.acquire() as conn:
- self.assertEqual(1, self.connection_pool._acquired)
- self.assertEqual(0, self.connection_pool._acquired)
- self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
- self.assertEqual(conn, self.connection_pool.queue[0].connection)
-
- def test_cleanup_pool(self):
- self.test_get_context_manager()
- newtime = time.time() + self.unused_timeout * 2
- non_expired_connection = _memcache_pool._PoolItem(
- ttl=(newtime * 2),
- connection=mock.MagicMock())
- self.connection_pool.queue.append(non_expired_connection)
- self.assertThat(self.connection_pool.queue, matchers.HasLength(2))
- with mock.patch.object(time, 'time', return_value=newtime):
- conn = self.connection_pool.queue[0].connection
- with self.connection_pool.acquire():
- pass
- conn.assert_has_calls(
- [mock.call(self.connection_pool.destroyed_value)])
- self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
- self.assertEqual(0, non_expired_connection.connection.call_count)
-
- def test_acquire_conn_exception_returns_acquired_count(self):
- class TestException(Exception):
- pass
-
- with mock.patch.object(_TestConnectionPool, '_create_connection',
- side_effect=TestException):
- with testtools.ExpectedException(TestException):
- with self.connection_pool.acquire():
- pass
- self.assertThat(self.connection_pool.queue,
- matchers.HasLength(0))
- self.assertEqual(0, self.connection_pool._acquired)
-
- def test_connection_pool_limits_maximum_connections(self):
- # NOTE(morganfainberg): To ensure we don't lockup tests until the
- # job limit, explicitly call .get_nowait() and .put_nowait() in this
- # case.
- conn1 = self.connection_pool.get_nowait()
- conn2 = self.connection_pool.get_nowait()
-
- # Use a nowait version to raise an Empty exception indicating we would
- # not get another connection until one is placed back into the queue.
- self.assertRaises(queue.Empty, self.connection_pool.get_nowait)
-
- # Place the connections back into the pool.
- self.connection_pool.put_nowait(conn1)
- self.connection_pool.put_nowait(conn2)
-
- # Make sure we can get a connection out of the pool again.
- self.connection_pool.get_nowait()
-
- def test_connection_pool_maximum_connection_get_timeout(self):
- connection_pool = _TestConnectionPool(
- maxsize=1,
- unused_timeout=self.unused_timeout,
- conn_get_timeout=0)
-
- def _acquire_connection():
- with connection_pool.acquire():
- pass
-
- # Make sure we've consumed the only available connection from the pool
- conn = connection_pool.get_nowait()
-
- self.assertRaises(exception.UnexpectedError, _acquire_connection)
-
- # Put the connection back and ensure we can acquire the connection
- # after it is available.
- connection_pool.put_nowait(conn)
- _acquire_connection()
-
-
-class TestMemcacheClientOverrides(core.BaseTestCase):
-
- def test_client_stripped_of_threading_local(self):
- """threading.local overrides are restored for _MemcacheClient"""
- client_class = _memcache_pool._MemcacheClient
- # get the genuine thread._local from MRO
- thread_local = client_class.__mro__[2]
- self.assertTrue(thread_local is threading.local)
- for field in six.iterkeys(thread_local.__dict__):
- if field not in ('__dict__', '__weakref__'):
- self.assertNotEqual(id(getattr(thread_local, field, None)),
- id(getattr(client_class, field, None)))
diff --git a/keystone-moon/keystone/tests/unit/common/test_injection.py b/keystone-moon/keystone/tests/unit/common/test_injection.py
deleted file mode 100644
index 9a5d1e7d..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_injection.py
+++ /dev/null
@@ -1,238 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-from keystone.common import dependency
-from keystone.tests import unit
-
-
-class TestDependencyInjection(unit.BaseTestCase):
- def setUp(self):
- super(TestDependencyInjection, self).setUp()
- dependency.reset()
- self.addCleanup(dependency.reset)
-
- def test_dependency_injection(self):
- class Interface(object):
- def do_work(self):
- assert False
-
- @dependency.provider('first_api')
- class FirstImplementation(Interface):
- def do_work(self):
- return True
-
- @dependency.provider('second_api')
- class SecondImplementation(Interface):
- def do_work(self):
- return True
-
- @dependency.requires('first_api', 'second_api')
- class Consumer(object):
- def do_work_with_dependencies(self):
- assert self.first_api.do_work()
- assert self.second_api.do_work()
-
- # initialize dependency providers
- first_api = FirstImplementation()
- second_api = SecondImplementation()
-
- # ... sometime later, initialize a dependency consumer
- consumer = Consumer()
-
- # the expected dependencies should be available to the consumer
- self.assertIs(consumer.first_api, first_api)
- self.assertIs(consumer.second_api, second_api)
- self.assertIsInstance(consumer.first_api, Interface)
- self.assertIsInstance(consumer.second_api, Interface)
- consumer.do_work_with_dependencies()
-
- def test_dependency_provider_configuration(self):
- @dependency.provider('api')
- class Configurable(object):
- def __init__(self, value=None):
- self.value = value
-
- def get_value(self):
- return self.value
-
- @dependency.requires('api')
- class Consumer(object):
- def get_value(self):
- return self.api.get_value()
-
- # initialize dependency providers
- api = Configurable(value=True)
-
- # ... sometime later, initialize a dependency consumer
- consumer = Consumer()
-
- # the expected dependencies should be available to the consumer
- self.assertIs(consumer.api, api)
- self.assertIsInstance(consumer.api, Configurable)
- self.assertTrue(consumer.get_value())
-
- def test_dependency_consumer_configuration(self):
- @dependency.provider('api')
- class Provider(object):
- def get_value(self):
- return True
-
- @dependency.requires('api')
- class Configurable(object):
- def __init__(self, value=None):
- self.value = value
-
- def get_value(self):
- if self.value:
- return self.api.get_value()
-
- # initialize dependency providers
- api = Provider()
-
- # ... sometime later, initialize a dependency consumer
- consumer = Configurable(value=True)
-
- # the expected dependencies should be available to the consumer
- self.assertIs(consumer.api, api)
- self.assertIsInstance(consumer.api, Provider)
- self.assertTrue(consumer.get_value())
-
- def test_inherited_dependency(self):
- class Interface(object):
- def do_work(self):
- assert False
-
- @dependency.provider('first_api')
- class FirstImplementation(Interface):
- def do_work(self):
- return True
-
- @dependency.provider('second_api')
- class SecondImplementation(Interface):
- def do_work(self):
- return True
-
- @dependency.requires('first_api')
- class ParentConsumer(object):
- def do_work_with_dependencies(self):
- assert self.first_api.do_work()
-
- @dependency.requires('second_api')
- class ChildConsumer(ParentConsumer):
- def do_work_with_dependencies(self):
- assert self.second_api.do_work()
- super(ChildConsumer, self).do_work_with_dependencies()
-
- # initialize dependency providers
- first_api = FirstImplementation()
- second_api = SecondImplementation()
-
- # ... sometime later, initialize a dependency consumer
- consumer = ChildConsumer()
-
- # dependencies should be naturally inherited
- self.assertEqual(
- set(['first_api']),
- ParentConsumer._dependencies)
- self.assertEqual(
- set(['first_api', 'second_api']),
- ChildConsumer._dependencies)
- self.assertEqual(
- set(['first_api', 'second_api']),
- consumer._dependencies)
-
- # the expected dependencies should be available to the consumer
- self.assertIs(consumer.first_api, first_api)
- self.assertIs(consumer.second_api, second_api)
- self.assertIsInstance(consumer.first_api, Interface)
- self.assertIsInstance(consumer.second_api, Interface)
- consumer.do_work_with_dependencies()
-
- def test_unresolvable_dependency(self):
- @dependency.requires(uuid.uuid4().hex)
- class Consumer(object):
- pass
-
- def for_test():
- Consumer()
- dependency.resolve_future_dependencies()
-
- self.assertRaises(dependency.UnresolvableDependencyException, for_test)
-
- def test_circular_dependency(self):
- p1_name = uuid.uuid4().hex
- p2_name = uuid.uuid4().hex
-
- @dependency.provider(p1_name)
- @dependency.requires(p2_name)
- class P1(object):
- pass
-
- @dependency.provider(p2_name)
- @dependency.requires(p1_name)
- class P2(object):
- pass
-
- p1 = P1()
- p2 = P2()
-
- dependency.resolve_future_dependencies()
-
- self.assertIs(getattr(p1, p2_name), p2)
- self.assertIs(getattr(p2, p1_name), p1)
-
- def test_reset(self):
- # Can reset the registry of providers.
-
- p_id = uuid.uuid4().hex
-
- @dependency.provider(p_id)
- class P(object):
- pass
-
- p_inst = P()
-
- self.assertIs(dependency.get_provider(p_id), p_inst)
-
- dependency.reset()
-
- self.assertFalse(dependency._REGISTRY)
-
- def test_get_provider(self):
- # Can get the instance of a provider using get_provider
-
- provider_name = uuid.uuid4().hex
-
- @dependency.provider(provider_name)
- class P(object):
- pass
-
- provider_instance = P()
- retrieved_provider_instance = dependency.get_provider(provider_name)
- self.assertIs(provider_instance, retrieved_provider_instance)
-
- def test_get_provider_not_provided_error(self):
- # If no provider and provider is required then fails.
-
- provider_name = uuid.uuid4().hex
- self.assertRaises(KeyError, dependency.get_provider, provider_name)
-
- def test_get_provider_not_provided_optional(self):
- # If no provider and provider is optional then returns None.
-
- provider_name = uuid.uuid4().hex
- self.assertIsNone(dependency.get_provider(provider_name,
- dependency.GET_OPTIONAL))
diff --git a/keystone-moon/keystone/tests/unit/common/test_json_home.py b/keystone-moon/keystone/tests/unit/common/test_json_home.py
deleted file mode 100644
index 94e2d138..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_json_home.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Copyright 2014 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import copy
-
-from testtools import matchers
-
-from keystone.common import json_home
-from keystone.tests import unit
-
-
-class JsonHomeTest(unit.BaseTestCase):
- def test_build_v3_resource_relation(self):
- resource_name = self.getUniqueString()
- relation = json_home.build_v3_resource_relation(resource_name)
- exp_relation = (
- 'http://docs.openstack.org/api/openstack-identity/3/rel/%s' %
- resource_name)
- self.assertThat(relation, matchers.Equals(exp_relation))
-
- def test_build_v3_extension_resource_relation(self):
- extension_name = self.getUniqueString()
- extension_version = self.getUniqueString()
- resource_name = self.getUniqueString()
- relation = json_home.build_v3_extension_resource_relation(
- extension_name, extension_version, resource_name)
- exp_relation = (
- 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/rel/'
- '%s' % (extension_name, extension_version, resource_name))
- self.assertThat(relation, matchers.Equals(exp_relation))
-
- def test_build_v3_parameter_relation(self):
- parameter_name = self.getUniqueString()
- relation = json_home.build_v3_parameter_relation(parameter_name)
- exp_relation = (
- 'http://docs.openstack.org/api/openstack-identity/3/param/%s' %
- parameter_name)
- self.assertThat(relation, matchers.Equals(exp_relation))
-
- def test_build_v3_extension_parameter_relation(self):
- extension_name = self.getUniqueString()
- extension_version = self.getUniqueString()
- parameter_name = self.getUniqueString()
- relation = json_home.build_v3_extension_parameter_relation(
- extension_name, extension_version, parameter_name)
- exp_relation = (
- 'http://docs.openstack.org/api/openstack-identity/3/ext/%s/%s/'
- 'param/%s' % (extension_name, extension_version, parameter_name))
- self.assertThat(relation, matchers.Equals(exp_relation))
-
- def test_translate_urls(self):
- href_rel = self.getUniqueString()
- href = self.getUniqueString()
- href_template_rel = self.getUniqueString()
- href_template = self.getUniqueString()
- href_vars = {self.getUniqueString(): self.getUniqueString()}
- original_json_home = {
- 'resources': {
- href_rel: {'href': href},
- href_template_rel: {
- 'href-template': href_template,
- 'href-vars': href_vars}
- }
- }
-
- new_json_home = copy.deepcopy(original_json_home)
- new_prefix = self.getUniqueString()
- json_home.translate_urls(new_json_home, new_prefix)
-
- exp_json_home = {
- 'resources': {
- href_rel: {'href': new_prefix + href},
- href_template_rel: {
- 'href-template': new_prefix + href_template,
- 'href-vars': href_vars}
- }
- }
-
- self.assertThat(new_json_home, matchers.Equals(exp_json_home))
diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py
deleted file mode 100644
index eed77286..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_ldap.py
+++ /dev/null
@@ -1,584 +0,0 @@
-# -*- coding: utf-8 -*-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import tempfile
-import uuid
-
-import fixtures
-import ldap.dn
-import mock
-from oslo_config import cfg
-from testtools import matchers
-
-from keystone.common import driver_hints
-from keystone.common import ldap as ks_ldap
-from keystone.common.ldap import core as common_ldap_core
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import fakeldap
-from keystone.tests.unit.ksfixtures import database
-
-
-CONF = cfg.CONF
-
-
-class DnCompareTest(unit.BaseTestCase):
- """Tests for the DN comparison functions in keystone.common.ldap.core."""
-
- def test_prep(self):
- # prep_case_insensitive returns the string with spaces at the front and
- # end if it's already lowercase and no insignificant characters.
- value = 'lowercase value'
- self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
-
- def test_prep_lowercase(self):
- # prep_case_insensitive returns the string with spaces at the front and
- # end and lowercases the value.
- value = 'UPPERCASE VALUE'
- exp_value = value.lower()
- self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
-
- def test_prep_insignificant(self):
- # prep_case_insensitive remove insignificant spaces.
- value = 'before after'
- exp_value = 'before after'
- self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
-
- def test_prep_insignificant_pre_post(self):
- # prep_case_insensitive remove insignificant spaces.
- value = ' value '
- exp_value = 'value'
- self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
-
- def test_ava_equal_same(self):
- # is_ava_value_equal returns True if the two values are the same.
- value = 'val1'
- self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
-
- def test_ava_equal_complex(self):
- # is_ava_value_equal returns True if the two values are the same using
- # a value that's got different capitalization and insignificant chars.
- val1 = 'before after'
- val2 = ' BEFORE afTer '
- self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
-
- def test_ava_different(self):
- # is_ava_value_equal returns False if the values aren't the same.
- self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
-
- def test_rdn_same(self):
- # is_rdn_equal returns True if the two values are the same.
- rdn = ldap.dn.str2dn('cn=val1')[0]
- self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
-
- def test_rdn_diff_length(self):
- # is_rdn_equal returns False if the RDNs have a different number of
- # AVAs.
- rdn1 = ldap.dn.str2dn('cn=cn1')[0]
- rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
- self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_rdn_multi_ava_same_order(self):
- # is_rdn_equal returns True if the RDNs have the same number of AVAs
- # and the values are the same.
- rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
- rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
- self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_rdn_multi_ava_diff_order(self):
- # is_rdn_equal returns True if the RDNs have the same number of AVAs
- # and the values are the same, even if in a different order
- rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
- rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
- self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_rdn_multi_ava_diff_type(self):
- # is_rdn_equal returns False if the RDNs have the same number of AVAs
- # and the attribute types are different.
- rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
- rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
- self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_rdn_attr_type_case_diff(self):
- # is_rdn_equal returns True for same RDNs even when attr type case is
- # different.
- rdn1 = ldap.dn.str2dn('cn=cn1')[0]
- rdn2 = ldap.dn.str2dn('CN=cn1')[0]
- self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_rdn_attr_type_alias(self):
- # is_rdn_equal returns False for same RDNs even when attr type alias is
- # used. Note that this is a limitation since an LDAP server should
- # consider them equal.
- rdn1 = ldap.dn.str2dn('cn=cn1')[0]
- rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
- self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
-
- def test_dn_same(self):
- # is_dn_equal returns True if the DNs are the same.
- dn = 'cn=Babs Jansen,ou=OpenStack'
- self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
-
- def test_dn_equal_unicode(self):
- # is_dn_equal can accept unicode
- dn = u'cn=fäké,ou=OpenStack'
- self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
-
- def test_dn_diff_length(self):
- # is_dn_equal returns False if the DNs don't have the same number of
- # RDNs
- dn1 = 'cn=Babs Jansen,ou=OpenStack'
- dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
- self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
-
- def test_dn_equal_rdns(self):
- # is_dn_equal returns True if the DNs have the same number of RDNs
- # and each RDN is the same.
- dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
- dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
- self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
-
- def test_dn_parsed_dns(self):
- # is_dn_equal can also accept parsed DNs.
- dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
- dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
- self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
-
- def test_startswith_under_child(self):
- # dn_startswith returns True if descendant_dn is a child of dn.
- child = 'cn=Babs Jansen,ou=OpenStack'
- parent = 'ou=OpenStack'
- self.assertTrue(ks_ldap.dn_startswith(child, parent))
-
- def test_startswith_parent(self):
- # dn_startswith returns False if descendant_dn is a parent of dn.
- child = 'cn=Babs Jansen,ou=OpenStack'
- parent = 'ou=OpenStack'
- self.assertFalse(ks_ldap.dn_startswith(parent, child))
-
- def test_startswith_same(self):
- # dn_startswith returns False if DNs are the same.
- dn = 'cn=Babs Jansen,ou=OpenStack'
- self.assertFalse(ks_ldap.dn_startswith(dn, dn))
-
- def test_startswith_not_parent(self):
- # dn_startswith returns False if descendant_dn is not under the dn
- child = 'cn=Babs Jansen,ou=OpenStack'
- parent = 'dc=example.com'
- self.assertFalse(ks_ldap.dn_startswith(child, parent))
-
- def test_startswith_descendant(self):
- # dn_startswith returns True if descendant_dn is a descendant of dn.
- descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
- dn = 'ou=OpenStack,dc=example.com'
- self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
-
- descendant = 'uid=12345,ou=Users,dc=example,dc=com'
- dn = 'ou=Users,dc=example,dc=com'
- self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
-
- def test_startswith_parsed_dns(self):
- # dn_startswith also accepts parsed DNs.
- descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
- dn = ldap.dn.str2dn('ou=OpenStack')
- self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
-
- def test_startswith_unicode(self):
- # dn_startswith accepts unicode.
- child = u'cn=fäké,ou=OpenStäck'
- parent = u'ou=OpenStäck'
- self.assertTrue(ks_ldap.dn_startswith(child, parent))
-
-
-class LDAPDeleteTreeTest(unit.TestCase):
-
- def setUp(self):
- super(LDAPDeleteTreeTest, self).setUp()
-
- ks_ldap.register_handler('fake://',
- fakeldap.FakeLdapNoSubtreeDelete)
- self.useFixture(database.Database(self.sql_driver_version_overrides))
-
- self.load_backends()
- self.load_fixtures(default_fixtures)
-
- self.addCleanup(self.clear_database)
- self.addCleanup(common_ldap_core._HANDLERS.clear)
-
- def clear_database(self):
- for shelf in fakeldap.FakeShelves:
- fakeldap.FakeShelves[shelf].clear()
-
- def config_overrides(self):
- super(LDAPDeleteTreeTest, self).config_overrides()
- self.config_fixture.config(group='identity', driver='ldap')
-
- def config_files(self):
- config_files = super(LDAPDeleteTreeTest, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
- return config_files
-
- def test_delete_tree(self):
- """Test manually deleting a tree.
-
- Few LDAP servers support CONTROL_DELETETREE. This test
- exercises the alternate code paths in BaseLdap.delete_tree.
-
- """
- conn = self.identity_api.user.get_connection()
- id_attr = self.identity_api.user.id_attr
- objclass = self.identity_api.user.object_class.lower()
- tree_dn = self.identity_api.user.tree_dn
-
- def create_entry(name, parent_dn=None):
- if not parent_dn:
- parent_dn = tree_dn
- dn = '%s=%s,%s' % (id_attr, name, parent_dn)
- attrs = [('objectclass', [objclass, 'ldapsubentry']),
- (id_attr, [name])]
- conn.add_s(dn, attrs)
- return dn
-
- # create 3 entries like this:
- # cn=base
- # cn=child,cn=base
- # cn=grandchild,cn=child,cn=base
- # then attempt to delete_tree(cn=base)
- base_id = 'base'
- base_dn = create_entry(base_id)
- child_dn = create_entry('child', base_dn)
- grandchild_dn = create_entry('grandchild', child_dn)
-
- # verify that the three entries were created
- scope = ldap.SCOPE_SUBTREE
- filt = '(|(objectclass=*)(objectclass=ldapsubentry))'
- entries = conn.search_s(base_dn, scope, filt,
- attrlist=common_ldap_core.DN_ONLY)
- self.assertThat(entries, matchers.HasLength(3))
- sort_ents = sorted([e[0] for e in entries], key=len, reverse=True)
- self.assertEqual([grandchild_dn, child_dn, base_dn], sort_ents)
-
- # verify that a non-leaf node can't be deleted directly by the
- # LDAP server
- self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
- conn.delete_s, base_dn)
- self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
- conn.delete_s, child_dn)
-
- # call our delete_tree implementation
- self.identity_api.user.delete_tree(base_id)
- self.assertRaises(ldap.NO_SUCH_OBJECT,
- conn.search_s, base_dn, ldap.SCOPE_BASE)
- self.assertRaises(ldap.NO_SUCH_OBJECT,
- conn.search_s, child_dn, ldap.SCOPE_BASE)
- self.assertRaises(ldap.NO_SUCH_OBJECT,
- conn.search_s, grandchild_dn, ldap.SCOPE_BASE)
-
-
-class MultiURLTests(unit.TestCase):
- """Tests for setting multiple LDAP URLs."""
-
- def test_multiple_urls_with_comma_no_conn_pool(self):
- urls = 'ldap://localhost,ldap://backup.localhost'
- self.config_fixture.config(group='ldap', url=urls, use_pool=False)
- base_ldap = ks_ldap.BaseLdap(CONF)
- ldap_connection = base_ldap.get_connection()
- self.assertEqual(urls, ldap_connection.conn.conn._uri)
-
- def test_multiple_urls_with_comma_with_conn_pool(self):
- urls = 'ldap://localhost,ldap://backup.localhost'
- self.config_fixture.config(group='ldap', url=urls, use_pool=True)
- base_ldap = ks_ldap.BaseLdap(CONF)
- ldap_connection = base_ldap.get_connection()
- self.assertEqual(urls, ldap_connection.conn.conn_pool.uri)
-
-
-class SslTlsTest(unit.TestCase):
- """Tests for the SSL/TLS functionality in keystone.common.ldap.core."""
-
- @mock.patch.object(ks_ldap.core.KeystoneLDAPHandler, 'simple_bind_s')
- @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
- def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
- # Attempt to connect to initialize python-ldap.
- base_ldap = ks_ldap.BaseLdap(config)
- base_ldap.get_connection()
-
- def test_certfile_trust_tls(self):
- # We need this to actually exist, so we create a tempfile.
- (handle, certfile) = tempfile.mkstemp()
- self.addCleanup(os.unlink, certfile)
- self.addCleanup(os.close, handle)
- self.config_fixture.config(group='ldap',
- url='ldap://localhost',
- use_tls=True,
- tls_cacertfile=certfile)
-
- self._init_ldap_connection(CONF)
-
- # Ensure the cert trust option is set.
- self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
-
- def test_certdir_trust_tls(self):
- # We need this to actually exist, so we create a tempdir.
- certdir = self.useFixture(fixtures.TempDir()).path
- self.config_fixture.config(group='ldap',
- url='ldap://localhost',
- use_tls=True,
- tls_cacertdir=certdir)
-
- self._init_ldap_connection(CONF)
-
- # Ensure the cert trust option is set.
- self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
-
- def test_certfile_trust_ldaps(self):
- # We need this to actually exist, so we create a tempfile.
- (handle, certfile) = tempfile.mkstemp()
- self.addCleanup(os.unlink, certfile)
- self.addCleanup(os.close, handle)
- self.config_fixture.config(group='ldap',
- url='ldaps://localhost',
- use_tls=False,
- tls_cacertfile=certfile)
-
- self._init_ldap_connection(CONF)
-
- # Ensure the cert trust option is set.
- self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
-
- def test_certdir_trust_ldaps(self):
- # We need this to actually exist, so we create a tempdir.
- certdir = self.useFixture(fixtures.TempDir()).path
- self.config_fixture.config(group='ldap',
- url='ldaps://localhost',
- use_tls=False,
- tls_cacertdir=certdir)
-
- self._init_ldap_connection(CONF)
-
- # Ensure the cert trust option is set.
- self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
-
-
-class LDAPPagedResultsTest(unit.TestCase):
- """Tests the paged results functionality in keystone.common.ldap.core."""
-
- def setUp(self):
- super(LDAPPagedResultsTest, self).setUp()
- self.clear_database()
-
- ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
- self.addCleanup(common_ldap_core._HANDLERS.clear)
- self.useFixture(database.Database(self.sql_driver_version_overrides))
-
- self.load_backends()
- self.load_fixtures(default_fixtures)
-
- def clear_database(self):
- for shelf in fakeldap.FakeShelves:
- fakeldap.FakeShelves[shelf].clear()
-
- def config_overrides(self):
- super(LDAPPagedResultsTest, self).config_overrides()
- self.config_fixture.config(group='identity', driver='ldap')
-
- def config_files(self):
- config_files = super(LDAPPagedResultsTest, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
- return config_files
-
- @mock.patch.object(fakeldap.FakeLdap, 'search_ext')
- @mock.patch.object(fakeldap.FakeLdap, 'result3')
- def test_paged_results_control_api(self, mock_result3, mock_search_ext):
- mock_result3.return_value = ('', [], 1, [])
-
- self.config_fixture.config(group='ldap',
- page_size=1)
-
- conn = self.identity_api.user.get_connection()
- conn._paged_search_s('dc=example,dc=test',
- ldap.SCOPE_SUBTREE,
- 'objectclass=*')
-
-
-class CommonLdapTestCase(unit.BaseTestCase):
- """These test cases call functions in keystone.common.ldap."""
-
- def test_binary_attribute_values(self):
- result = [(
- 'cn=junk,dc=example,dc=com',
- {
- 'cn': ['junk'],
- 'sn': [uuid.uuid4().hex],
- 'mail': [uuid.uuid4().hex],
- 'binary_attr': ['\x00\xFF\x00\xFF']
- }
- ), ]
- py_result = ks_ldap.convert_ldap_result(result)
- # The attribute containing the binary value should
- # not be present in the converted result.
- self.assertNotIn('binary_attr', py_result[0][1])
-
- def test_utf8_conversion(self):
- value_unicode = u'fäké1'
- value_utf8 = value_unicode.encode('utf-8')
-
- result_utf8 = ks_ldap.utf8_encode(value_unicode)
- self.assertEqual(value_utf8, result_utf8)
-
- result_utf8 = ks_ldap.utf8_encode(value_utf8)
- self.assertEqual(value_utf8, result_utf8)
-
- result_unicode = ks_ldap.utf8_decode(value_utf8)
- self.assertEqual(value_unicode, result_unicode)
-
- result_unicode = ks_ldap.utf8_decode(value_unicode)
- self.assertEqual(value_unicode, result_unicode)
-
- self.assertRaises(TypeError,
- ks_ldap.utf8_encode,
- 100)
-
- result_unicode = ks_ldap.utf8_decode(100)
- self.assertEqual(u'100', result_unicode)
-
- def test_user_id_begins_with_0(self):
- user_id = '0123456'
- result = [(
- 'cn=dummy,dc=example,dc=com',
- {
- 'user_id': [user_id],
- 'enabled': ['TRUE']
- }
- ), ]
- py_result = ks_ldap.convert_ldap_result(result)
- # The user id should be 0123456, and the enabled
- # flag should be True
- self.assertIs(py_result[0][1]['enabled'][0], True)
- self.assertEqual(user_id, py_result[0][1]['user_id'][0])
-
- def test_user_id_begins_with_0_and_enabled_bit_mask(self):
- user_id = '0123456'
- bitmask = '225'
- expected_bitmask = 225
- result = [(
- 'cn=dummy,dc=example,dc=com',
- {
- 'user_id': [user_id],
- 'enabled': [bitmask]
- }
- ), ]
- py_result = ks_ldap.convert_ldap_result(result)
- # The user id should be 0123456, and the enabled
- # flag should be 225
- self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
- self.assertEqual(user_id, py_result[0][1]['user_id'][0])
-
- def test_user_id_and_bitmask_begins_with_0(self):
- user_id = '0123456'
- bitmask = '0225'
- expected_bitmask = 225
- result = [(
- 'cn=dummy,dc=example,dc=com',
- {
- 'user_id': [user_id],
- 'enabled': [bitmask]
- }
- ), ]
- py_result = ks_ldap.convert_ldap_result(result)
- # The user id should be 0123456, and the enabled
- # flag should be 225, the 0 is dropped.
- self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
- self.assertEqual(user_id, py_result[0][1]['user_id'][0])
-
- def test_user_id_and_user_name_with_boolean_string(self):
- boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
- 'TrUe' 'FaLse']
- for user_name in boolean_strings:
- user_id = uuid.uuid4().hex
- result = [(
- 'cn=dummy,dc=example,dc=com',
- {
- 'user_id': [user_id],
- 'user_name': [user_name]
- }
- ), ]
- py_result = ks_ldap.convert_ldap_result(result)
- # The user name should still be a string value.
- self.assertEqual(user_name, py_result[0][1]['user_name'][0])
-
-
-class LDAPFilterQueryCompositionTest(unit.TestCase):
- """These test cases test LDAP filter generation."""
-
- def setUp(self):
- super(LDAPFilterQueryCompositionTest, self).setUp()
-
- self.base_ldap = ks_ldap.BaseLdap(self.config_fixture.conf)
-
- # The tests need an attribute mapping to use.
- self.attribute_name = uuid.uuid4().hex
- self.filter_attribute_name = uuid.uuid4().hex
- self.base_ldap.attribute_mapping = {
- self.attribute_name: self.filter_attribute_name
- }
-
- def test_return_query_with_no_hints(self):
- hints = driver_hints.Hints()
- # NOTE: doesn't have to be a real query, we just need to make sure the
- # same string is returned if there are no hints.
- query = uuid.uuid4().hex
- self.assertEqual(query,
- self.base_ldap.filter_query(hints=hints, query=query))
-
- # make sure the default query is an empty string
- self.assertEqual('', self.base_ldap.filter_query(hints=hints))
-
- def test_filter_with_empty_query_and_hints_set(self):
- hints = driver_hints.Hints()
- username = uuid.uuid4().hex
- hints.add_filter(name=self.attribute_name,
- value=username,
- comparator='equals',
- case_sensitive=False)
- expected_ldap_filter = '(&(%s=%s))' % (
- self.filter_attribute_name, username)
- self.assertEqual(expected_ldap_filter,
- self.base_ldap.filter_query(hints=hints))
-
- def test_filter_with_both_query_and_hints_set(self):
- hints = driver_hints.Hints()
- # NOTE: doesn't have to be a real query, we just need to make sure the
- # filter string is concatenated correctly
- query = uuid.uuid4().hex
- username = uuid.uuid4().hex
- expected_result = '(&%(query)s(%(user_name_attr)s=%(username)s))' % (
- {'query': query,
- 'user_name_attr': self.filter_attribute_name,
- 'username': username})
- hints.add_filter(self.attribute_name, username)
- self.assertEqual(expected_result,
- self.base_ldap.filter_query(hints=hints, query=query))
-
- def test_filter_with_hints_and_query_is_none(self):
- hints = driver_hints.Hints()
- username = uuid.uuid4().hex
- hints.add_filter(name=self.attribute_name,
- value=username,
- comparator='equals',
- case_sensitive=False)
- expected_ldap_filter = '(&(%s=%s))' % (
- self.filter_attribute_name, username)
- self.assertEqual(expected_ldap_filter,
- self.base_ldap.filter_query(hints=hints, query=None))
diff --git a/keystone-moon/keystone/tests/unit/common/test_manager.py b/keystone-moon/keystone/tests/unit/common/test_manager.py
deleted file mode 100644
index 7ef91e15..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_manager.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from keystone import catalog
-from keystone.common import manager
-from keystone.tests import unit
-
-
-class TestCreateLegacyDriver(unit.BaseTestCase):
-
- @mock.patch('oslo_log.versionutils.report_deprecated_feature')
- def test_class_is_properly_deprecated(self, mock_reporter):
- Driver = manager.create_legacy_driver(catalog.CatalogDriverV8)
-
- # NOTE(dstanek): I want to subvert the requirement for this
- # class to implement all of the abstract methods.
- Driver.__abstractmethods__ = set()
- impl = Driver()
-
- details = {
- 'as_of': 'Liberty',
- 'what': 'keystone.catalog.core.Driver',
- 'in_favor_of': 'keystone.catalog.core.CatalogDriverV8',
- 'remove_in': mock.ANY,
- }
- mock_reporter.assert_called_with(mock.ANY, mock.ANY, details)
- self.assertEqual('N', mock_reporter.call_args[0][2]['remove_in'][0])
-
- self.assertIsInstance(impl, catalog.CatalogDriverV8)
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py
deleted file mode 100644
index aa2e6f72..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_notifications.py
+++ /dev/null
@@ -1,1248 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import uuid
-
-import mock
-from oslo_config import cfg
-from oslo_config import fixture as config_fixture
-from oslotest import mockpatch
-from pycadf import cadftaxonomy
-from pycadf import cadftype
-from pycadf import eventfactory
-from pycadf import resource as cadfresource
-
-from keystone import notifications
-from keystone.tests import unit
-from keystone.tests.unit import test_v3
-
-
-CONF = cfg.CONF
-
-EXP_RESOURCE_TYPE = uuid.uuid4().hex
-CREATED_OPERATION = notifications.ACTIONS.created
-UPDATED_OPERATION = notifications.ACTIONS.updated
-DELETED_OPERATION = notifications.ACTIONS.deleted
-DISABLED_OPERATION = notifications.ACTIONS.disabled
-
-
-class ArbitraryException(Exception):
- pass
-
-
-def register_callback(operation, resource_type=EXP_RESOURCE_TYPE):
- """Helper for creating and registering a mock callback."""
- callback = mock.Mock(__name__='callback',
- im_class=mock.Mock(__name__='class'))
- notifications.register_event_callback(operation, resource_type, callback)
- return callback
-
-
-class AuditNotificationsTestCase(unit.BaseTestCase):
- def setUp(self):
- super(AuditNotificationsTestCase, self).setUp()
- self.config_fixture = self.useFixture(config_fixture.Config(CONF))
- self.addCleanup(notifications.clear_subscribers)
-
- def _test_notification_operation(self, notify_function, operation):
- exp_resource_id = uuid.uuid4().hex
- callback = register_callback(operation)
- notify_function(EXP_RESOURCE_TYPE, exp_resource_id)
- callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE,
- operation,
- {'resource_info': exp_resource_id})
- self.config_fixture.config(notification_format='cadf')
- with mock.patch(
- 'keystone.notifications._create_cadf_payload') as cadf_notify:
- notify_function(EXP_RESOURCE_TYPE, exp_resource_id)
- initiator = None
- cadf_notify.assert_called_once_with(
- operation, EXP_RESOURCE_TYPE, exp_resource_id,
- notifications.taxonomy.OUTCOME_SUCCESS, initiator)
- notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False)
- cadf_notify.assert_called_once_with(
- operation, EXP_RESOURCE_TYPE, exp_resource_id,
- notifications.taxonomy.OUTCOME_SUCCESS, initiator)
-
- def test_resource_created_notification(self):
- self._test_notification_operation(notifications.Audit.created,
- CREATED_OPERATION)
-
- def test_resource_updated_notification(self):
- self._test_notification_operation(notifications.Audit.updated,
- UPDATED_OPERATION)
-
- def test_resource_deleted_notification(self):
- self._test_notification_operation(notifications.Audit.deleted,
- DELETED_OPERATION)
-
- def test_resource_disabled_notification(self):
- self._test_notification_operation(notifications.Audit.disabled,
- DISABLED_OPERATION)
-
-
-class NotificationsTestCase(unit.BaseTestCase):
-
- def test_send_notification(self):
- """Test _send_notification.
-
- Test the private method _send_notification to ensure event_type,
- payload, and context are built and passed properly.
-
- """
- resource = uuid.uuid4().hex
- resource_type = EXP_RESOURCE_TYPE
- operation = CREATED_OPERATION
-
- # NOTE(ldbragst): Even though notifications._send_notification doesn't
- # contain logic that creates cases, this is supposed to test that
- # context is always empty and that we ensure the resource ID of the
- # resource in the notification is contained in the payload. It was
- # agreed that context should be empty in Keystone's case, which is
- # also noted in the /keystone/notifications.py module. This test
- # ensures and maintains these conditions.
- expected_args = [
- {}, # empty context
- 'identity.%s.created' % resource_type, # event_type
- {'resource_info': resource}, # payload
- 'INFO', # priority is always INFO...
- ]
-
- with mock.patch.object(notifications._get_notifier(),
- '_notify') as mocked:
- notifications._send_notification(operation, resource_type,
- resource)
- mocked.assert_called_once_with(*expected_args)
-
- def test_send_notification_with_opt_out(self):
- """Test the private method _send_notification with opt-out.
-
- Test that _send_notification does not notify when a valid
- notification_opt_out configuration is provided.
- """
- resource = uuid.uuid4().hex
- resource_type = EXP_RESOURCE_TYPE
- operation = CREATED_OPERATION
- event_type = 'identity.%s.created' % resource_type
-
- # NOTE(diazjf): Here we add notification_opt_out to the
- # configuration so that we should return before _get_notifer is
- # called. This is because we are opting out notifications for the
- # passed resource_type and operation.
- conf = self.useFixture(config_fixture.Config(CONF))
- conf.config(notification_opt_out=event_type)
-
- with mock.patch.object(notifications._get_notifier(),
- '_notify') as mocked:
-
- notifications._send_notification(operation, resource_type,
- resource)
- mocked.assert_not_called()
-
- def test_send_audit_notification_with_opt_out(self):
- """Test the private method _send_audit_notification with opt-out.
-
- Test that _send_audit_notification does not notify when a valid
- notification_opt_out configuration is provided.
- """
- resource_type = EXP_RESOURCE_TYPE
-
- action = CREATED_OPERATION + '.' + resource_type
- initiator = mock
- target = mock
- outcome = 'success'
- event_type = 'identity.%s.created' % resource_type
-
- conf = self.useFixture(config_fixture.Config(CONF))
- conf.config(notification_opt_out=event_type)
-
- with mock.patch.object(notifications._get_notifier(),
- '_notify') as mocked:
-
- notifications._send_audit_notification(action,
- initiator,
- outcome,
- target,
- event_type)
- mocked.assert_not_called()
-
- def test_opt_out_authenticate_event(self):
- """Test that authenticate events are successfully opted out."""
- resource_type = EXP_RESOURCE_TYPE
-
- action = CREATED_OPERATION + '.' + resource_type
- initiator = mock
- target = mock
- outcome = 'success'
- event_type = 'identity.authenticate'
- meter_name = '%s.%s' % (event_type, outcome)
-
- conf = self.useFixture(config_fixture.Config(CONF))
- conf.config(notification_opt_out=meter_name)
-
- with mock.patch.object(notifications._get_notifier(),
- '_notify') as mocked:
-
- notifications._send_audit_notification(action,
- initiator,
- outcome,
- target,
- event_type)
- mocked.assert_not_called()
-
-
-class BaseNotificationTest(test_v3.RestfulTestCase):
-
- def setUp(self):
- super(BaseNotificationTest, self).setUp()
-
- self._notifications = []
- self._audits = []
-
- def fake_notify(operation, resource_type, resource_id,
- actor_dict=None, public=True):
- note = {
- 'resource_id': resource_id,
- 'operation': operation,
- 'resource_type': resource_type,
- 'send_notification_called': True,
- 'public': public}
- if actor_dict:
- note['actor_id'] = actor_dict.get('id')
- note['actor_type'] = actor_dict.get('type')
- note['actor_operation'] = actor_dict.get('actor_operation')
- self._notifications.append(note)
-
- self.useFixture(mockpatch.PatchObject(
- notifications, '_send_notification', fake_notify))
-
- def fake_audit(action, initiator, outcome, target,
- event_type, **kwargs):
- service_security = cadftaxonomy.SERVICE_SECURITY
-
- event = eventfactory.EventFactory().new_event(
- eventType=cadftype.EVENTTYPE_ACTIVITY,
- outcome=outcome,
- action=action,
- initiator=initiator,
- target=target,
- observer=cadfresource.Resource(typeURI=service_security))
-
- for key, value in kwargs.items():
- setattr(event, key, value)
-
- audit = {
- 'payload': event.as_dict(),
- 'event_type': event_type,
- 'send_notification_called': True}
- self._audits.append(audit)
-
- self.useFixture(mockpatch.PatchObject(
- notifications, '_send_audit_notification', fake_audit))
-
- def _assert_last_note(self, resource_id, operation, resource_type,
- actor_id=None, actor_type=None,
- actor_operation=None):
- # NOTE(stevemar): If 'basic' format is not used, then simply
- # return since this assertion is not valid.
- if CONF.notification_format != 'basic':
- return
- self.assertTrue(len(self._notifications) > 0)
- note = self._notifications[-1]
- self.assertEqual(operation, note['operation'])
- self.assertEqual(resource_id, note['resource_id'])
- self.assertEqual(resource_type, note['resource_type'])
- self.assertTrue(note['send_notification_called'])
- if actor_id:
- self.assertEqual(actor_id, note['actor_id'])
- self.assertEqual(actor_type, note['actor_type'])
- self.assertEqual(actor_operation, note['actor_operation'])
-
- def _assert_last_audit(self, resource_id, operation, resource_type,
- target_uri):
- # NOTE(stevemar): If 'cadf' format is not used, then simply
- # return since this assertion is not valid.
- if CONF.notification_format != 'cadf':
- return
- self.assertTrue(len(self._audits) > 0)
- audit = self._audits[-1]
- payload = audit['payload']
- self.assertEqual(resource_id, payload['resource_info'])
- action = '%s.%s' % (operation, resource_type)
- self.assertEqual(action, payload['action'])
- self.assertEqual(target_uri, payload['target']['typeURI'])
- self.assertEqual(resource_id, payload['target']['id'])
- event_type = '%s.%s.%s' % ('identity', resource_type, operation)
- self.assertEqual(event_type, audit['event_type'])
- self.assertTrue(audit['send_notification_called'])
-
- def _assert_initiator_data_is_set(self, operation, resource_type, typeURI):
- self.assertTrue(len(self._audits) > 0)
- audit = self._audits[-1]
- payload = audit['payload']
- self.assertEqual(self.user_id, payload['initiator']['id'])
- self.assertEqual(self.project_id, payload['initiator']['project_id'])
- self.assertEqual(typeURI, payload['target']['typeURI'])
- action = '%s.%s' % (operation, resource_type)
- self.assertEqual(action, payload['action'])
-
- def _assert_notify_not_sent(self, resource_id, operation, resource_type,
- public=True):
- unexpected = {
- 'resource_id': resource_id,
- 'operation': operation,
- 'resource_type': resource_type,
- 'send_notification_called': True,
- 'public': public}
- for note in self._notifications:
- self.assertNotEqual(unexpected, note)
-
- def _assert_notify_sent(self, resource_id, operation, resource_type,
- public=True):
- expected = {
- 'resource_id': resource_id,
- 'operation': operation,
- 'resource_type': resource_type,
- 'send_notification_called': True,
- 'public': public}
- for note in self._notifications:
- if expected == note:
- break
- else:
- self.fail("Notification not sent.")
-
-
-class NotificationsForEntities(BaseNotificationTest):
-
- def test_create_group(self):
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group_ref = self.identity_api.create_group(group_ref)
- self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group')
- self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group',
- cadftaxonomy.SECURITY_GROUP)
-
- def test_create_project(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self._assert_last_note(
- project_ref['id'], CREATED_OPERATION, 'project')
- self._assert_last_audit(project_ref['id'], CREATED_OPERATION,
- 'project', cadftaxonomy.SECURITY_PROJECT)
-
- def test_create_role(self):
- role_ref = unit.new_role_ref()
- self.role_api.create_role(role_ref['id'], role_ref)
- self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role')
- self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role',
- cadftaxonomy.SECURITY_ROLE)
-
- def test_create_user(self):
- user_ref = unit.new_user_ref(domain_id=self.domain_id)
- user_ref = self.identity_api.create_user(user_ref)
- self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user')
- self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user',
- cadftaxonomy.SECURITY_ACCOUNT_USER)
-
- def test_create_trust(self):
- trustor = unit.new_user_ref(domain_id=self.domain_id)
- trustor = self.identity_api.create_user(trustor)
- trustee = unit.new_user_ref(domain_id=self.domain_id)
- trustee = self.identity_api.create_user(trustee)
- role_ref = unit.new_role_ref()
- self.role_api.create_role(role_ref['id'], role_ref)
- trust_ref = unit.new_trust_ref(trustor['id'],
- trustee['id'])
- self.trust_api.create_trust(trust_ref['id'],
- trust_ref,
- [role_ref])
- self._assert_last_note(
- trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust')
- self._assert_last_audit(trust_ref['id'], CREATED_OPERATION,
- 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
-
- def test_delete_group(self):
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group_ref = self.identity_api.create_group(group_ref)
- self.identity_api.delete_group(group_ref['id'])
- self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group')
- self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group',
- cadftaxonomy.SECURITY_GROUP)
-
- def test_delete_project(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.resource_api.delete_project(project_ref['id'])
- self._assert_last_note(
- project_ref['id'], DELETED_OPERATION, 'project')
- self._assert_last_audit(project_ref['id'], DELETED_OPERATION,
- 'project', cadftaxonomy.SECURITY_PROJECT)
-
- def test_delete_role(self):
- role_ref = unit.new_role_ref()
- self.role_api.create_role(role_ref['id'], role_ref)
- self.role_api.delete_role(role_ref['id'])
- self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role')
- self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role',
- cadftaxonomy.SECURITY_ROLE)
-
- def test_delete_user(self):
- user_ref = unit.new_user_ref(domain_id=self.domain_id)
- user_ref = self.identity_api.create_user(user_ref)
- self.identity_api.delete_user(user_ref['id'])
- self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user')
- self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user',
- cadftaxonomy.SECURITY_ACCOUNT_USER)
-
- def test_create_domain(self):
- domain_ref = unit.new_domain_ref()
- self.resource_api.create_domain(domain_ref['id'], domain_ref)
- self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain')
- self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain',
- cadftaxonomy.SECURITY_DOMAIN)
-
- def test_update_domain(self):
- domain_ref = unit.new_domain_ref()
- self.resource_api.create_domain(domain_ref['id'], domain_ref)
- domain_ref['description'] = uuid.uuid4().hex
- self.resource_api.update_domain(domain_ref['id'], domain_ref)
- self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain')
- self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain',
- cadftaxonomy.SECURITY_DOMAIN)
-
- def test_delete_domain(self):
- domain_ref = unit.new_domain_ref()
- self.resource_api.create_domain(domain_ref['id'], domain_ref)
- domain_ref['enabled'] = False
- self.resource_api.update_domain(domain_ref['id'], domain_ref)
- self.resource_api.delete_domain(domain_ref['id'])
- self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain')
- self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain',
- cadftaxonomy.SECURITY_DOMAIN)
-
- def test_delete_trust(self):
- trustor = unit.new_user_ref(domain_id=self.domain_id)
- trustor = self.identity_api.create_user(trustor)
- trustee = unit.new_user_ref(domain_id=self.domain_id)
- trustee = self.identity_api.create_user(trustee)
- role_ref = unit.new_role_ref()
- trust_ref = unit.new_trust_ref(trustor['id'], trustee['id'])
- self.trust_api.create_trust(trust_ref['id'],
- trust_ref,
- [role_ref])
- self.trust_api.delete_trust(trust_ref['id'])
- self._assert_last_note(
- trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust')
- self._assert_last_audit(trust_ref['id'], DELETED_OPERATION,
- 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
-
- def test_create_endpoint(self):
- endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
- interface='public',
- region_id=self.region_id)
- self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
- self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION,
- 'endpoint')
- self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION,
- 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
-
- def test_update_endpoint(self):
- endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
- interface='public',
- region_id=self.region_id)
- self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
- self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref)
- self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION,
- 'endpoint')
- self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION,
- 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
-
- def test_delete_endpoint(self):
- endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
- interface='public',
- region_id=self.region_id)
- self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
- self.catalog_api.delete_endpoint(endpoint_ref['id'])
- self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION,
- 'endpoint')
- self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION,
- 'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
-
- def test_create_service(self):
- service_ref = unit.new_service_ref()
- self.catalog_api.create_service(service_ref['id'], service_ref)
- self._assert_notify_sent(service_ref['id'], CREATED_OPERATION,
- 'service')
- self._assert_last_audit(service_ref['id'], CREATED_OPERATION,
- 'service', cadftaxonomy.SECURITY_SERVICE)
-
- def test_update_service(self):
- service_ref = unit.new_service_ref()
- self.catalog_api.create_service(service_ref['id'], service_ref)
- self.catalog_api.update_service(service_ref['id'], service_ref)
- self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION,
- 'service')
- self._assert_last_audit(service_ref['id'], UPDATED_OPERATION,
- 'service', cadftaxonomy.SECURITY_SERVICE)
-
- def test_delete_service(self):
- service_ref = unit.new_service_ref()
- self.catalog_api.create_service(service_ref['id'], service_ref)
- self.catalog_api.delete_service(service_ref['id'])
- self._assert_notify_sent(service_ref['id'], DELETED_OPERATION,
- 'service')
- self._assert_last_audit(service_ref['id'], DELETED_OPERATION,
- 'service', cadftaxonomy.SECURITY_SERVICE)
-
- def test_create_region(self):
- region_ref = unit.new_region_ref()
- self.catalog_api.create_region(region_ref)
- self._assert_notify_sent(region_ref['id'], CREATED_OPERATION,
- 'region')
- self._assert_last_audit(region_ref['id'], CREATED_OPERATION,
- 'region', cadftaxonomy.SECURITY_REGION)
-
- def test_update_region(self):
- region_ref = unit.new_region_ref()
- self.catalog_api.create_region(region_ref)
- self.catalog_api.update_region(region_ref['id'], region_ref)
- self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION,
- 'region')
- self._assert_last_audit(region_ref['id'], UPDATED_OPERATION,
- 'region', cadftaxonomy.SECURITY_REGION)
-
- def test_delete_region(self):
- region_ref = unit.new_region_ref()
- self.catalog_api.create_region(region_ref)
- self.catalog_api.delete_region(region_ref['id'])
- self._assert_notify_sent(region_ref['id'], DELETED_OPERATION,
- 'region')
- self._assert_last_audit(region_ref['id'], DELETED_OPERATION,
- 'region', cadftaxonomy.SECURITY_REGION)
-
- def test_create_policy(self):
- policy_ref = unit.new_policy_ref()
- self.policy_api.create_policy(policy_ref['id'], policy_ref)
- self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION,
- 'policy')
- self._assert_last_audit(policy_ref['id'], CREATED_OPERATION,
- 'policy', cadftaxonomy.SECURITY_POLICY)
-
- def test_update_policy(self):
- policy_ref = unit.new_policy_ref()
- self.policy_api.create_policy(policy_ref['id'], policy_ref)
- self.policy_api.update_policy(policy_ref['id'], policy_ref)
- self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION,
- 'policy')
- self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION,
- 'policy', cadftaxonomy.SECURITY_POLICY)
-
- def test_delete_policy(self):
- policy_ref = unit.new_policy_ref()
- self.policy_api.create_policy(policy_ref['id'], policy_ref)
- self.policy_api.delete_policy(policy_ref['id'])
- self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION,
- 'policy')
- self._assert_last_audit(policy_ref['id'], DELETED_OPERATION,
- 'policy', cadftaxonomy.SECURITY_POLICY)
-
- def test_disable_domain(self):
- domain_ref = unit.new_domain_ref()
- self.resource_api.create_domain(domain_ref['id'], domain_ref)
- domain_ref['enabled'] = False
- self.resource_api.update_domain(domain_ref['id'], domain_ref)
- self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain',
- public=False)
-
- def test_disable_of_disabled_domain_does_not_notify(self):
- domain_ref = unit.new_domain_ref(enabled=False)
- self.resource_api.create_domain(domain_ref['id'], domain_ref)
- # The domain_ref above is not changed during the create process. We
- # can use the same ref to perform the update.
- self.resource_api.update_domain(domain_ref['id'], domain_ref)
- self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain',
- public=False)
-
- def test_update_group(self):
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group_ref = self.identity_api.create_group(group_ref)
- self.identity_api.update_group(group_ref['id'], group_ref)
- self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group')
- self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group',
- cadftaxonomy.SECURITY_GROUP)
-
- def test_update_project(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.resource_api.update_project(project_ref['id'], project_ref)
- self._assert_notify_sent(
- project_ref['id'], UPDATED_OPERATION, 'project', public=True)
- self._assert_last_audit(project_ref['id'], UPDATED_OPERATION,
- 'project', cadftaxonomy.SECURITY_PROJECT)
-
- def test_disable_project(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- project_ref['enabled'] = False
- self.resource_api.update_project(project_ref['id'], project_ref)
- self._assert_notify_sent(project_ref['id'], 'disabled', 'project',
- public=False)
-
- def test_disable_of_disabled_project_does_not_notify(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id,
- enabled=False)
- self.resource_api.create_project(project_ref['id'], project_ref)
- # The project_ref above is not changed during the create process. We
- # can use the same ref to perform the update.
- self.resource_api.update_project(project_ref['id'], project_ref)
- self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project',
- public=False)
-
- def test_update_project_does_not_send_disable(self):
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- project_ref['enabled'] = True
- self.resource_api.update_project(project_ref['id'], project_ref)
- self._assert_last_note(
- project_ref['id'], UPDATED_OPERATION, 'project')
- self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project')
-
- def test_update_role(self):
- role_ref = unit.new_role_ref()
- self.role_api.create_role(role_ref['id'], role_ref)
- self.role_api.update_role(role_ref['id'], role_ref)
- self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role')
- self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role',
- cadftaxonomy.SECURITY_ROLE)
-
- def test_update_user(self):
- user_ref = unit.new_user_ref(domain_id=self.domain_id)
- user_ref = self.identity_api.create_user(user_ref)
- self.identity_api.update_user(user_ref['id'], user_ref)
- self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user')
- self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user',
- cadftaxonomy.SECURITY_ACCOUNT_USER)
-
- def test_config_option_no_events(self):
- self.config_fixture.config(notification_format='basic')
- role_ref = unit.new_role_ref()
- self.role_api.create_role(role_ref['id'], role_ref)
- # The regular notifications will still be emitted, since they are
- # used for callback handling.
- self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role')
- # No audit event should have occurred
- self.assertEqual(0, len(self._audits))
-
- def test_add_user_to_group(self):
- user_ref = unit.new_user_ref(domain_id=self.domain_id)
- user_ref = self.identity_api.create_user(user_ref)
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group_ref = self.identity_api.create_group(group_ref)
- self.identity_api.add_user_to_group(user_ref['id'], group_ref['id'])
- self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group',
- actor_id=user_ref['id'], actor_type='user',
- actor_operation='added')
-
- def test_remove_user_from_group(self):
- user_ref = unit.new_user_ref(domain_id=self.domain_id)
- user_ref = self.identity_api.create_user(user_ref)
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group_ref = self.identity_api.create_group(group_ref)
- self.identity_api.add_user_to_group(user_ref['id'], group_ref['id'])
- self.identity_api.remove_user_from_group(user_ref['id'],
- group_ref['id'])
- self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group',
- actor_id=user_ref['id'], actor_type='user',
- actor_operation='removed')
-
-
-class CADFNotificationsForEntities(NotificationsForEntities):
-
- def setUp(self):
- super(CADFNotificationsForEntities, self).setUp()
- self.config_fixture.config(notification_format='cadf')
-
- def test_initiator_data_is_set(self):
- ref = unit.new_domain_ref()
- resp = self.post('/domains', body={'domain': ref})
- resource_id = resp.result.get('domain').get('id')
- self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain',
- cadftaxonomy.SECURITY_DOMAIN)
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'domain',
- cadftaxonomy.SECURITY_DOMAIN)
-
-
-class V2Notifications(BaseNotificationTest):
-
- def setUp(self):
- super(V2Notifications, self).setUp()
- self.config_fixture.config(notification_format='cadf')
-
- def test_user(self):
- token = self.get_scoped_token()
- resp = self.admin_request(
- method='POST',
- path='/v2.0/users',
- body={
- 'user': {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'enabled': True,
- },
- },
- token=token,
- )
- user_id = resp.result.get('user').get('id')
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'user',
- cadftaxonomy.SECURITY_ACCOUNT_USER)
- # test for delete user
- self.admin_request(
- method='DELETE',
- path='/v2.0/users/%s' % user_id,
- token=token,
- )
- self._assert_initiator_data_is_set(DELETED_OPERATION,
- 'user',
- cadftaxonomy.SECURITY_ACCOUNT_USER)
-
- def test_role(self):
- token = self.get_scoped_token()
- resp = self.admin_request(
- method='POST',
- path='/v2.0/OS-KSADM/roles',
- body={
- 'role': {
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- },
- },
- token=token,
- )
- role_id = resp.result.get('role').get('id')
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'role',
- cadftaxonomy.SECURITY_ROLE)
- # test for delete role
- self.admin_request(
- method='DELETE',
- path='/v2.0/OS-KSADM/roles/%s' % role_id,
- token=token,
- )
- self._assert_initiator_data_is_set(DELETED_OPERATION,
- 'role',
- cadftaxonomy.SECURITY_ROLE)
-
- def test_service_and_endpoint(self):
- token = self.get_scoped_token()
- resp = self.admin_request(
- method='POST',
- path='/v2.0/OS-KSADM/services',
- body={
- 'OS-KSADM:service': {
- 'name': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- },
- },
- token=token,
- )
- service_id = resp.result.get('OS-KSADM:service').get('id')
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'service',
- cadftaxonomy.SECURITY_SERVICE)
- resp = self.admin_request(
- method='POST',
- path='/v2.0/endpoints',
- body={
- 'endpoint': {
- 'region': uuid.uuid4().hex,
- 'service_id': service_id,
- 'publicurl': uuid.uuid4().hex,
- 'adminurl': uuid.uuid4().hex,
- 'internalurl': uuid.uuid4().hex,
- },
- },
- token=token,
- )
- endpoint_id = resp.result.get('endpoint').get('id')
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'endpoint',
- cadftaxonomy.SECURITY_ENDPOINT)
- # test for delete endpoint
- self.admin_request(
- method='DELETE',
- path='/v2.0/endpoints/%s' % endpoint_id,
- token=token,
- )
- self._assert_initiator_data_is_set(DELETED_OPERATION,
- 'endpoint',
- cadftaxonomy.SECURITY_ENDPOINT)
- # test for delete service
- self.admin_request(
- method='DELETE',
- path='/v2.0/OS-KSADM/services/%s' % service_id,
- token=token,
- )
- self._assert_initiator_data_is_set(DELETED_OPERATION,
- 'service',
- cadftaxonomy.SECURITY_SERVICE)
-
- def test_project(self):
- token = self.get_scoped_token()
- resp = self.admin_request(
- method='POST',
- path='/v2.0/tenants',
- body={
- 'tenant': {
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': True
- },
- },
- token=token,
- )
- project_id = resp.result.get('tenant').get('id')
- self._assert_initiator_data_is_set(CREATED_OPERATION,
- 'project',
- cadftaxonomy.SECURITY_PROJECT)
- # test for delete project
- self.admin_request(
- method='DELETE',
- path='/v2.0/tenants/%s' % project_id,
- token=token,
- )
- self._assert_initiator_data_is_set(DELETED_OPERATION,
- 'project',
- cadftaxonomy.SECURITY_PROJECT)
-
-
-class TestEventCallbacks(test_v3.RestfulTestCase):
-
- def setUp(self):
- super(TestEventCallbacks, self).setUp()
- self.has_been_called = False
-
- def _project_deleted_callback(self, service, resource_type, operation,
- payload):
- self.has_been_called = True
-
- def _project_created_callback(self, service, resource_type, operation,
- payload):
- self.has_been_called = True
-
- def test_notification_received(self):
- callback = register_callback(CREATED_OPERATION, 'project')
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assertTrue(callback.called)
-
- def test_notification_method_not_callable(self):
- fake_method = None
- self.assertRaises(TypeError,
- notifications.register_event_callback,
- UPDATED_OPERATION,
- 'project',
- [fake_method])
-
- def test_notification_event_not_valid(self):
- self.assertRaises(ValueError,
- notifications.register_event_callback,
- uuid.uuid4().hex,
- 'project',
- self._project_deleted_callback)
-
- def test_event_registration_for_unknown_resource_type(self):
- # Registration for unknown resource types should succeed. If no event
- # is issued for that resource type, the callback wont be triggered.
- notifications.register_event_callback(DELETED_OPERATION,
- uuid.uuid4().hex,
- self._project_deleted_callback)
- resource_type = uuid.uuid4().hex
- notifications.register_event_callback(DELETED_OPERATION,
- resource_type,
- self._project_deleted_callback)
-
- def test_provider_event_callback_subscription(self):
- callback_called = []
-
- @notifications.listener
- class Foo(object):
- def __init__(self):
- self.event_callbacks = {
- CREATED_OPERATION: {'project': self.foo_callback}}
-
- def foo_callback(self, service, resource_type, operation,
- payload):
- # uses callback_called from the closure
- callback_called.append(True)
-
- Foo()
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assertEqual([True], callback_called)
-
- def test_provider_event_callbacks_subscription(self):
- callback_called = []
-
- @notifications.listener
- class Foo(object):
- def __init__(self):
- self.event_callbacks = {
- CREATED_OPERATION: {
- 'project': [self.callback_0, self.callback_1]}}
-
- def callback_0(self, service, resource_type, operation, payload):
- # uses callback_called from the closure
- callback_called.append('cb0')
-
- def callback_1(self, service, resource_type, operation, payload):
- # uses callback_called from the closure
- callback_called.append('cb1')
-
- Foo()
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
- self.assertItemsEqual(['cb1', 'cb0'], callback_called)
-
- def test_invalid_event_callbacks(self):
- @notifications.listener
- class Foo(object):
- def __init__(self):
- self.event_callbacks = 'bogus'
-
- self.assertRaises(AttributeError, Foo)
-
- def test_invalid_event_callbacks_event(self):
- @notifications.listener
- class Foo(object):
- def __init__(self):
- self.event_callbacks = {CREATED_OPERATION: 'bogus'}
-
- self.assertRaises(AttributeError, Foo)
-
- def test_using_an_unbound_method_as_a_callback_fails(self):
- # NOTE(dstanek): An unbound method is when you reference a method
- # from a class object. You'll get a method that isn't bound to a
- # particular instance so there is no magic 'self'. You can call it,
- # but you have to pass in the instance manually like: C.m(C()).
- # If you reference the method from an instance then you get a method
- # that effectively curries the self argument for you
- # (think functools.partial). Obviously is we don't have an
- # instance then we can't call the method.
- @notifications.listener
- class Foo(object):
- def __init__(self):
- self.event_callbacks = {CREATED_OPERATION:
- {'project': Foo.callback}}
-
- def callback(self, *args):
- pass
-
- # TODO(dstanek): it would probably be nice to fail early using
- # something like:
- # self.assertRaises(TypeError, Foo)
- Foo()
- project_ref = unit.new_project_ref(domain_id=self.domain_id)
- self.assertRaises(TypeError, self.resource_api.create_project,
- project_ref['id'], project_ref)
-
-
-class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
-
- LOCAL_HOST = 'localhost'
- ACTION = 'authenticate'
- ROLE_ASSIGNMENT = 'role_assignment'
-
- def setUp(self):
- super(CadfNotificationsWrapperTestCase, self).setUp()
- self._notifications = []
-
- def fake_notify(action, initiator, outcome, target,
- event_type, **kwargs):
- service_security = cadftaxonomy.SERVICE_SECURITY
-
- event = eventfactory.EventFactory().new_event(
- eventType=cadftype.EVENTTYPE_ACTIVITY,
- outcome=outcome,
- action=action,
- initiator=initiator,
- target=target,
- observer=cadfresource.Resource(typeURI=service_security))
-
- for key, value in kwargs.items():
- setattr(event, key, value)
-
- note = {
- 'action': action,
- 'initiator': initiator,
- 'event': event,
- 'event_type': event_type,
- 'send_notification_called': True}
- self._notifications.append(note)
-
- self.useFixture(mockpatch.PatchObject(
- notifications, '_send_audit_notification', fake_notify))
-
- def _assert_last_note(self, action, user_id, event_type=None):
- self.assertTrue(self._notifications)
- note = self._notifications[-1]
- self.assertEqual(action, note['action'])
- initiator = note['initiator']
- self.assertEqual(user_id, initiator.id)
- self.assertEqual(self.LOCAL_HOST, initiator.host.address)
- self.assertTrue(note['send_notification_called'])
- if event_type:
- self.assertEqual(event_type, note['event_type'])
-
- def _assert_event(self, role_id, project=None, domain=None,
- user=None, group=None, inherit=False):
- """Assert that the CADF event is valid.
-
- In the case of role assignments, the event will have extra data,
- specifically, the role, target, actor, and if the role is inherited.
-
- An example event, as a dictionary is seen below:
- {
- 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event',
- 'initiator': {
- 'typeURI': 'service/security/account/user',
- 'host': {'address': 'localhost'},
- 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341',
- 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21'
- },
- 'target': {
- 'typeURI': 'service/security/account/user',
- 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d'
- },
- 'observer': {
- 'typeURI': 'service/security',
- 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95'
- },
- 'eventType': 'activity',
- 'eventTime': '2014-08-21T21:04:56.204536+0000',
- 'role': u'0e6b990380154a2599ce6b6e91548a68',
- 'domain': u'24bdcff1aab8474895dbaac509793de1',
- 'inherited_to_projects': False,
- 'group': u'c1e22dc67cbd469ea0e33bf428fe597a',
- 'action': 'created.role_assignment',
- 'outcome': 'success',
- 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1'
- }
- """
- note = self._notifications[-1]
- event = note['event']
- if project:
- self.assertEqual(project, event.project)
- if domain:
- self.assertEqual(domain, event.domain)
- if group:
- self.assertEqual(group, event.group)
- elif user:
- self.assertEqual(user, event.user)
- self.assertEqual(role_id, event.role)
- self.assertEqual(inherit, event.inherited_to_projects)
-
- def test_v3_authenticate_user_name_and_domain_id(self):
- user_id = self.user_id
- user_name = self.user['name']
- password = self.user['password']
- domain_id = self.domain_id
- data = self.build_authentication_request(username=user_name,
- user_domain_id=domain_id,
- password=password)
- self.post('/auth/tokens', body=data)
- self._assert_last_note(self.ACTION, user_id)
-
- def test_v3_authenticate_user_id(self):
- user_id = self.user_id
- password = self.user['password']
- data = self.build_authentication_request(user_id=user_id,
- password=password)
- self.post('/auth/tokens', body=data)
- self._assert_last_note(self.ACTION, user_id)
-
- def test_v3_authenticate_user_name_and_domain_name(self):
- user_id = self.user_id
- user_name = self.user['name']
- password = self.user['password']
- domain_name = self.domain['name']
- data = self.build_authentication_request(username=user_name,
- user_domain_name=domain_name,
- password=password)
- self.post('/auth/tokens', body=data)
- self._assert_last_note(self.ACTION, user_id)
-
- def _test_role_assignment(self, url, role, project=None, domain=None,
- user=None, group=None):
- self.put(url)
- action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT)
- event_type = '%s.%s.%s' % (notifications.SERVICE,
- self.ROLE_ASSIGNMENT, CREATED_OPERATION)
- self._assert_last_note(action, self.user_id, event_type)
- self._assert_event(role, project, domain, user, group)
- self.delete(url)
- action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT)
- event_type = '%s.%s.%s' % (notifications.SERVICE,
- self.ROLE_ASSIGNMENT, DELETED_OPERATION)
- self._assert_last_note(action, self.user_id, event_type)
- self._assert_event(role, project, domain, user, None)
-
- def test_user_project_grant(self):
- url = ('/projects/%s/users/%s/roles/%s' %
- (self.project_id, self.user_id, self.role_id))
- self._test_role_assignment(url, self.role_id,
- project=self.project_id,
- user=self.user_id)
-
- def test_group_domain_grant(self):
- group_ref = unit.new_group_ref(domain_id=self.domain_id)
- group = self.identity_api.create_group(group_ref)
- self.identity_api.add_user_to_group(self.user_id, group['id'])
- url = ('/domains/%s/groups/%s/roles/%s' %
- (self.domain_id, group['id'], self.role_id))
- self._test_role_assignment(url, self.role_id,
- domain=self.domain_id,
- user=self.user_id,
- group=group['id'])
-
- def test_add_role_to_user_and_project(self):
- # A notification is sent when add_role_to_user_and_project is called on
- # the assignment manager.
-
- project_ref = unit.new_project_ref(self.domain_id)
- project = self.resource_api.create_project(
- project_ref['id'], project_ref)
- tenant_id = project['id']
-
- self.assignment_api.add_role_to_user_and_project(
- self.user_id, tenant_id, self.role_id)
-
- self.assertTrue(self._notifications)
- note = self._notifications[-1]
- self.assertEqual('created.role_assignment', note['action'])
- self.assertTrue(note['send_notification_called'])
-
- self._assert_event(self.role_id, project=tenant_id, user=self.user_id)
-
- def test_remove_role_from_user_and_project(self):
- # A notification is sent when remove_role_from_user_and_project is
- # called on the assignment manager.
-
- self.assignment_api.remove_role_from_user_and_project(
- self.user_id, self.project_id, self.role_id)
-
- self.assertTrue(self._notifications)
- note = self._notifications[-1]
- self.assertEqual('deleted.role_assignment', note['action'])
- self.assertTrue(note['send_notification_called'])
-
- self._assert_event(self.role_id, project=self.project_id,
- user=self.user_id)
-
-
-class TestCallbackRegistration(unit.BaseTestCase):
- def setUp(self):
- super(TestCallbackRegistration, self).setUp()
- self.mock_log = mock.Mock()
- # Force the callback logging to occur
- self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG
-
- def verify_log_message(self, data):
- """Verify log message.
-
- Tests that use this are a little brittle because adding more
- logging can break them.
-
- TODO(dstanek): remove the need for this in a future refactoring
-
- """
- log_fn = self.mock_log.debug
- self.assertEqual(len(data), log_fn.call_count)
- for datum in data:
- log_fn.assert_any_call(mock.ANY, datum)
-
- def test_a_function_callback(self):
- def callback(*args, **kwargs):
- pass
-
- resource_type = 'thing'
- with mock.patch('keystone.notifications.LOG', self.mock_log):
- notifications.register_event_callback(
- CREATED_OPERATION, resource_type, callback)
-
- callback = 'keystone.tests.unit.common.test_notifications.callback'
- expected_log_data = {
- 'callback': callback,
- 'event': 'identity.%s.created' % resource_type
- }
- self.verify_log_message([expected_log_data])
-
- def test_a_method_callback(self):
- class C(object):
- def callback(self, *args, **kwargs):
- pass
-
- with mock.patch('keystone.notifications.LOG', self.mock_log):
- notifications.register_event_callback(
- CREATED_OPERATION, 'thing', C().callback)
-
- callback = 'keystone.tests.unit.common.test_notifications.C.callback'
- expected_log_data = {
- 'callback': callback,
- 'event': 'identity.thing.created'
- }
- self.verify_log_message([expected_log_data])
-
- def test_a_list_of_callbacks(self):
- def callback(*args, **kwargs):
- pass
-
- class C(object):
- def callback(self, *args, **kwargs):
- pass
-
- with mock.patch('keystone.notifications.LOG', self.mock_log):
- notifications.register_event_callback(
- CREATED_OPERATION, 'thing', [callback, C().callback])
-
- callback_1 = 'keystone.tests.unit.common.test_notifications.callback'
- callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback'
- expected_log_data = [
- {
- 'callback': callback_1,
- 'event': 'identity.thing.created'
- },
- {
- 'callback': callback_2,
- 'event': 'identity.thing.created'
- },
- ]
- self.verify_log_message(expected_log_data)
-
- def test_an_invalid_callback(self):
- self.assertRaises(TypeError,
- notifications.register_event_callback,
- (CREATED_OPERATION, 'thing', object()))
-
- def test_an_invalid_event(self):
- def callback(*args, **kwargs):
- pass
-
- self.assertRaises(ValueError,
- notifications.register_event_callback,
- uuid.uuid4().hex,
- 'thing',
- callback)
diff --git a/keystone-moon/keystone/tests/unit/common/test_pemutils.py b/keystone-moon/keystone/tests/unit/common/test_pemutils.py
deleted file mode 100644
index c2f58518..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_pemutils.py
+++ /dev/null
@@ -1,337 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import base64
-
-from six import moves
-
-from keystone.common import pemutils
-from keystone.tests import unit as tests
-
-
-# List of 2-tuples, (pem_type, pem_header)
-headers = pemutils.PEM_TYPE_TO_HEADER.items()
-
-
-def make_data(size, offset=0):
- return ''.join([chr(x % 255) for x in moves.range(offset, size + offset)])
-
-
-def make_base64_from_data(data):
- return base64.b64encode(data)
-
-
-def wrap_base64(base64_text):
- wrapped_text = '\n'.join([base64_text[x:x + 64]
- for x in moves.range(0, len(base64_text), 64)])
- wrapped_text += '\n'
- return wrapped_text
-
-
-def make_pem(header, data):
- base64_text = make_base64_from_data(data)
- wrapped_text = wrap_base64(base64_text)
-
- result = '-----BEGIN %s-----\n' % header
- result += wrapped_text
- result += '-----END %s-----\n' % header
-
- return result
-
-
-class PEM(object):
- """PEM text and it's associated data broken out, used for testing.
-
- """
- def __init__(self, pem_header='CERTIFICATE', pem_type='cert',
- data_size=70, data_offset=0):
- self.pem_header = pem_header
- self.pem_type = pem_type
- self.data_size = data_size
- self.data_offset = data_offset
- self.data = make_data(self.data_size, self.data_offset)
- self.base64_text = make_base64_from_data(self.data)
- self.wrapped_base64 = wrap_base64(self.base64_text)
- self.pem_text = make_pem(self.pem_header, self.data)
-
-
-class TestPEMParseResult(tests.BaseTestCase):
-
- def test_pem_types(self):
- for pem_type in pemutils.pem_types:
- pem_header = pemutils.PEM_TYPE_TO_HEADER[pem_type]
- r = pemutils.PEMParseResult(pem_type=pem_type)
- self.assertEqual(pem_type, r.pem_type)
- self.assertEqual(pem_header, r.pem_header)
-
- pem_type = 'xxx'
- self.assertRaises(ValueError,
- pemutils.PEMParseResult, pem_type=pem_type)
-
- def test_pem_headers(self):
- for pem_header in pemutils.pem_headers:
- pem_type = pemutils.PEM_HEADER_TO_TYPE[pem_header]
- r = pemutils.PEMParseResult(pem_header=pem_header)
- self.assertEqual(pem_type, r.pem_type)
- self.assertEqual(pem_header, r.pem_header)
-
- pem_header = 'xxx'
- self.assertRaises(ValueError,
- pemutils.PEMParseResult, pem_header=pem_header)
-
-
-class TestPEMParse(tests.BaseTestCase):
- def test_parse_none(self):
- text = ''
- text += 'bla bla\n'
- text += 'yada yada yada\n'
- text += 'burfl blatz bingo\n'
-
- parse_results = pemutils.parse_pem(text)
- self.assertEqual(0, len(parse_results))
-
- self.assertEqual(False, pemutils.is_pem(text))
-
- def test_parse_invalid(self):
- p = PEM(pem_type='xxx',
- pem_header='XXX')
- text = p.pem_text
-
- self.assertRaises(ValueError,
- pemutils.parse_pem, text)
-
- def test_parse_one(self):
- data_size = 70
- count = len(headers)
- pems = []
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- p = pems[i]
- text = p.pem_text
-
- parse_results = pemutils.parse_pem(text)
- self.assertEqual(1, len(parse_results))
-
- r = parse_results[0]
- self.assertEqual(p.pem_type, r.pem_type)
- self.assertEqual(p.pem_header, r.pem_header)
- self.assertEqual(p.pem_text,
- text[r.pem_start:r.pem_end])
- self.assertEqual(p.wrapped_base64,
- text[r.base64_start:r.base64_end])
- self.assertEqual(p.data, r.binary_data)
-
- def test_parse_one_embedded(self):
- p = PEM(data_offset=0)
- text = ''
- text += 'bla bla\n'
- text += 'yada yada yada\n'
- text += p.pem_text
- text += 'burfl blatz bingo\n'
-
- parse_results = pemutils.parse_pem(text)
- self.assertEqual(1, len(parse_results))
-
- r = parse_results[0]
- self.assertEqual(p.pem_type, r.pem_type)
- self.assertEqual(p.pem_header, r.pem_header)
- self.assertEqual(p.pem_text,
- text[r.pem_start:r.pem_end])
- self.assertEqual(p.wrapped_base64,
- text[r.base64_start: r.base64_end])
- self.assertEqual(p.data, r.binary_data)
-
- def test_parse_multple(self):
- data_size = 70
- count = len(headers)
- pems = []
- text = ''
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- text += pems[i].pem_text
-
- parse_results = pemutils.parse_pem(text)
- self.assertEqual(count, len(parse_results))
-
- for i in moves.range(count):
- r = parse_results[i]
- p = pems[i]
-
- self.assertEqual(p.pem_type, r.pem_type)
- self.assertEqual(p.pem_header, r.pem_header)
- self.assertEqual(p.pem_text,
- text[r.pem_start:r.pem_end])
- self.assertEqual(p.wrapped_base64,
- text[r.base64_start: r.base64_end])
- self.assertEqual(p.data, r.binary_data)
-
- def test_parse_multple_find_specific(self):
- data_size = 70
- count = len(headers)
- pems = []
- text = ''
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- text += pems[i].pem_text
-
- for i in moves.range(count):
- parse_results = pemutils.parse_pem(text, pem_type=headers[i][0])
- self.assertEqual(1, len(parse_results))
-
- r = parse_results[0]
- p = pems[i]
-
- self.assertEqual(p.pem_type, r.pem_type)
- self.assertEqual(p.pem_header, r.pem_header)
- self.assertEqual(p.pem_text,
- text[r.pem_start:r.pem_end])
- self.assertEqual(p.wrapped_base64,
- text[r.base64_start:r.base64_end])
- self.assertEqual(p.data, r.binary_data)
-
- def test_parse_multple_embedded(self):
- data_size = 75
- count = len(headers)
- pems = []
- text = ''
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- text += 'bla bla\n'
- text += 'yada yada yada\n'
- text += pems[i].pem_text
- text += 'burfl blatz bingo\n'
-
- parse_results = pemutils.parse_pem(text)
- self.assertEqual(count, len(parse_results))
-
- for i in moves.range(count):
- r = parse_results[i]
- p = pems[i]
-
- self.assertEqual(p.pem_type, r.pem_type)
- self.assertEqual(p.pem_header, r.pem_header)
- self.assertEqual(p.pem_text,
- text[r.pem_start:r.pem_end])
- self.assertEqual(p.wrapped_base64,
- text[r.base64_start:r.base64_end])
- self.assertEqual(p.data, r.binary_data)
-
- def test_get_pem_data_none(self):
- text = ''
- text += 'bla bla\n'
- text += 'yada yada yada\n'
- text += 'burfl blatz bingo\n'
-
- data = pemutils.get_pem_data(text)
- self.assertIsNone(data)
-
- def test_get_pem_data_invalid(self):
- p = PEM(pem_type='xxx',
- pem_header='XXX')
- text = p.pem_text
-
- self.assertRaises(ValueError,
- pemutils.get_pem_data, text)
-
- def test_get_pem_data(self):
- data_size = 70
- count = len(headers)
- pems = []
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- p = pems[i]
- text = p.pem_text
-
- data = pemutils.get_pem_data(text, p.pem_type)
- self.assertEqual(p.data, data)
-
- def test_is_pem(self):
- data_size = 70
- count = len(headers)
- pems = []
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- p = pems[i]
- text = p.pem_text
- self.assertTrue(pemutils.is_pem(text, pem_type=p.pem_type))
- self.assertFalse(pemutils.is_pem(text,
- pem_type=p.pem_type + 'xxx'))
-
- def test_base64_to_pem(self):
- data_size = 70
- count = len(headers)
- pems = []
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- p = pems[i]
- pem = pemutils.base64_to_pem(p.base64_text, p.pem_type)
- self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
-
- def test_binary_to_pem(self):
- data_size = 70
- count = len(headers)
- pems = []
-
- for i in moves.range(count):
- pems.append(PEM(pem_type=headers[i][0],
- pem_header=headers[i][1],
- data_size=data_size + i,
- data_offset=i))
-
- for i in moves.range(count):
- p = pems[i]
- pem = pemutils.binary_to_pem(p.data, p.pem_type)
- self.assertEqual(pemutils.get_pem_data(pem, p.pem_type), p.data)
diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py
deleted file mode 100644
index 7d20eb03..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_sql_core.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-from sqlalchemy.ext import declarative
-
-from keystone.common import sql
-from keystone.tests import unit
-from keystone.tests.unit import utils
-
-
-ModelBase = declarative.declarative_base()
-
-
-class TestModel(ModelBase, sql.ModelDictMixin):
- __tablename__ = 'testmodel'
- id = sql.Column(sql.String(64), primary_key=True)
- text = sql.Column(sql.String(64), nullable=False)
-
-
-class TestModelDictMixin(unit.BaseTestCase):
-
- def test_creating_a_model_instance_from_a_dict(self):
- d = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
- m = TestModel.from_dict(d)
- self.assertEqual(d['id'], m.id)
- self.assertEqual(d['text'], m.text)
-
- def test_creating_a_dict_from_a_model_instance(self):
- m = TestModel(id=utils.new_uuid(), text=utils.new_uuid())
- d = m.to_dict()
- self.assertEqual(d['id'], m.id)
- self.assertEqual(d['text'], m.text)
-
- def test_creating_a_model_instance_from_an_invalid_dict(self):
- d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None}
- self.assertRaises(TypeError, TestModel.from_dict, d)
-
- def test_creating_a_dict_from_a_model_instance_that_has_extra_attrs(self):
- expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
- m = TestModel(id=expected['id'], text=expected['text'])
- m.extra = 'this should not be in the dictionary'
- self.assertEqual(expected, m.to_dict())
diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py
deleted file mode 100644
index 3641aacd..00000000
--- a/keystone-moon/keystone/tests/unit/common/test_utils.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# encoding: utf-8
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import uuid
-
-from oslo_config import cfg
-from oslo_config import fixture as config_fixture
-from oslo_serialization import jsonutils
-import six
-
-from keystone.common import utils as common_utils
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import utils
-from keystone.version import service
-
-
-CONF = cfg.CONF
-
-TZ = utils.TZ
-
-
-class UtilsTestCase(unit.BaseTestCase):
- OPTIONAL = object()
-
- def setUp(self):
- super(UtilsTestCase, self).setUp()
- self.config_fixture = self.useFixture(config_fixture.Config(CONF))
-
- def test_resource_uuid(self):
- uuid_str = '536e28c2017e405e89b25a1ed777b952'
- self.assertEqual(uuid_str, common_utils.resource_uuid(uuid_str))
-
- # Exact 64 length string.
- uuid_str = ('536e28c2017e405e89b25a1ed777b952'
- 'f13de678ac714bb1b7d1e9a007c10db5')
- resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE
- transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex
- self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str))
-
- # Non-ASCII character test.
- non_ascii_ = 'ß' * 32
- transformed_id = uuid.uuid5(resource_id_namespace, non_ascii_).hex
- self.assertEqual(transformed_id,
- common_utils.resource_uuid(non_ascii_))
-
- # This input is invalid because it's length is more than 64.
- invalid_input = 'x' * 65
- self.assertRaises(ValueError, common_utils.resource_uuid,
- invalid_input)
-
- # 64 length unicode string, to mimic what is returned from mapping_id
- # backend.
- uuid_str = six.text_type('536e28c2017e405e89b25a1ed777b952'
- 'f13de678ac714bb1b7d1e9a007c10db5')
- resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE
- if six.PY2:
- uuid_str = uuid_str.encode('utf-8')
- transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex
- self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str))
-
- def test_hash(self):
- password = 'right'
- wrong = 'wrongwrong' # Two wrongs don't make a right
- hashed = common_utils.hash_password(password)
- self.assertTrue(common_utils.check_password(password, hashed))
- self.assertFalse(common_utils.check_password(wrong, hashed))
-
- def test_verify_normal_password_strict(self):
- self.config_fixture.config(strict_password_check=False)
- password = uuid.uuid4().hex
- verified = common_utils.verify_length_and_trunc_password(password)
- self.assertEqual(password, verified)
-
- def test_that_a_hash_can_not_be_validated_against_a_hash(self):
- # NOTE(dstanek): Bug 1279849 reported a problem where passwords
- # were not being hashed if they already looked like a hash. This
- # would allow someone to hash their password ahead of time
- # (potentially getting around password requirements, like
- # length) and then they could auth with their original password.
- password = uuid.uuid4().hex
- hashed_password = common_utils.hash_password(password)
- new_hashed_password = common_utils.hash_password(hashed_password)
- self.assertFalse(common_utils.check_password(password,
- new_hashed_password))
-
- def test_verify_long_password_strict(self):
- self.config_fixture.config(strict_password_check=False)
- self.config_fixture.config(group='identity', max_password_length=5)
- max_length = CONF.identity.max_password_length
- invalid_password = 'passw0rd'
- trunc = common_utils.verify_length_and_trunc_password(invalid_password)
- self.assertEqual(invalid_password[:max_length], trunc)
-
- def test_verify_long_password_strict_raises_exception(self):
- self.config_fixture.config(strict_password_check=True)
- self.config_fixture.config(group='identity', max_password_length=5)
- invalid_password = 'passw0rd'
- self.assertRaises(exception.PasswordVerificationError,
- common_utils.verify_length_and_trunc_password,
- invalid_password)
-
- def test_hash_long_password_truncation(self):
- self.config_fixture.config(strict_password_check=False)
- invalid_length_password = '0' * 9999999
- hashed = common_utils.hash_password(invalid_length_password)
- self.assertTrue(common_utils.check_password(invalid_length_password,
- hashed))
-
- def test_hash_long_password_strict(self):
- self.config_fixture.config(strict_password_check=True)
- invalid_length_password = '0' * 9999999
- self.assertRaises(exception.PasswordVerificationError,
- common_utils.hash_password,
- invalid_length_password)
-
- def _create_test_user(self, password=OPTIONAL):
- user = {"name": "hthtest"}
- if password is not self.OPTIONAL:
- user['password'] = password
-
- return user
-
- def test_hash_user_password_without_password(self):
- user = self._create_test_user()
- hashed = common_utils.hash_user_password(user)
- self.assertEqual(user, hashed)
-
- def test_hash_user_password_with_null_password(self):
- user = self._create_test_user(password=None)
- hashed = common_utils.hash_user_password(user)
- self.assertEqual(user, hashed)
-
- def test_hash_user_password_with_empty_password(self):
- password = ''
- user = self._create_test_user(password=password)
- user_hashed = common_utils.hash_user_password(user)
- password_hashed = user_hashed['password']
- self.assertTrue(common_utils.check_password(password, password_hashed))
-
- def test_hash_edge_cases(self):
- hashed = common_utils.hash_password('secret')
- self.assertFalse(common_utils.check_password('', hashed))
- self.assertFalse(common_utils.check_password(None, hashed))
-
- def test_hash_unicode(self):
- password = u'Comment \xe7a va'
- wrong = 'Comment ?a va'
- hashed = common_utils.hash_password(password)
- self.assertTrue(common_utils.check_password(password, hashed))
- self.assertFalse(common_utils.check_password(wrong, hashed))
-
- def test_auth_str_equal(self):
- self.assertTrue(common_utils.auth_str_equal('abc123', 'abc123'))
- self.assertFalse(common_utils.auth_str_equal('a', 'aaaaa'))
- self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a'))
- self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123'))
-
- def test_unixtime(self):
- global TZ
-
- @utils.timezone
- def _test_unixtime():
- epoch = common_utils.unixtime(dt)
- self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ)
-
- dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0)
- epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400
- for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:
- TZ = 'UTC' + d
- _test_unixtime()
-
- def test_pki_encoder(self):
- data = {'field': 'value'}
- json = jsonutils.dumps(data, cls=common_utils.PKIEncoder)
- expected_json = '{"field":"value"}'
- self.assertEqual(expected_json, json)
-
- def test_url_safe_check(self):
- base_str = 'i am safe'
- self.assertFalse(common_utils.is_not_url_safe(base_str))
- for i in common_utils.URL_RESERVED_CHARS:
- self.assertTrue(common_utils.is_not_url_safe(base_str + i))
-
- def test_url_safe_with_unicode_check(self):
- base_str = u'i am \xe7afe'
- self.assertFalse(common_utils.is_not_url_safe(base_str))
- for i in common_utils.URL_RESERVED_CHARS:
- self.assertTrue(common_utils.is_not_url_safe(base_str + i))
-
-
-class ServiceHelperTests(unit.BaseTestCase):
-
- @service.fail_gracefully
- def _do_test(self):
- raise Exception("Test Exc")
-
- def test_fail_gracefully(self):
- self.assertRaises(unit.UnexpectedExit, self._do_test)