diff options
author | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
---|---|---|
committer | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
commit | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (patch) | |
tree | 8b8d94001ebe6cc34106cf813b538911a8d66d9a /keystone-moon/keystone/tests/unit/test_v3_credential.py | |
parent | a33bdcb627102a01244630a54cb4b5066b385a6a (diff) |
Update Keystone core to Mitaka.
Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_credential.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_credential.py | 242 |
1 files changed, 127 insertions, 115 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py index dd8cf2dd..07995f19 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_credential.py +++ b/keystone-moon/keystone/tests/unit/test_v3_credential.py @@ -21,49 +21,46 @@ from oslo_config import cfg from six.moves import http_client from testtools import matchers +from keystone.common import utils +from keystone.contrib.ec2 import controllers from keystone import exception +from keystone.tests import unit from keystone.tests.unit import test_v3 CONF = cfg.CONF +CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2 class CredentialBaseTestCase(test_v3.RestfulTestCase): def _create_dict_blob_credential(self): - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - credential_id = hashlib.sha256(blob['access']).hexdigest() - credential = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project_id) - credential['id'] = credential_id + blob, credential = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) # Store the blob as a dict *not* JSON ref bug #1259584 # This means we can test the dict->json workaround, added # as part of the bugfix for backwards compatibility works. credential['blob'] = blob - credential['type'] = 'ec2' + credential_id = credential['id'] + # Create direct via the DB API to avoid validation failure - self.credential_api.create_credential( - credential_id, - credential) - expected_blob = json.dumps(blob) - return expected_blob, credential_id + self.credential_api.create_credential(credential_id, credential) + + return json.dumps(blob), credential_id class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD.""" + def setUp(self): super(CredentialTestCase, self).setUp() - self.credential_id = uuid.uuid4().hex - self.credential = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project_id) - self.credential['id'] = self.credential_id + self.credential = unit.new_credential_ref(user_id=self.user['id'], + project_id=self.project_id) + self.credential_api.create_credential( - self.credential_id, + self.credential['id'], self.credential) def test_credential_api_delete_credentials_for_project(self): @@ -72,7 +69,7 @@ class CredentialTestCase(CredentialBaseTestCase): # once we delete all credentials for self.project_id self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, - credential_id=self.credential_id) + credential_id=self.credential['id']) def test_credential_api_delete_credentials_for_user(self): self.credential_api.delete_credentials_for_user(self.user_id) @@ -80,7 +77,7 @@ class CredentialTestCase(CredentialBaseTestCase): # once we delete all credentials for self.user_id self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, - credential_id=self.credential_id) + credential_id=self.credential['id']) def test_list_credentials(self): """Call ``GET /credentials``.""" @@ -89,10 +86,8 @@ class CredentialTestCase(CredentialBaseTestCase): def test_list_credentials_filtered_by_user_id(self): """Call ``GET /credentials?user_id={user_id}``.""" - credential = self.new_credential_ref( - user_id=uuid.uuid4().hex) - self.credential_api.create_credential( - credential['id'], credential) + credential = unit.new_credential_ref(user_id=uuid.uuid4().hex) + self.credential_api.create_credential(credential['id'], credential) r = self.get('/credentials?user_id=%s' % self.user['id']) self.assertValidCredentialListResponse(r, ref=self.credential) @@ -103,9 +98,9 @@ class CredentialTestCase(CredentialBaseTestCase): """Call ``GET /credentials?type={type}``.""" # The type ec2 was chosen, instead of a random string, # because the type must be in the list of supported types - ec2_credential = self.new_credential_ref(user_id=uuid.uuid4().hex, + ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex, project_id=self.project_id, - cred_type='ec2') + type=CRED_TYPE_EC2) ec2_resp = self.credential_api.create_credential( ec2_credential['id'], ec2_credential) @@ -123,8 +118,8 @@ class CredentialTestCase(CredentialBaseTestCase): cred_ec2 = r_ec2.result['credentials'][0] self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp) - self.assertEqual('ec2', cred_ec2['type']) - self.assertEqual(cred_ec2['id'], ec2_credential['id']) + self.assertEqual(CRED_TYPE_EC2, cred_ec2['type']) + self.assertEqual(ec2_credential['id'], cred_ec2['id']) def test_list_credentials_filtered_by_type_and_user_id(self): """Call ``GET /credentials?user_id={user_id}&type={type}``.""" @@ -132,12 +127,10 @@ class CredentialTestCase(CredentialBaseTestCase): user2_id = uuid.uuid4().hex # Creating credentials for two different users - credential_user1_ec2 = self.new_credential_ref( - user_id=user1_id, cred_type='ec2') - credential_user1_cert = self.new_credential_ref( - user_id=user1_id) - credential_user2_cert = self.new_credential_ref( - user_id=user2_id) + credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id, + type=CRED_TYPE_EC2) + credential_user1_cert = unit.new_credential_ref(user_id=user1_id) + credential_user2_cert = unit.new_credential_ref(user_id=user2_id) self.credential_api.create_credential( credential_user1_ec2['id'], credential_user1_ec2) @@ -150,12 +143,12 @@ class CredentialTestCase(CredentialBaseTestCase): self.assertValidCredentialListResponse(r, ref=credential_user1_ec2) self.assertThat(r.result['credentials'], matchers.HasLength(1)) cred = r.result['credentials'][0] - self.assertEqual('ec2', cred['type']) + self.assertEqual(CRED_TYPE_EC2, cred['type']) self.assertEqual(user1_id, cred['user_id']) def test_create_credential(self): """Call ``POST /credentials``.""" - ref = self.new_credential_ref(user_id=self.user['id']) + ref = unit.new_credential_ref(user_id=self.user['id']) r = self.post( '/credentials', body={'credential': ref}) @@ -165,18 +158,17 @@ class CredentialTestCase(CredentialBaseTestCase): """Call ``GET /credentials/{credential_id}``.""" r = self.get( '/credentials/%(credential_id)s' % { - 'credential_id': self.credential_id}) + 'credential_id': self.credential['id']}) self.assertValidCredentialResponse(r, self.credential) def test_update_credential(self): """Call ``PATCH /credentials/{credential_id}``.""" - ref = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project_id) + ref = unit.new_credential_ref(user_id=self.user['id'], + project_id=self.project_id) del ref['id'] r = self.patch( '/credentials/%(credential_id)s' % { - 'credential_id': self.credential_id}, + 'credential_id': self.credential['id']}, body={'credential': ref}) self.assertValidCredentialResponse(r, ref) @@ -184,29 +176,24 @@ class CredentialTestCase(CredentialBaseTestCase): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( '/credentials/%(credential_id)s' % { - 'credential_id': self.credential_id}) + 'credential_id': self.credential['id']}) def test_create_ec2_credential(self): """Call ``POST /credentials`` for creating ec2 credential.""" - ref = self.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - ref['blob'] = json.dumps(blob) - ref['type'] = 'ec2' - r = self.post( - '/credentials', - body={'credential': ref}) + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is same as hash of access key id for # ec2 credentials - self.assertEqual(r.result['credential']['id'], - hashlib.sha256(blob['access']).hexdigest()) + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) # Create second ec2 credential with the same access key id and check # for conflict. self.post( '/credentials', - body={'credential': ref}, expected_status=409) + body={'credential': ref}, expected_status=http_client.CONFLICT) def test_get_ec2_dict_blob(self): """Ensure non-JSON blob data is correctly converted.""" @@ -215,7 +202,11 @@ class CredentialTestCase(CredentialBaseTestCase): r = self.get( '/credentials/%(credential_id)s' % { 'credential_id': credential_id}) - self.assertEqual(expected_blob, r.result['credential']['blob']) + + # use json.loads to transform the blobs back into Python dictionaries + # to avoid problems with the keys being in different orders. + self.assertEqual(json.loads(expected_blob), + json.loads(r.result['credential']['blob'])) def test_list_ec2_dict_blob(self): """Ensure non-JSON blob data is correctly converted.""" @@ -225,47 +216,49 @@ class CredentialTestCase(CredentialBaseTestCase): list_creds = list_r.result['credentials'] list_ids = [r['id'] for r in list_creds] self.assertIn(credential_id, list_ids) + # use json.loads to transform the blobs back into Python dictionaries + # to avoid problems with the keys being in different orders. for r in list_creds: if r['id'] == credential_id: - self.assertEqual(expected_blob, r['blob']) + self.assertEqual(json.loads(expected_blob), + json.loads(r['blob'])) def test_create_non_ec2_credential(self): - """Call ``POST /credentials`` for creating non-ec2 credential.""" - ref = self.new_credential_ref(user_id=self.user['id']) - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - ref['blob'] = json.dumps(blob) - r = self.post( - '/credentials', - body={'credential': ref}) + """Test creating non-ec2 credential. + + Call ``POST /credentials``. + """ + blob, ref = unit.new_cert_credential(user_id=self.user['id']) + + r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is not same as hash of access key id for # non-ec2 credentials - self.assertNotEqual(r.result['credential']['id'], - hashlib.sha256(blob['access']).hexdigest()) + access = blob['access'].encode('utf-8') + self.assertNotEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) def test_create_ec2_credential_with_missing_project_id(self): - """Call ``POST /credentials`` for creating ec2 - credential with missing project_id. + """Test Creating ec2 credential with missing project_id. + + Call ``POST /credentials``. """ - ref = self.new_credential_ref(user_id=self.user['id']) - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - ref['blob'] = json.dumps(blob) - ref['type'] = 'ec2' + _, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=None) # Assert bad request status when missing project_id self.post( '/credentials', body={'credential': ref}, expected_status=http_client.BAD_REQUEST) def test_create_ec2_credential_with_invalid_blob(self): - """Call ``POST /credentials`` for creating ec2 - credential with invalid blob. + """Test creating ec2 credential with invalid blob. + + Call ``POST /credentials``. """ - ref = self.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - ref['blob'] = '{"abc":"def"d}' - ref['type'] = 'ec2' + ref = unit.new_credential_ref(user_id=self.user['id'], + project_id=self.project_id, + blob='{"abc":"def"d}', + type=CRED_TYPE_EC2) # Assert bad request status when request contains invalid blob response = self.post( '/credentials', @@ -274,20 +267,21 @@ class CredentialTestCase(CredentialBaseTestCase): def test_create_credential_with_admin_token(self): # Make sure we can create credential with the static admin token - ref = self.new_credential_ref(user_id=self.user['id']) + ref = unit.new_credential_ref(user_id=self.user['id']) r = self.post( '/credentials', body={'credential': ref}, - token=CONF.admin_token) + token=self.get_admin_token()) self.assertValidCredentialResponse(r, ref) class TestCredentialTrustScoped(test_v3.RestfulTestCase): """Test credential with trust scoped token.""" + def setUp(self): super(TestCredentialTrustScoped, self).setUp() - self.trustee_user = self.new_user_ref(domain_id=self.domain_id) + self.trustee_user = unit.new_user_ref(domain_id=self.domain_id) password = self.trustee_user['password'] self.trustee_user = self.identity_api.create_user(self.trustee_user) self.trustee_user['password'] = password @@ -298,9 +292,12 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): self.config_fixture.config(group='trust', enabled=True) def test_trust_scoped_ec2_credential(self): - """Call ``POST /credentials`` for creating ec2 credential.""" + """Test creating trust scoped ec2 credential. + + Call ``POST /credentials``. + """ # Create the trust - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, @@ -316,22 +313,15 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse(r, self.user) + r = self.v3_create_token(auth_data) + self.assertValidProjectScopedTokenResponse(r, self.user) trust_id = r.result['token']['OS-TRUST:trust']['id'] token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token - ref = self.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - ref['blob'] = json.dumps(blob) - ref['type'] = 'ec2' - r = self.post( - '/credentials', - body={'credential': ref}, - token=token_id) + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) # We expect the response blob to contain the trust_id ret_ref = ref.copy() @@ -342,8 +332,9 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): # Assert credential id is same as hash of access key id for # ec2 credentials - self.assertEqual(r.result['credential']['id'], - hashlib.sha256(blob['access']).hexdigest()) + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) # Create second ec2 credential with the same access key id and check # for conflict. @@ -351,11 +342,12 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase): '/credentials', body={'credential': ref}, token=token_id, - expected_status=409) + expected_status=http_client.CONFLICT) class TestCredentialEc2(CredentialBaseTestCase): """Test v3 credential compatibility with ec2tokens.""" + def setUp(self): super(TestCredentialEc2, self).setUp() @@ -382,25 +374,19 @@ class TestCredentialEc2(CredentialBaseTestCase): r = self.post( '/ec2tokens', body={'ec2Credentials': sig_ref}, - expected_status=200) + expected_status=http_client.OK) self.assertValidTokenResponse(r) def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" - ref = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project_id) - blob = {"access": uuid.uuid4().hex, - "secret": uuid.uuid4().hex} - ref['blob'] = json.dumps(blob) - ref['type'] = 'ec2' - r = self.post( - '/credentials', - body={'credential': ref}) + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is same as hash of access key id - self.assertEqual(r.result['credential']['id'], - hashlib.sha256(blob['access']).hexdigest()) + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) @@ -409,7 +395,7 @@ class TestCredentialEc2(CredentialBaseTestCase): def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a legacy v3 ec2 credential.""" - cred_json, credential_id = self._create_dict_blob_credential() + cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) self._validate_signature(access=cred_blob['access'], secret=cred_blob['secret']) @@ -442,6 +428,19 @@ class TestCredentialEc2(CredentialBaseTestCase): self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri)) + def test_ec2_cannot_get_non_ec2_credential(self): + access_key = uuid.uuid4().hex + cred_id = utils.hash_access_key(access_key) + non_ec2_cred = unit.new_credential_ref( + user_id=self.user_id, + project_id=self.project_id) + non_ec2_cred['id'] = cred_id + self.credential_api.create_credential(cred_id, non_ec2_cred) + uri = '/'.join([self._get_ec2_cred_uri(), access_key]) + # if access_key is not found, ec2 controller raises Unauthorized + # exception + self.get(uri, expected_status=http_client.UNAUTHORIZED) + def test_ec2_list_credentials(self): """Test ec2 credential listing.""" self._get_ec2_cred() @@ -452,13 +451,26 @@ class TestCredentialEc2(CredentialBaseTestCase): self.assertThat(r.result['links']['self'], matchers.EndsWith(uri)) + # non-EC2 credentials won't be fetched + non_ec2_cred = unit.new_credential_ref( + user_id=self.user_id, + project_id=self.project_id) + non_ec2_cred['type'] = uuid.uuid4().hex + self.credential_api.create_credential(non_ec2_cred['id'], + non_ec2_cred) + r = self.get(uri) + cred_list_2 = r.result['credentials'] + # still one element because non-EC2 credentials are not returned. + self.assertEqual(1, len(cred_list_2)) + self.assertEqual(cred_list[0], cred_list_2[0]) + def test_ec2_delete_credential(self): """Test ec2 credential deletion.""" ec2_cred = self._get_ec2_cred() uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) cred_from_credential_api = ( self.credential_api - .list_credentials_for_user(self.user_id)) + .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2)) self.assertEqual(1, len(cred_from_credential_api)) self.delete(uri) self.assertRaises(exception.CredentialNotFound, |