aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_credential.py
diff options
context:
space:
mode:
authorDUVAL Thomas <thomas.duval@orange.com>2016-06-09 09:11:50 +0200
committerDUVAL Thomas <thomas.duval@orange.com>2016-06-09 09:11:50 +0200
commit2e7b4f2027a1147ca28301e4f88adf8274b39a1f (patch)
tree8b8d94001ebe6cc34106cf813b538911a8d66d9a /keystone-moon/keystone/tests/unit/test_v3_credential.py
parenta33bdcb627102a01244630a54cb4b5066b385a6a (diff)
Update Keystone core to Mitaka.
Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_credential.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_credential.py242
1 files changed, 127 insertions, 115 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py
index dd8cf2dd..07995f19 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_credential.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_credential.py
@@ -21,49 +21,46 @@ from oslo_config import cfg
from six.moves import http_client
from testtools import matchers
+from keystone.common import utils
+from keystone.contrib.ec2 import controllers
from keystone import exception
+from keystone.tests import unit
from keystone.tests.unit import test_v3
CONF = cfg.CONF
+CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2
class CredentialBaseTestCase(test_v3.RestfulTestCase):
def _create_dict_blob_credential(self):
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- credential_id = hashlib.sha256(blob['access']).hexdigest()
- credential = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project_id)
- credential['id'] = credential_id
+ blob, credential = unit.new_ec2_credential(user_id=self.user['id'],
+ project_id=self.project_id)
# Store the blob as a dict *not* JSON ref bug #1259584
# This means we can test the dict->json workaround, added
# as part of the bugfix for backwards compatibility works.
credential['blob'] = blob
- credential['type'] = 'ec2'
+ credential_id = credential['id']
+
# Create direct via the DB API to avoid validation failure
- self.credential_api.create_credential(
- credential_id,
- credential)
- expected_blob = json.dumps(blob)
- return expected_blob, credential_id
+ self.credential_api.create_credential(credential_id, credential)
+
+ return json.dumps(blob), credential_id
class CredentialTestCase(CredentialBaseTestCase):
"""Test credential CRUD."""
+
def setUp(self):
super(CredentialTestCase, self).setUp()
- self.credential_id = uuid.uuid4().hex
- self.credential = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project_id)
- self.credential['id'] = self.credential_id
+ self.credential = unit.new_credential_ref(user_id=self.user['id'],
+ project_id=self.project_id)
+
self.credential_api.create_credential(
- self.credential_id,
+ self.credential['id'],
self.credential)
def test_credential_api_delete_credentials_for_project(self):
@@ -72,7 +69,7 @@ class CredentialTestCase(CredentialBaseTestCase):
# once we delete all credentials for self.project_id
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
- credential_id=self.credential_id)
+ credential_id=self.credential['id'])
def test_credential_api_delete_credentials_for_user(self):
self.credential_api.delete_credentials_for_user(self.user_id)
@@ -80,7 +77,7 @@ class CredentialTestCase(CredentialBaseTestCase):
# once we delete all credentials for self.user_id
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
- credential_id=self.credential_id)
+ credential_id=self.credential['id'])
def test_list_credentials(self):
"""Call ``GET /credentials``."""
@@ -89,10 +86,8 @@ class CredentialTestCase(CredentialBaseTestCase):
def test_list_credentials_filtered_by_user_id(self):
"""Call ``GET /credentials?user_id={user_id}``."""
- credential = self.new_credential_ref(
- user_id=uuid.uuid4().hex)
- self.credential_api.create_credential(
- credential['id'], credential)
+ credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
+ self.credential_api.create_credential(credential['id'], credential)
r = self.get('/credentials?user_id=%s' % self.user['id'])
self.assertValidCredentialListResponse(r, ref=self.credential)
@@ -103,9 +98,9 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``GET /credentials?type={type}``."""
# The type ec2 was chosen, instead of a random string,
# because the type must be in the list of supported types
- ec2_credential = self.new_credential_ref(user_id=uuid.uuid4().hex,
+ ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex,
project_id=self.project_id,
- cred_type='ec2')
+ type=CRED_TYPE_EC2)
ec2_resp = self.credential_api.create_credential(
ec2_credential['id'], ec2_credential)
@@ -123,8 +118,8 @@ class CredentialTestCase(CredentialBaseTestCase):
cred_ec2 = r_ec2.result['credentials'][0]
self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp)
- self.assertEqual('ec2', cred_ec2['type'])
- self.assertEqual(cred_ec2['id'], ec2_credential['id'])
+ self.assertEqual(CRED_TYPE_EC2, cred_ec2['type'])
+ self.assertEqual(ec2_credential['id'], cred_ec2['id'])
def test_list_credentials_filtered_by_type_and_user_id(self):
"""Call ``GET /credentials?user_id={user_id}&type={type}``."""
@@ -132,12 +127,10 @@ class CredentialTestCase(CredentialBaseTestCase):
user2_id = uuid.uuid4().hex
# Creating credentials for two different users
- credential_user1_ec2 = self.new_credential_ref(
- user_id=user1_id, cred_type='ec2')
- credential_user1_cert = self.new_credential_ref(
- user_id=user1_id)
- credential_user2_cert = self.new_credential_ref(
- user_id=user2_id)
+ credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id,
+ type=CRED_TYPE_EC2)
+ credential_user1_cert = unit.new_credential_ref(user_id=user1_id)
+ credential_user2_cert = unit.new_credential_ref(user_id=user2_id)
self.credential_api.create_credential(
credential_user1_ec2['id'], credential_user1_ec2)
@@ -150,12 +143,12 @@ class CredentialTestCase(CredentialBaseTestCase):
self.assertValidCredentialListResponse(r, ref=credential_user1_ec2)
self.assertThat(r.result['credentials'], matchers.HasLength(1))
cred = r.result['credentials'][0]
- self.assertEqual('ec2', cred['type'])
+ self.assertEqual(CRED_TYPE_EC2, cred['type'])
self.assertEqual(user1_id, cred['user_id'])
def test_create_credential(self):
"""Call ``POST /credentials``."""
- ref = self.new_credential_ref(user_id=self.user['id'])
+ ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref})
@@ -165,18 +158,17 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``GET /credentials/{credential_id}``."""
r = self.get(
'/credentials/%(credential_id)s' % {
- 'credential_id': self.credential_id})
+ 'credential_id': self.credential['id']})
self.assertValidCredentialResponse(r, self.credential)
def test_update_credential(self):
"""Call ``PATCH /credentials/{credential_id}``."""
- ref = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project_id)
+ ref = unit.new_credential_ref(user_id=self.user['id'],
+ project_id=self.project_id)
del ref['id']
r = self.patch(
'/credentials/%(credential_id)s' % {
- 'credential_id': self.credential_id},
+ 'credential_id': self.credential['id']},
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
@@ -184,29 +176,24 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``DELETE /credentials/{credential_id}``."""
self.delete(
'/credentials/%(credential_id)s' % {
- 'credential_id': self.credential_id})
+ 'credential_id': self.credential['id']})
def test_create_ec2_credential(self):
"""Call ``POST /credentials`` for creating ec2 credential."""
- ref = self.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id)
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- ref['blob'] = json.dumps(blob)
- ref['type'] = 'ec2'
- r = self.post(
- '/credentials',
- body={'credential': ref})
+ blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
+ project_id=self.project_id)
+ r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id for
# ec2 credentials
- self.assertEqual(r.result['credential']['id'],
- hashlib.sha256(blob['access']).hexdigest())
+ access = blob['access'].encode('utf-8')
+ self.assertEqual(hashlib.sha256(access).hexdigest(),
+ r.result['credential']['id'])
# Create second ec2 credential with the same access key id and check
# for conflict.
self.post(
'/credentials',
- body={'credential': ref}, expected_status=409)
+ body={'credential': ref}, expected_status=http_client.CONFLICT)
def test_get_ec2_dict_blob(self):
"""Ensure non-JSON blob data is correctly converted."""
@@ -215,7 +202,11 @@ class CredentialTestCase(CredentialBaseTestCase):
r = self.get(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id})
- self.assertEqual(expected_blob, r.result['credential']['blob'])
+
+ # use json.loads to transform the blobs back into Python dictionaries
+ # to avoid problems with the keys being in different orders.
+ self.assertEqual(json.loads(expected_blob),
+ json.loads(r.result['credential']['blob']))
def test_list_ec2_dict_blob(self):
"""Ensure non-JSON blob data is correctly converted."""
@@ -225,47 +216,49 @@ class CredentialTestCase(CredentialBaseTestCase):
list_creds = list_r.result['credentials']
list_ids = [r['id'] for r in list_creds]
self.assertIn(credential_id, list_ids)
+ # use json.loads to transform the blobs back into Python dictionaries
+ # to avoid problems with the keys being in different orders.
for r in list_creds:
if r['id'] == credential_id:
- self.assertEqual(expected_blob, r['blob'])
+ self.assertEqual(json.loads(expected_blob),
+ json.loads(r['blob']))
def test_create_non_ec2_credential(self):
- """Call ``POST /credentials`` for creating non-ec2 credential."""
- ref = self.new_credential_ref(user_id=self.user['id'])
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- ref['blob'] = json.dumps(blob)
- r = self.post(
- '/credentials',
- body={'credential': ref})
+ """Test creating non-ec2 credential.
+
+ Call ``POST /credentials``.
+ """
+ blob, ref = unit.new_cert_credential(user_id=self.user['id'])
+
+ r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is not same as hash of access key id for
# non-ec2 credentials
- self.assertNotEqual(r.result['credential']['id'],
- hashlib.sha256(blob['access']).hexdigest())
+ access = blob['access'].encode('utf-8')
+ self.assertNotEqual(hashlib.sha256(access).hexdigest(),
+ r.result['credential']['id'])
def test_create_ec2_credential_with_missing_project_id(self):
- """Call ``POST /credentials`` for creating ec2
- credential with missing project_id.
+ """Test Creating ec2 credential with missing project_id.
+
+ Call ``POST /credentials``.
"""
- ref = self.new_credential_ref(user_id=self.user['id'])
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- ref['blob'] = json.dumps(blob)
- ref['type'] = 'ec2'
+ _, ref = unit.new_ec2_credential(user_id=self.user['id'],
+ project_id=None)
# Assert bad request status when missing project_id
self.post(
'/credentials',
body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
def test_create_ec2_credential_with_invalid_blob(self):
- """Call ``POST /credentials`` for creating ec2
- credential with invalid blob.
+ """Test creating ec2 credential with invalid blob.
+
+ Call ``POST /credentials``.
"""
- ref = self.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id)
- ref['blob'] = '{"abc":"def"d}'
- ref['type'] = 'ec2'
+ ref = unit.new_credential_ref(user_id=self.user['id'],
+ project_id=self.project_id,
+ blob='{"abc":"def"d}',
+ type=CRED_TYPE_EC2)
# Assert bad request status when request contains invalid blob
response = self.post(
'/credentials',
@@ -274,20 +267,21 @@ class CredentialTestCase(CredentialBaseTestCase):
def test_create_credential_with_admin_token(self):
# Make sure we can create credential with the static admin token
- ref = self.new_credential_ref(user_id=self.user['id'])
+ ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref},
- token=CONF.admin_token)
+ token=self.get_admin_token())
self.assertValidCredentialResponse(r, ref)
class TestCredentialTrustScoped(test_v3.RestfulTestCase):
"""Test credential with trust scoped token."""
+
def setUp(self):
super(TestCredentialTrustScoped, self).setUp()
- self.trustee_user = self.new_user_ref(domain_id=self.domain_id)
+ self.trustee_user = unit.new_user_ref(domain_id=self.domain_id)
password = self.trustee_user['password']
self.trustee_user = self.identity_api.create_user(self.trustee_user)
self.trustee_user['password'] = password
@@ -298,9 +292,12 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
self.config_fixture.config(group='trust', enabled=True)
def test_trust_scoped_ec2_credential(self):
- """Call ``POST /credentials`` for creating ec2 credential."""
+ """Test creating trust scoped ec2 credential.
+
+ Call ``POST /credentials``.
+ """
# Create the trust
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
@@ -316,22 +313,15 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(r, self.user)
+ r = self.v3_create_token(auth_data)
+ self.assertValidProjectScopedTokenResponse(r, self.user)
trust_id = r.result['token']['OS-TRUST:trust']['id']
token_id = r.headers.get('X-Subject-Token')
# Create the credential with the trust scoped token
- ref = self.new_credential_ref(user_id=self.user['id'],
- project_id=self.project_id)
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- ref['blob'] = json.dumps(blob)
- ref['type'] = 'ec2'
- r = self.post(
- '/credentials',
- body={'credential': ref},
- token=token_id)
+ blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
+ project_id=self.project_id)
+ r = self.post('/credentials', body={'credential': ref}, token=token_id)
# We expect the response blob to contain the trust_id
ret_ref = ref.copy()
@@ -342,8 +332,9 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
# Assert credential id is same as hash of access key id for
# ec2 credentials
- self.assertEqual(r.result['credential']['id'],
- hashlib.sha256(blob['access']).hexdigest())
+ access = blob['access'].encode('utf-8')
+ self.assertEqual(hashlib.sha256(access).hexdigest(),
+ r.result['credential']['id'])
# Create second ec2 credential with the same access key id and check
# for conflict.
@@ -351,11 +342,12 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
'/credentials',
body={'credential': ref},
token=token_id,
- expected_status=409)
+ expected_status=http_client.CONFLICT)
class TestCredentialEc2(CredentialBaseTestCase):
"""Test v3 credential compatibility with ec2tokens."""
+
def setUp(self):
super(TestCredentialEc2, self).setUp()
@@ -382,25 +374,19 @@ class TestCredentialEc2(CredentialBaseTestCase):
r = self.post(
'/ec2tokens',
body={'ec2Credentials': sig_ref},
- expected_status=200)
+ expected_status=http_client.OK)
self.assertValidTokenResponse(r)
def test_ec2_credential_signature_validate(self):
"""Test signature validation with a v3 ec2 credential."""
- ref = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project_id)
- blob = {"access": uuid.uuid4().hex,
- "secret": uuid.uuid4().hex}
- ref['blob'] = json.dumps(blob)
- ref['type'] = 'ec2'
- r = self.post(
- '/credentials',
- body={'credential': ref})
+ blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
+ project_id=self.project_id)
+ r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id
- self.assertEqual(r.result['credential']['id'],
- hashlib.sha256(blob['access']).hexdigest())
+ access = blob['access'].encode('utf-8')
+ self.assertEqual(hashlib.sha256(access).hexdigest(),
+ r.result['credential']['id'])
cred_blob = json.loads(r.result['credential']['blob'])
self.assertEqual(blob, cred_blob)
@@ -409,7 +395,7 @@ class TestCredentialEc2(CredentialBaseTestCase):
def test_ec2_credential_signature_validate_legacy(self):
"""Test signature validation with a legacy v3 ec2 credential."""
- cred_json, credential_id = self._create_dict_blob_credential()
+ cred_json, _ = self._create_dict_blob_credential()
cred_blob = json.loads(cred_json)
self._validate_signature(access=cred_blob['access'],
secret=cred_blob['secret'])
@@ -442,6 +428,19 @@ class TestCredentialEc2(CredentialBaseTestCase):
self.assertThat(ec2_cred['links']['self'],
matchers.EndsWith(uri))
+ def test_ec2_cannot_get_non_ec2_credential(self):
+ access_key = uuid.uuid4().hex
+ cred_id = utils.hash_access_key(access_key)
+ non_ec2_cred = unit.new_credential_ref(
+ user_id=self.user_id,
+ project_id=self.project_id)
+ non_ec2_cred['id'] = cred_id
+ self.credential_api.create_credential(cred_id, non_ec2_cred)
+ uri = '/'.join([self._get_ec2_cred_uri(), access_key])
+ # if access_key is not found, ec2 controller raises Unauthorized
+ # exception
+ self.get(uri, expected_status=http_client.UNAUTHORIZED)
+
def test_ec2_list_credentials(self):
"""Test ec2 credential listing."""
self._get_ec2_cred()
@@ -452,13 +451,26 @@ class TestCredentialEc2(CredentialBaseTestCase):
self.assertThat(r.result['links']['self'],
matchers.EndsWith(uri))
+ # non-EC2 credentials won't be fetched
+ non_ec2_cred = unit.new_credential_ref(
+ user_id=self.user_id,
+ project_id=self.project_id)
+ non_ec2_cred['type'] = uuid.uuid4().hex
+ self.credential_api.create_credential(non_ec2_cred['id'],
+ non_ec2_cred)
+ r = self.get(uri)
+ cred_list_2 = r.result['credentials']
+ # still one element because non-EC2 credentials are not returned.
+ self.assertEqual(1, len(cred_list_2))
+ self.assertEqual(cred_list[0], cred_list_2[0])
+
def test_ec2_delete_credential(self):
"""Test ec2 credential deletion."""
ec2_cred = self._get_ec2_cred()
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
cred_from_credential_api = (
self.credential_api
- .list_credentials_for_user(self.user_id))
+ .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2))
self.assertEqual(1, len(cred_from_credential_api))
self.delete(uri)
self.assertRaises(exception.CredentialNotFound,