diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_credential.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_credential.py | 478 |
1 files changed, 0 insertions, 478 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py deleted file mode 100644 index 07995f19..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_credential.py +++ /dev/null @@ -1,478 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import hashlib -import json -import uuid - -from keystoneclient.contrib.ec2 import utils as ec2_utils -from oslo_config import cfg -from six.moves import http_client -from testtools import matchers - -from keystone.common import utils -from keystone.contrib.ec2 import controllers -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import test_v3 - - -CONF = cfg.CONF -CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2 - - -class CredentialBaseTestCase(test_v3.RestfulTestCase): - def _create_dict_blob_credential(self): - blob, credential = unit.new_ec2_credential(user_id=self.user['id'], - project_id=self.project_id) - - # Store the blob as a dict *not* JSON ref bug #1259584 - # This means we can test the dict->json workaround, added - # as part of the bugfix for backwards compatibility works. - credential['blob'] = blob - credential_id = credential['id'] - - # Create direct via the DB API to avoid validation failure - self.credential_api.create_credential(credential_id, credential) - - return json.dumps(blob), credential_id - - -class CredentialTestCase(CredentialBaseTestCase): - """Test credential CRUD.""" - - def setUp(self): - - super(CredentialTestCase, self).setUp() - - self.credential = unit.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - - self.credential_api.create_credential( - self.credential['id'], - self.credential) - - def test_credential_api_delete_credentials_for_project(self): - self.credential_api.delete_credentials_for_project(self.project_id) - # Test that the credential that we created in .setUp no longer exists - # once we delete all credentials for self.project_id - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - credential_id=self.credential['id']) - - def test_credential_api_delete_credentials_for_user(self): - self.credential_api.delete_credentials_for_user(self.user_id) - # Test that the credential that we created in .setUp no longer exists - # once we delete all credentials for self.user_id - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - credential_id=self.credential['id']) - - def test_list_credentials(self): - """Call ``GET /credentials``.""" - r = self.get('/credentials') - self.assertValidCredentialListResponse(r, ref=self.credential) - - def test_list_credentials_filtered_by_user_id(self): - """Call ``GET /credentials?user_id={user_id}``.""" - credential = unit.new_credential_ref(user_id=uuid.uuid4().hex) - self.credential_api.create_credential(credential['id'], credential) - - r = self.get('/credentials?user_id=%s' % self.user['id']) - self.assertValidCredentialListResponse(r, ref=self.credential) - for cred in r.result['credentials']: - self.assertEqual(self.user['id'], cred['user_id']) - - def test_list_credentials_filtered_by_type(self): - """Call ``GET /credentials?type={type}``.""" - # The type ec2 was chosen, instead of a random string, - # because the type must be in the list of supported types - ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex, - project_id=self.project_id, - type=CRED_TYPE_EC2) - - ec2_resp = self.credential_api.create_credential( - ec2_credential['id'], ec2_credential) - - # The type cert was chosen for the same reason as ec2 - r = self.get('/credentials?type=cert') - - # Testing the filter for two different types - self.assertValidCredentialListResponse(r, ref=self.credential) - for cred in r.result['credentials']: - self.assertEqual('cert', cred['type']) - - r_ec2 = self.get('/credentials?type=ec2') - self.assertThat(r_ec2.result['credentials'], matchers.HasLength(1)) - cred_ec2 = r_ec2.result['credentials'][0] - - self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp) - self.assertEqual(CRED_TYPE_EC2, cred_ec2['type']) - self.assertEqual(ec2_credential['id'], cred_ec2['id']) - - def test_list_credentials_filtered_by_type_and_user_id(self): - """Call ``GET /credentials?user_id={user_id}&type={type}``.""" - user1_id = uuid.uuid4().hex - user2_id = uuid.uuid4().hex - - # Creating credentials for two different users - credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id, - type=CRED_TYPE_EC2) - credential_user1_cert = unit.new_credential_ref(user_id=user1_id) - credential_user2_cert = unit.new_credential_ref(user_id=user2_id) - - self.credential_api.create_credential( - credential_user1_ec2['id'], credential_user1_ec2) - self.credential_api.create_credential( - credential_user1_cert['id'], credential_user1_cert) - self.credential_api.create_credential( - credential_user2_cert['id'], credential_user2_cert) - - r = self.get('/credentials?user_id=%s&type=ec2' % user1_id) - self.assertValidCredentialListResponse(r, ref=credential_user1_ec2) - self.assertThat(r.result['credentials'], matchers.HasLength(1)) - cred = r.result['credentials'][0] - self.assertEqual(CRED_TYPE_EC2, cred['type']) - self.assertEqual(user1_id, cred['user_id']) - - def test_create_credential(self): - """Call ``POST /credentials``.""" - ref = unit.new_credential_ref(user_id=self.user['id']) - r = self.post( - '/credentials', - body={'credential': ref}) - self.assertValidCredentialResponse(r, ref) - - def test_get_credential(self): - """Call ``GET /credentials/{credential_id}``.""" - r = self.get( - '/credentials/%(credential_id)s' % { - 'credential_id': self.credential['id']}) - self.assertValidCredentialResponse(r, self.credential) - - def test_update_credential(self): - """Call ``PATCH /credentials/{credential_id}``.""" - ref = unit.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - del ref['id'] - r = self.patch( - '/credentials/%(credential_id)s' % { - 'credential_id': self.credential['id']}, - body={'credential': ref}) - self.assertValidCredentialResponse(r, ref) - - def test_delete_credential(self): - """Call ``DELETE /credentials/{credential_id}``.""" - self.delete( - '/credentials/%(credential_id)s' % { - 'credential_id': self.credential['id']}) - - def test_create_ec2_credential(self): - """Call ``POST /credentials`` for creating ec2 credential.""" - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], - project_id=self.project_id) - r = self.post('/credentials', body={'credential': ref}) - self.assertValidCredentialResponse(r, ref) - # Assert credential id is same as hash of access key id for - # ec2 credentials - access = blob['access'].encode('utf-8') - self.assertEqual(hashlib.sha256(access).hexdigest(), - r.result['credential']['id']) - # Create second ec2 credential with the same access key id and check - # for conflict. - self.post( - '/credentials', - body={'credential': ref}, expected_status=http_client.CONFLICT) - - def test_get_ec2_dict_blob(self): - """Ensure non-JSON blob data is correctly converted.""" - expected_blob, credential_id = self._create_dict_blob_credential() - - r = self.get( - '/credentials/%(credential_id)s' % { - 'credential_id': credential_id}) - - # use json.loads to transform the blobs back into Python dictionaries - # to avoid problems with the keys being in different orders. - self.assertEqual(json.loads(expected_blob), - json.loads(r.result['credential']['blob'])) - - def test_list_ec2_dict_blob(self): - """Ensure non-JSON blob data is correctly converted.""" - expected_blob, credential_id = self._create_dict_blob_credential() - - list_r = self.get('/credentials') - list_creds = list_r.result['credentials'] - list_ids = [r['id'] for r in list_creds] - self.assertIn(credential_id, list_ids) - # use json.loads to transform the blobs back into Python dictionaries - # to avoid problems with the keys being in different orders. - for r in list_creds: - if r['id'] == credential_id: - self.assertEqual(json.loads(expected_blob), - json.loads(r['blob'])) - - def test_create_non_ec2_credential(self): - """Test creating non-ec2 credential. - - Call ``POST /credentials``. - """ - blob, ref = unit.new_cert_credential(user_id=self.user['id']) - - r = self.post('/credentials', body={'credential': ref}) - self.assertValidCredentialResponse(r, ref) - # Assert credential id is not same as hash of access key id for - # non-ec2 credentials - access = blob['access'].encode('utf-8') - self.assertNotEqual(hashlib.sha256(access).hexdigest(), - r.result['credential']['id']) - - def test_create_ec2_credential_with_missing_project_id(self): - """Test Creating ec2 credential with missing project_id. - - Call ``POST /credentials``. - """ - _, ref = unit.new_ec2_credential(user_id=self.user['id'], - project_id=None) - # Assert bad request status when missing project_id - self.post( - '/credentials', - body={'credential': ref}, expected_status=http_client.BAD_REQUEST) - - def test_create_ec2_credential_with_invalid_blob(self): - """Test creating ec2 credential with invalid blob. - - Call ``POST /credentials``. - """ - ref = unit.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id, - blob='{"abc":"def"d}', - type=CRED_TYPE_EC2) - # Assert bad request status when request contains invalid blob - response = self.post( - '/credentials', - body={'credential': ref}, expected_status=http_client.BAD_REQUEST) - self.assertValidErrorResponse(response) - - def test_create_credential_with_admin_token(self): - # Make sure we can create credential with the static admin token - ref = unit.new_credential_ref(user_id=self.user['id']) - r = self.post( - '/credentials', - body={'credential': ref}, - token=self.get_admin_token()) - self.assertValidCredentialResponse(r, ref) - - -class TestCredentialTrustScoped(test_v3.RestfulTestCase): - """Test credential with trust scoped token.""" - - def setUp(self): - super(TestCredentialTrustScoped, self).setUp() - - self.trustee_user = unit.new_user_ref(domain_id=self.domain_id) - password = self.trustee_user['password'] - self.trustee_user = self.identity_api.create_user(self.trustee_user) - self.trustee_user['password'] = password - self.trustee_user_id = self.trustee_user['id'] - - def config_overrides(self): - super(TestCredentialTrustScoped, self).config_overrides() - self.config_fixture.config(group='trust', enabled=True) - - def test_trust_scoped_ec2_credential(self): - """Test creating trust scoped ec2 credential. - - Call ``POST /credentials``. - """ - # Create the trust - ref = unit.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - impersonation=True, - expires=dict(minutes=1), - role_ids=[self.role_id]) - del ref['id'] - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - # Get a trust scoped token - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password'], - trust_id=trust['id']) - r = self.v3_create_token(auth_data) - self.assertValidProjectScopedTokenResponse(r, self.user) - trust_id = r.result['token']['OS-TRUST:trust']['id'] - token_id = r.headers.get('X-Subject-Token') - - # Create the credential with the trust scoped token - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], - project_id=self.project_id) - r = self.post('/credentials', body={'credential': ref}, token=token_id) - - # We expect the response blob to contain the trust_id - ret_ref = ref.copy() - ret_blob = blob.copy() - ret_blob['trust_id'] = trust_id - ret_ref['blob'] = json.dumps(ret_blob) - self.assertValidCredentialResponse(r, ref=ret_ref) - - # Assert credential id is same as hash of access key id for - # ec2 credentials - access = blob['access'].encode('utf-8') - self.assertEqual(hashlib.sha256(access).hexdigest(), - r.result['credential']['id']) - - # Create second ec2 credential with the same access key id and check - # for conflict. - self.post( - '/credentials', - body={'credential': ref}, - token=token_id, - expected_status=http_client.CONFLICT) - - -class TestCredentialEc2(CredentialBaseTestCase): - """Test v3 credential compatibility with ec2tokens.""" - - def setUp(self): - super(TestCredentialEc2, self).setUp() - - def _validate_signature(self, access, secret): - """Test signature validation with the access/secret provided.""" - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) - - # Now make a request to validate the signed dummy request via the - # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - r = self.post( - '/ec2tokens', - body={'ec2Credentials': sig_ref}, - expected_status=http_client.OK) - self.assertValidTokenResponse(r) - - def test_ec2_credential_signature_validate(self): - """Test signature validation with a v3 ec2 credential.""" - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], - project_id=self.project_id) - r = self.post('/credentials', body={'credential': ref}) - self.assertValidCredentialResponse(r, ref) - # Assert credential id is same as hash of access key id - access = blob['access'].encode('utf-8') - self.assertEqual(hashlib.sha256(access).hexdigest(), - r.result['credential']['id']) - - cred_blob = json.loads(r.result['credential']['blob']) - self.assertEqual(blob, cred_blob) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) - - def test_ec2_credential_signature_validate_legacy(self): - """Test signature validation with a legacy v3 ec2 credential.""" - cred_json, _ = self._create_dict_blob_credential() - cred_blob = json.loads(cred_json) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) - - def _get_ec2_cred_uri(self): - return '/users/%s/credentials/OS-EC2' % self.user_id - - def _get_ec2_cred(self): - uri = self._get_ec2_cred_uri() - r = self.post(uri, body={'tenant_id': self.project_id}) - return r.result['credential'] - - def test_ec2_create_credential(self): - """Test ec2 credential creation.""" - ec2_cred = self._get_ec2_cred() - self.assertEqual(self.user_id, ec2_cred['user_id']) - self.assertEqual(self.project_id, ec2_cred['tenant_id']) - self.assertIsNone(ec2_cred['trust_id']) - self._validate_signature(access=ec2_cred['access'], - secret=ec2_cred['secret']) - uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) - self.assertThat(ec2_cred['links']['self'], - matchers.EndsWith(uri)) - - def test_ec2_get_credential(self): - ec2_cred = self._get_ec2_cred() - uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) - r = self.get(uri) - self.assertDictEqual(ec2_cred, r.result['credential']) - self.assertThat(ec2_cred['links']['self'], - matchers.EndsWith(uri)) - - def test_ec2_cannot_get_non_ec2_credential(self): - access_key = uuid.uuid4().hex - cred_id = utils.hash_access_key(access_key) - non_ec2_cred = unit.new_credential_ref( - user_id=self.user_id, - project_id=self.project_id) - non_ec2_cred['id'] = cred_id - self.credential_api.create_credential(cred_id, non_ec2_cred) - uri = '/'.join([self._get_ec2_cred_uri(), access_key]) - # if access_key is not found, ec2 controller raises Unauthorized - # exception - self.get(uri, expected_status=http_client.UNAUTHORIZED) - - def test_ec2_list_credentials(self): - """Test ec2 credential listing.""" - self._get_ec2_cred() - uri = self._get_ec2_cred_uri() - r = self.get(uri) - cred_list = r.result['credentials'] - self.assertEqual(1, len(cred_list)) - self.assertThat(r.result['links']['self'], - matchers.EndsWith(uri)) - - # non-EC2 credentials won't be fetched - non_ec2_cred = unit.new_credential_ref( - user_id=self.user_id, - project_id=self.project_id) - non_ec2_cred['type'] = uuid.uuid4().hex - self.credential_api.create_credential(non_ec2_cred['id'], - non_ec2_cred) - r = self.get(uri) - cred_list_2 = r.result['credentials'] - # still one element because non-EC2 credentials are not returned. - self.assertEqual(1, len(cred_list_2)) - self.assertEqual(cred_list[0], cred_list_2[0]) - - def test_ec2_delete_credential(self): - """Test ec2 credential deletion.""" - ec2_cred = self._get_ec2_cred() - uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) - cred_from_credential_api = ( - self.credential_api - .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2)) - self.assertEqual(1, len(cred_from_credential_api)) - self.delete(uri) - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - cred_from_credential_api[0]['id']) |