aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_credential.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_credential.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_credential.py478
1 files changed, 0 insertions, 478 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py
deleted file mode 100644
index 07995f19..00000000
--- a/keystone-moon/keystone/tests/unit/test_v3_credential.py
+++ /dev/null
@@ -1,478 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import hashlib
-import json
-import uuid
-
-from keystoneclient.contrib.ec2 import utils as ec2_utils
-from oslo_config import cfg
-from six.moves import http_client
-from testtools import matchers
-
-from keystone.common import utils
-from keystone.contrib.ec2 import controllers
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import test_v3
-
-
-CONF = cfg.CONF
-CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2
-
-
-class CredentialBaseTestCase(test_v3.RestfulTestCase):
- def _create_dict_blob_credential(self):
- blob, credential = unit.new_ec2_credential(user_id=self.user['id'],
- project_id=self.project_id)
-
- # Store the blob as a dict *not* JSON ref bug #1259584
- # This means we can test the dict->json workaround, added
- # as part of the bugfix for backwards compatibility works.
- credential['blob'] = blob
- credential_id = credential['id']
-
- # Create direct via the DB API to avoid validation failure
- self.credential_api.create_credential(credential_id, credential)
-
- return json.dumps(blob), credential_id
-
-
-class CredentialTestCase(CredentialBaseTestCase):
- """Test credential CRUD."""
-
- def setUp(self):
-
- super(CredentialTestCase, self).setUp()
-
- self.credential = unit.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id)
-
- self.credential_api.create_credential(
- self.credential['id'],
- self.credential)
-
- def test_credential_api_delete_credentials_for_project(self):
- self.credential_api.delete_credentials_for_project(self.project_id)
- # Test that the credential that we created in .setUp no longer exists
- # once we delete all credentials for self.project_id
- self.assertRaises(exception.CredentialNotFound,
- self.credential_api.get_credential,
- credential_id=self.credential['id'])
-
- def test_credential_api_delete_credentials_for_user(self):
- self.credential_api.delete_credentials_for_user(self.user_id)
- # Test that the credential that we created in .setUp no longer exists
- # once we delete all credentials for self.user_id
- self.assertRaises(exception.CredentialNotFound,
- self.credential_api.get_credential,
- credential_id=self.credential['id'])
-
- def test_list_credentials(self):
- """Call ``GET /credentials``."""
- r = self.get('/credentials')
- self.assertValidCredentialListResponse(r, ref=self.credential)
-
- def test_list_credentials_filtered_by_user_id(self):
- """Call ``GET /credentials?user_id={user_id}``."""
- credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
- self.credential_api.create_credential(credential['id'], credential)
-
- r = self.get('/credentials?user_id=%s' % self.user['id'])
- self.assertValidCredentialListResponse(r, ref=self.credential)
- for cred in r.result['credentials']:
- self.assertEqual(self.user['id'], cred['user_id'])
-
- def test_list_credentials_filtered_by_type(self):
- """Call ``GET /credentials?type={type}``."""
- # The type ec2 was chosen, instead of a random string,
- # because the type must be in the list of supported types
- ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex,
- project_id=self.project_id,
- type=CRED_TYPE_EC2)
-
- ec2_resp = self.credential_api.create_credential(
- ec2_credential['id'], ec2_credential)
-
- # The type cert was chosen for the same reason as ec2
- r = self.get('/credentials?type=cert')
-
- # Testing the filter for two different types
- self.assertValidCredentialListResponse(r, ref=self.credential)
- for cred in r.result['credentials']:
- self.assertEqual('cert', cred['type'])
-
- r_ec2 = self.get('/credentials?type=ec2')
- self.assertThat(r_ec2.result['credentials'], matchers.HasLength(1))
- cred_ec2 = r_ec2.result['credentials'][0]
-
- self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp)
- self.assertEqual(CRED_TYPE_EC2, cred_ec2['type'])
- self.assertEqual(ec2_credential['id'], cred_ec2['id'])
-
- def test_list_credentials_filtered_by_type_and_user_id(self):
- """Call ``GET /credentials?user_id={user_id}&type={type}``."""
- user1_id = uuid.uuid4().hex
- user2_id = uuid.uuid4().hex
-
- # Creating credentials for two different users
- credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id,
- type=CRED_TYPE_EC2)
- credential_user1_cert = unit.new_credential_ref(user_id=user1_id)
- credential_user2_cert = unit.new_credential_ref(user_id=user2_id)
-
- self.credential_api.create_credential(
- credential_user1_ec2['id'], credential_user1_ec2)
- self.credential_api.create_credential(
- credential_user1_cert['id'], credential_user1_cert)
- self.credential_api.create_credential(
- credential_user2_cert['id'], credential_user2_cert)
-
- r = self.get('/credentials?user_id=%s&type=ec2' % user1_id)
- self.assertValidCredentialListResponse(r, ref=credential_user1_ec2)
- self.assertThat(r.result['credentials'], matchers.HasLength(1))
- cred = r.result['credentials'][0]
- self.assertEqual(CRED_TYPE_EC2, cred['type'])
- self.assertEqual(user1_id, cred['user_id'])
-
- def test_create_credential(self):
- """Call ``POST /credentials``."""
- ref = unit.new_credential_ref(user_id=self.user['id'])
- r = self.post(
- '/credentials',
- body={'credential': ref})
- self.assertValidCredentialResponse(r, ref)
-
- def test_get_credential(self):
- """Call ``GET /credentials/{credential_id}``."""
- r = self.get(
- '/credentials/%(credential_id)s' % {
- 'credential_id': self.credential['id']})
- self.assertValidCredentialResponse(r, self.credential)
-
- def test_update_credential(self):
- """Call ``PATCH /credentials/{credential_id}``."""
- ref = unit.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id)
- del ref['id']
- r = self.patch(
- '/credentials/%(credential_id)s' % {
- 'credential_id': self.credential['id']},
- body={'credential': ref})
- self.assertValidCredentialResponse(r, ref)
-
- def test_delete_credential(self):
- """Call ``DELETE /credentials/{credential_id}``."""
- self.delete(
- '/credentials/%(credential_id)s' % {
- 'credential_id': self.credential['id']})
-
- def test_create_ec2_credential(self):
- """Call ``POST /credentials`` for creating ec2 credential."""
- blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
- project_id=self.project_id)
- r = self.post('/credentials', body={'credential': ref})
- self.assertValidCredentialResponse(r, ref)
- # Assert credential id is same as hash of access key id for
- # ec2 credentials
- access = blob['access'].encode('utf-8')
- self.assertEqual(hashlib.sha256(access).hexdigest(),
- r.result['credential']['id'])
- # Create second ec2 credential with the same access key id and check
- # for conflict.
- self.post(
- '/credentials',
- body={'credential': ref}, expected_status=http_client.CONFLICT)
-
- def test_get_ec2_dict_blob(self):
- """Ensure non-JSON blob data is correctly converted."""
- expected_blob, credential_id = self._create_dict_blob_credential()
-
- r = self.get(
- '/credentials/%(credential_id)s' % {
- 'credential_id': credential_id})
-
- # use json.loads to transform the blobs back into Python dictionaries
- # to avoid problems with the keys being in different orders.
- self.assertEqual(json.loads(expected_blob),
- json.loads(r.result['credential']['blob']))
-
- def test_list_ec2_dict_blob(self):
- """Ensure non-JSON blob data is correctly converted."""
- expected_blob, credential_id = self._create_dict_blob_credential()
-
- list_r = self.get('/credentials')
- list_creds = list_r.result['credentials']
- list_ids = [r['id'] for r in list_creds]
- self.assertIn(credential_id, list_ids)
- # use json.loads to transform the blobs back into Python dictionaries
- # to avoid problems with the keys being in different orders.
- for r in list_creds:
- if r['id'] == credential_id:
- self.assertEqual(json.loads(expected_blob),
- json.loads(r['blob']))
-
- def test_create_non_ec2_credential(self):
- """Test creating non-ec2 credential.
-
- Call ``POST /credentials``.
- """
- blob, ref = unit.new_cert_credential(user_id=self.user['id'])
-
- r = self.post('/credentials', body={'credential': ref})
- self.assertValidCredentialResponse(r, ref)
- # Assert credential id is not same as hash of access key id for
- # non-ec2 credentials
- access = blob['access'].encode('utf-8')
- self.assertNotEqual(hashlib.sha256(access).hexdigest(),
- r.result['credential']['id'])
-
- def test_create_ec2_credential_with_missing_project_id(self):
- """Test Creating ec2 credential with missing project_id.
-
- Call ``POST /credentials``.
- """
- _, ref = unit.new_ec2_credential(user_id=self.user['id'],
- project_id=None)
- # Assert bad request status when missing project_id
- self.post(
- '/credentials',
- body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
-
- def test_create_ec2_credential_with_invalid_blob(self):
- """Test creating ec2 credential with invalid blob.
-
- Call ``POST /credentials``.
- """
- ref = unit.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id,
- blob='{"abc":"def"d}',
- type=CRED_TYPE_EC2)
- # Assert bad request status when request contains invalid blob
- response = self.post(
- '/credentials',
- body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
- self.assertValidErrorResponse(response)
-
- def test_create_credential_with_admin_token(self):
- # Make sure we can create credential with the static admin token
- ref = unit.new_credential_ref(user_id=self.user['id'])
- r = self.post(
- '/credentials',
- body={'credential': ref},
- token=self.get_admin_token())
- self.assertValidCredentialResponse(r, ref)
-
-
-class TestCredentialTrustScoped(test_v3.RestfulTestCase):
- """Test credential with trust scoped token."""
-
- def setUp(self):
- super(TestCredentialTrustScoped, self).setUp()
-
- self.trustee_user = unit.new_user_ref(domain_id=self.domain_id)
- password = self.trustee_user['password']
- self.trustee_user = self.identity_api.create_user(self.trustee_user)
- self.trustee_user['password'] = password
- self.trustee_user_id = self.trustee_user['id']
-
- def config_overrides(self):
- super(TestCredentialTrustScoped, self).config_overrides()
- self.config_fixture.config(group='trust', enabled=True)
-
- def test_trust_scoped_ec2_credential(self):
- """Test creating trust scoped ec2 credential.
-
- Call ``POST /credentials``.
- """
- # Create the trust
- ref = unit.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- impersonation=True,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
- del ref['id']
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- # Get a trust scoped token
- auth_data = self.build_authentication_request(
- user_id=self.trustee_user['id'],
- password=self.trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_create_token(auth_data)
- self.assertValidProjectScopedTokenResponse(r, self.user)
- trust_id = r.result['token']['OS-TRUST:trust']['id']
- token_id = r.headers.get('X-Subject-Token')
-
- # Create the credential with the trust scoped token
- blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
- project_id=self.project_id)
- r = self.post('/credentials', body={'credential': ref}, token=token_id)
-
- # We expect the response blob to contain the trust_id
- ret_ref = ref.copy()
- ret_blob = blob.copy()
- ret_blob['trust_id'] = trust_id
- ret_ref['blob'] = json.dumps(ret_blob)
- self.assertValidCredentialResponse(r, ref=ret_ref)
-
- # Assert credential id is same as hash of access key id for
- # ec2 credentials
- access = blob['access'].encode('utf-8')
- self.assertEqual(hashlib.sha256(access).hexdigest(),
- r.result['credential']['id'])
-
- # Create second ec2 credential with the same access key id and check
- # for conflict.
- self.post(
- '/credentials',
- body={'credential': ref},
- token=token_id,
- expected_status=http_client.CONFLICT)
-
-
-class TestCredentialEc2(CredentialBaseTestCase):
- """Test v3 credential compatibility with ec2tokens."""
-
- def setUp(self):
- super(TestCredentialEc2, self).setUp()
-
- def _validate_signature(self, access, secret):
- """Test signature validation with the access/secret provided."""
- signer = ec2_utils.Ec2Signer(secret)
- params = {'SignatureMethod': 'HmacSHA256',
- 'SignatureVersion': '2',
- 'AWSAccessKeyId': access}
- request = {'host': 'foo',
- 'verb': 'GET',
- 'path': '/bar',
- 'params': params}
- signature = signer.generate(request)
-
- # Now make a request to validate the signed dummy request via the
- # ec2tokens API. This proves the v3 ec2 credentials actually work.
- sig_ref = {'access': access,
- 'signature': signature,
- 'host': 'foo',
- 'verb': 'GET',
- 'path': '/bar',
- 'params': params}
- r = self.post(
- '/ec2tokens',
- body={'ec2Credentials': sig_ref},
- expected_status=http_client.OK)
- self.assertValidTokenResponse(r)
-
- def test_ec2_credential_signature_validate(self):
- """Test signature validation with a v3 ec2 credential."""
- blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
- project_id=self.project_id)
- r = self.post('/credentials', body={'credential': ref})
- self.assertValidCredentialResponse(r, ref)
- # Assert credential id is same as hash of access key id
- access = blob['access'].encode('utf-8')
- self.assertEqual(hashlib.sha256(access).hexdigest(),
- r.result['credential']['id'])
-
- cred_blob = json.loads(r.result['credential']['blob'])
- self.assertEqual(blob, cred_blob)
- self._validate_signature(access=cred_blob['access'],
- secret=cred_blob['secret'])
-
- def test_ec2_credential_signature_validate_legacy(self):
- """Test signature validation with a legacy v3 ec2 credential."""
- cred_json, _ = self._create_dict_blob_credential()
- cred_blob = json.loads(cred_json)
- self._validate_signature(access=cred_blob['access'],
- secret=cred_blob['secret'])
-
- def _get_ec2_cred_uri(self):
- return '/users/%s/credentials/OS-EC2' % self.user_id
-
- def _get_ec2_cred(self):
- uri = self._get_ec2_cred_uri()
- r = self.post(uri, body={'tenant_id': self.project_id})
- return r.result['credential']
-
- def test_ec2_create_credential(self):
- """Test ec2 credential creation."""
- ec2_cred = self._get_ec2_cred()
- self.assertEqual(self.user_id, ec2_cred['user_id'])
- self.assertEqual(self.project_id, ec2_cred['tenant_id'])
- self.assertIsNone(ec2_cred['trust_id'])
- self._validate_signature(access=ec2_cred['access'],
- secret=ec2_cred['secret'])
- uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
- self.assertThat(ec2_cred['links']['self'],
- matchers.EndsWith(uri))
-
- def test_ec2_get_credential(self):
- ec2_cred = self._get_ec2_cred()
- uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
- r = self.get(uri)
- self.assertDictEqual(ec2_cred, r.result['credential'])
- self.assertThat(ec2_cred['links']['self'],
- matchers.EndsWith(uri))
-
- def test_ec2_cannot_get_non_ec2_credential(self):
- access_key = uuid.uuid4().hex
- cred_id = utils.hash_access_key(access_key)
- non_ec2_cred = unit.new_credential_ref(
- user_id=self.user_id,
- project_id=self.project_id)
- non_ec2_cred['id'] = cred_id
- self.credential_api.create_credential(cred_id, non_ec2_cred)
- uri = '/'.join([self._get_ec2_cred_uri(), access_key])
- # if access_key is not found, ec2 controller raises Unauthorized
- # exception
- self.get(uri, expected_status=http_client.UNAUTHORIZED)
-
- def test_ec2_list_credentials(self):
- """Test ec2 credential listing."""
- self._get_ec2_cred()
- uri = self._get_ec2_cred_uri()
- r = self.get(uri)
- cred_list = r.result['credentials']
- self.assertEqual(1, len(cred_list))
- self.assertThat(r.result['links']['self'],
- matchers.EndsWith(uri))
-
- # non-EC2 credentials won't be fetched
- non_ec2_cred = unit.new_credential_ref(
- user_id=self.user_id,
- project_id=self.project_id)
- non_ec2_cred['type'] = uuid.uuid4().hex
- self.credential_api.create_credential(non_ec2_cred['id'],
- non_ec2_cred)
- r = self.get(uri)
- cred_list_2 = r.result['credentials']
- # still one element because non-EC2 credentials are not returned.
- self.assertEqual(1, len(cred_list_2))
- self.assertEqual(cred_list[0], cred_list_2[0])
-
- def test_ec2_delete_credential(self):
- """Test ec2 credential deletion."""
- ec2_cred = self._get_ec2_cred()
- uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
- cred_from_credential_api = (
- self.credential_api
- .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2))
- self.assertEqual(1, len(cred_from_credential_api))
- self.delete(uri)
- self.assertRaises(exception.CredentialNotFound,
- self.credential_api.get_credential,
- cred_from_credential_api[0]['id'])