From b8c756ecdd7cced1db4300935484e8c83701c82e Mon Sep 17 00:00:00 2001 From: WuKong Date: Tue, 30 Jun 2015 18:47:29 +0200 Subject: migrate moon code from github to opnfv Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong --- .../keystone/tests/unit/test_v3_domain_config.py | 210 +++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 keystone-moon/keystone/tests/unit/test_v3_domain_config.py (limited to 'keystone-moon/keystone/tests/unit/test_v3_domain_config.py') diff --git a/keystone-moon/keystone/tests/unit/test_v3_domain_config.py b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py new file mode 100644 index 00000000..6f96f0e7 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py @@ -0,0 +1,210 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from oslo_config import cfg + +from keystone import exception +from keystone.tests.unit import test_v3 + + +CONF = cfg.CONF + + +class DomainConfigTestCase(test_v3.RestfulTestCase): + """Test domain config support.""" + + def setUp(self): + super(DomainConfigTestCase, self).setUp() + + self.domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(self.domain['id'], self.domain) + self.config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + def test_create_config(self): + """Call ``PUT /domains/{domain_id}/config``.""" + url = '/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']} + r = self.put(url, body={'config': self.config}, + expected_status=201) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(self.config, r.result['config']) + self.assertEqual(self.config, res) + + def test_create_config_twice(self): + """Check multiple creates don't throw error""" + self.put('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}, + body={'config': self.config}, + expected_status=201) + self.put('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}, + body={'config': self.config}, + expected_status=200) + + def test_delete_config(self): + """Call ``DELETE /domains{domain_id}/config``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + self.delete('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}) + self.get('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}, + expected_status=exception.DomainConfigNotFound.code) + + def test_delete_config_by_group(self): + """Call ``DELETE /domains{domain_id}/config/{group}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + self.delete('/domains/%(domain_id)s/config/ldap' % { + 'domain_id': self.domain['id']}) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertNotIn('ldap', res) + + def test_get_head_config(self): + """Call ``GET & HEAD for /domains{domain_id}/config``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + url = '/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']} + r = self.get(url) + self.assertEqual(self.config, r.result['config']) + self.head(url, expected_status=200) + + def test_get_config_by_group(self): + """Call ``GET & HEAD /domains{domain_id}/config/{group}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + url = '/domains/%(domain_id)s/config/ldap' % { + 'domain_id': self.domain['id']} + r = self.get(url) + self.assertEqual({'ldap': self.config['ldap']}, r.result['config']) + self.head(url, expected_status=200) + + def test_get_config_by_option(self): + """Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + url = '/domains/%(domain_id)s/config/ldap/url' % { + 'domain_id': self.domain['id']} + r = self.get(url) + self.assertEqual({'url': self.config['ldap']['url']}, + r.result['config']) + self.head(url, expected_status=200) + + def test_get_non_existant_config(self): + """Call ``GET /domains{domain_id}/config when no config defined``.""" + self.get('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}, expected_status=404) + + def test_get_non_existant_config_group(self): + """Call ``GET /domains{domain_id}/config/{group_not_exist}``.""" + config = {'ldap': {'url': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + self.get('/domains/%(domain_id)s/config/identity' % { + 'domain_id': self.domain['id']}, expected_status=404) + + def test_get_non_existant_config_option(self): + """Call ``GET /domains{domain_id}/config/group/{option_not_exist}``.""" + config = {'ldap': {'url': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + self.get('/domains/%(domain_id)s/config/ldap/user_tree_dn' % { + 'domain_id': self.domain['id']}, expected_status=404) + + def test_update_config(self): + """Call ``PATCH /domains/{domain_id}/config``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + new_config = {'ldap': {'url': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + r = self.patch('/domains/%(domain_id)s/config' % { + 'domain_id': self.domain['id']}, + body={'config': new_config}) + res = self.domain_config_api.get_config(self.domain['id']) + expected_config = copy.deepcopy(self.config) + expected_config['ldap']['url'] = new_config['ldap']['url'] + expected_config['identity']['driver'] = ( + new_config['identity']['driver']) + self.assertEqual(expected_config, r.result['config']) + self.assertEqual(expected_config, res) + + def test_update_config_group(self): + """Call ``PATCH /domains/{domain_id}/config/{group}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + new_config = {'ldap': {'url': uuid.uuid4().hex, + 'user_filter': uuid.uuid4().hex}} + r = self.patch('/domains/%(domain_id)s/config/ldap' % { + 'domain_id': self.domain['id']}, + body={'config': new_config}) + res = self.domain_config_api.get_config(self.domain['id']) + expected_config = copy.deepcopy(self.config) + expected_config['ldap']['url'] = new_config['ldap']['url'] + expected_config['ldap']['user_filter'] = ( + new_config['ldap']['user_filter']) + self.assertEqual(expected_config, r.result['config']) + self.assertEqual(expected_config, res) + + def test_update_config_invalid_group(self): + """Call ``PATCH /domains/{domain_id}/config/{invalid_group}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + + # Trying to update a group that is neither whitelisted or sensitive + # should result in Forbidden. + invalid_group = uuid.uuid4().hex + new_config = {invalid_group: {'url': uuid.uuid4().hex, + 'user_filter': uuid.uuid4().hex}} + self.patch('/domains/%(domain_id)s/config/%(invalid_group)s' % { + 'domain_id': self.domain['id'], 'invalid_group': invalid_group}, + body={'config': new_config}, + expected_status=403) + # Trying to update a valid group, but one that is not in the current + # config should result in NotFound + config = {'ldap': {'suffix': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'identity': {'driver': uuid.uuid4().hex}} + self.patch('/domains/%(domain_id)s/config/identity' % { + 'domain_id': self.domain['id']}, + body={'config': new_config}, + expected_status=404) + + def test_update_config_option(self): + """Call ``PATCH /domains/{domain_id}/config/{group}/{option}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + new_config = {'url': uuid.uuid4().hex} + r = self.patch('/domains/%(domain_id)s/config/ldap/url' % { + 'domain_id': self.domain['id']}, + body={'config': new_config}) + res = self.domain_config_api.get_config(self.domain['id']) + expected_config = copy.deepcopy(self.config) + expected_config['ldap']['url'] = new_config['url'] + self.assertEqual(expected_config, r.result['config']) + self.assertEqual(expected_config, res) + + def test_update_config_invalid_option(self): + """Call ``PATCH /domains/{domain_id}/config/{group}/{invalid}``.""" + self.domain_config_api.create_config(self.domain['id'], self.config) + invalid_option = uuid.uuid4().hex + new_config = {'ldap': {invalid_option: uuid.uuid4().hex}} + # Trying to update an option that is neither whitelisted or sensitive + # should result in Forbidden. + self.patch( + '/domains/%(domain_id)s/config/ldap/%(invalid_option)s' % { + 'domain_id': self.domain['id'], + 'invalid_option': invalid_option}, + body={'config': new_config}, + expected_status=403) + # Trying to update a valid option, but one that is not in the current + # config should result in NotFound + new_config = {'suffix': uuid.uuid4().hex} + self.patch( + '/domains/%(domain_id)s/config/ldap/suffix' % { + 'domain_id': self.domain['id']}, + body={'config': new_config}, + expected_status=404) -- cgit 1.2.3-korg