aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_cert_setup.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_cert_setup.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_cert_setup.py243
1 files changed, 0 insertions, 243 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_cert_setup.py b/keystone-moon/keystone/tests/unit/test_cert_setup.py
deleted file mode 100644
index debf87f5..00000000
--- a/keystone-moon/keystone/tests/unit/test_cert_setup.py
+++ /dev/null
@@ -1,243 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import shutil
-
-import mock
-from six.moves import http_client
-from testtools import matchers
-
-from keystone.common import environment
-from keystone.common import openssl
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import rest
-from keystone import token
-
-
-SSLDIR = unit.dirs.tmp('ssl')
-CONF = unit.CONF
-
-
-CERTDIR = os.path.join(SSLDIR, 'certs')
-KEYDIR = os.path.join(SSLDIR, 'private')
-
-
-class CertSetupTestCase(rest.RestfulTestCase):
-
- def setUp(self):
- super(CertSetupTestCase, self).setUp()
-
- def cleanup_ssldir():
- try:
- shutil.rmtree(SSLDIR)
- except OSError:
- pass
-
- self.addCleanup(cleanup_ssldir)
-
- def config_overrides(self):
- super(CertSetupTestCase, self).config_overrides()
- ca_certs = os.path.join(CERTDIR, 'ca.pem')
- ca_key = os.path.join(CERTDIR, 'cakey.pem')
-
- self.config_fixture.config(
- group='signing',
- certfile=os.path.join(CERTDIR, 'signing_cert.pem'),
- ca_certs=ca_certs,
- ca_key=ca_key,
- keyfile=os.path.join(KEYDIR, 'signing_key.pem'))
- self.config_fixture.config(
- group='ssl',
- ca_key=ca_key)
- self.config_fixture.config(
- group='eventlet_server_ssl',
- ca_certs=ca_certs,
- certfile=os.path.join(CERTDIR, 'keystone.pem'),
- keyfile=os.path.join(KEYDIR, 'keystonekey.pem'))
- self.config_fixture.config(group='token', provider='pkiz')
-
- def test_can_handle_missing_certs(self):
- controller = token.controllers.Auth()
-
- self.config_fixture.config(group='signing', certfile='invalid')
- user = unit.create_user(self.identity_api,
- domain_id=CONF.identity.default_domain_id)
- body_dict = {
- 'passwordCredentials': {
- 'userId': user['id'],
- 'password': user['password'],
- },
- }
- self.assertRaises(exception.UnexpectedError,
- controller.authenticate,
- {}, body_dict)
-
- def test_create_pki_certs(self, rebuild=False):
- pki = openssl.ConfigurePKI(None, None, rebuild=rebuild)
- pki.run()
- self.assertTrue(os.path.exists(CONF.signing.certfile))
- self.assertTrue(os.path.exists(CONF.signing.ca_certs))
- self.assertTrue(os.path.exists(CONF.signing.keyfile))
-
- def test_create_ssl_certs(self, rebuild=False):
- ssl = openssl.ConfigureSSL(None, None, rebuild=rebuild)
- ssl.run()
- self.assertTrue(os.path.exists(CONF.eventlet_server_ssl.ca_certs))
- self.assertTrue(os.path.exists(CONF.eventlet_server_ssl.certfile))
- self.assertTrue(os.path.exists(CONF.eventlet_server_ssl.keyfile))
-
- def test_fetch_signing_cert(self, rebuild=False):
- pki = openssl.ConfigurePKI(None, None, rebuild=rebuild)
- pki.run()
-
- # NOTE(jamielennox): Use request directly because certificate
- # requests don't have some of the normal information
- signing_resp = self.request(self.public_app,
- '/v2.0/certificates/signing',
- method='GET',
- expected_status=http_client.OK)
-
- cacert_resp = self.request(self.public_app,
- '/v2.0/certificates/ca',
- method='GET',
- expected_status=http_client.OK)
-
- with open(CONF.signing.certfile) as f:
- self.assertEqual(f.read(), signing_resp.text)
-
- with open(CONF.signing.ca_certs) as f:
- self.assertEqual(f.read(), cacert_resp.text)
-
- # NOTE(jamielennox): This is weird behaviour that we need to enforce.
- # It doesn't matter what you ask for it's always going to give text
- # with a text/html content_type.
-
- for path in ['/v2.0/certificates/signing', '/v2.0/certificates/ca']:
- for accept in [None, 'text/html', 'application/json', 'text/xml']:
- headers = {'Accept': accept} if accept else {}
- resp = self.request(self.public_app, path, method='GET',
- expected_status=http_client.OK,
- headers=headers)
-
- self.assertEqual('text/html', resp.content_type)
-
- def test_fetch_signing_cert_when_rebuild(self):
- pki = openssl.ConfigurePKI(None, None)
- pki.run()
- self.test_fetch_signing_cert(rebuild=True)
-
- def test_failure(self):
- for path in ['/v2.0/certificates/signing', '/v2.0/certificates/ca']:
- self.request(self.public_app, path, method='GET',
- expected_status=http_client.INTERNAL_SERVER_ERROR)
-
- def test_pki_certs_rebuild(self):
- self.test_create_pki_certs()
- with open(CONF.signing.certfile) as f:
- cert_file1 = f.read()
-
- self.test_create_pki_certs(rebuild=True)
- with open(CONF.signing.certfile) as f:
- cert_file2 = f.read()
-
- self.assertNotEqual(cert_file1, cert_file2)
-
- def test_ssl_certs_rebuild(self):
- self.test_create_ssl_certs()
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file1 = f.read()
-
- self.test_create_ssl_certs(rebuild=True)
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file2 = f.read()
-
- self.assertNotEqual(cert_file1, cert_file2)
-
- @mock.patch.object(os, 'remove')
- def test_rebuild_pki_certs_remove_error(self, mock_remove):
- self.test_create_pki_certs()
- with open(CONF.signing.certfile) as f:
- cert_file1 = f.read()
-
- mock_remove.side_effect = OSError()
- self.test_create_pki_certs(rebuild=True)
- with open(CONF.signing.certfile) as f:
- cert_file2 = f.read()
-
- self.assertEqual(cert_file1, cert_file2)
-
- @mock.patch.object(os, 'remove')
- def test_rebuild_ssl_certs_remove_error(self, mock_remove):
- self.test_create_ssl_certs()
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file1 = f.read()
-
- mock_remove.side_effect = OSError()
- self.test_create_ssl_certs(rebuild=True)
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file2 = f.read()
-
- self.assertEqual(cert_file1, cert_file2)
-
- def test_create_pki_certs_twice_without_rebuild(self):
- self.test_create_pki_certs()
- with open(CONF.signing.certfile) as f:
- cert_file1 = f.read()
-
- self.test_create_pki_certs()
- with open(CONF.signing.certfile) as f:
- cert_file2 = f.read()
-
- self.assertEqual(cert_file1, cert_file2)
-
- def test_create_ssl_certs_twice_without_rebuild(self):
- self.test_create_ssl_certs()
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file1 = f.read()
-
- self.test_create_ssl_certs()
- with open(CONF.eventlet_server_ssl.certfile) as f:
- cert_file2 = f.read()
-
- self.assertEqual(cert_file1, cert_file2)
-
-
-class TestExecCommand(unit.TestCase):
-
- @mock.patch.object(environment.subprocess.Popen, 'poll')
- def test_running_a_successful_command(self, mock_poll):
- mock_poll.return_value = 0
-
- ssl = openssl.ConfigureSSL('keystone_user', 'keystone_group')
- ssl.exec_command(['ls'])
-
- @mock.patch.object(environment.subprocess, 'check_output')
- def test_running_an_invalid_command(self, mock_check_output):
- cmd = ['ls']
-
- output = 'this is the output string'
-
- error = environment.subprocess.CalledProcessError(returncode=1,
- cmd=cmd,
- output=output)
- mock_check_output.side_effect = error
-
- ssl = openssl.ConfigureSSL('keystone_user', 'keystone_group')
- e = self.assertRaises(environment.subprocess.CalledProcessError,
- ssl.exec_command,
- cmd)
- self.assertThat(e.output, matchers.Equals(output))