aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_credential.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_credential.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_credential.py265
1 files changed, 265 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_credential.py b/keystone-moon/keystone/tests/unit/test_credential.py
new file mode 100644
index 00000000..e917ef71
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_credential.py
@@ -0,0 +1,265 @@
+# Copyright 2015 UnitedStack, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystoneclient.contrib.ec2 import utils as ec2_utils
+from six.moves import http_client
+
+from keystone.common import utils
+from keystone.contrib.ec2 import controllers
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit import default_fixtures
+from keystone.tests.unit.ksfixtures import database
+from keystone.tests.unit import rest
+
+CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2
+
+
+class V2CredentialEc2TestCase(rest.RestfulTestCase):
+ def setUp(self):
+ super(V2CredentialEc2TestCase, self).setUp()
+ self.user_id = self.user_foo['id']
+ self.project_id = self.tenant_bar['id']
+
+ def _get_token_id(self, r):
+ return r.result['access']['token']['id']
+
+ def _get_ec2_cred(self):
+ uri = self._get_ec2_cred_uri()
+ r = self.public_request(method='POST', token=self.get_scoped_token(),
+ path=uri, body={'tenant_id': self.project_id})
+ return r.result['credential']
+
+ def _get_ec2_cred_uri(self):
+ return '/v2.0/users/%s/credentials/OS-EC2' % self.user_id
+
+ def test_ec2_cannot_get_non_ec2_credential(self):
+ access_key = uuid.uuid4().hex
+ cred_id = utils.hash_access_key(access_key)
+ non_ec2_cred = unit.new_credential_ref(
+ user_id=self.user_id,
+ project_id=self.project_id)
+ non_ec2_cred['id'] = cred_id
+ self.credential_api.create_credential(cred_id, non_ec2_cred)
+
+ # if access_key is not found, ec2 controller raises Unauthorized
+ # exception
+ path = '/'.join([self._get_ec2_cred_uri(), access_key])
+ self.public_request(method='GET', token=self.get_scoped_token(),
+ path=path,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def assertValidErrorResponse(self, r):
+ # FIXME(wwwjfy): it's copied from test_v3.py. The logic of this method
+ # in test_v2.py and test_v3.py (both are inherited from rest.py) has no
+ # difference, so they should be refactored into one place. Also, the
+ # function signatures in both files don't match the one in the parent
+ # class in rest.py.
+ resp = r.result
+ self.assertIsNotNone(resp.get('error'))
+ self.assertIsNotNone(resp['error'].get('code'))
+ self.assertIsNotNone(resp['error'].get('title'))
+ self.assertIsNotNone(resp['error'].get('message'))
+ self.assertEqual(int(resp['error']['code']), r.status_code)
+
+ def test_ec2_list_credentials(self):
+ self._get_ec2_cred()
+ uri = self._get_ec2_cred_uri()
+ r = self.public_request(method='GET', token=self.get_scoped_token(),
+ path=uri)
+ cred_list = r.result['credentials']
+ self.assertEqual(1, len(cred_list))
+
+ # non-EC2 credentials won't be fetched
+ non_ec2_cred = unit.new_credential_ref(
+ user_id=self.user_id,
+ project_id=self.project_id)
+ non_ec2_cred['type'] = uuid.uuid4().hex
+ self.credential_api.create_credential(non_ec2_cred['id'],
+ non_ec2_cred)
+ r = self.public_request(method='GET', token=self.get_scoped_token(),
+ path=uri)
+ cred_list_2 = r.result['credentials']
+ # still one element because non-EC2 credentials are not returned.
+ self.assertEqual(1, len(cred_list_2))
+ self.assertEqual(cred_list[0], cred_list_2[0])
+
+
+class V2CredentialEc2Controller(unit.TestCase):
+ def setUp(self):
+ super(V2CredentialEc2Controller, self).setUp()
+ self.useFixture(database.Database())
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+ self.user_id = self.user_foo['id']
+ self.project_id = self.tenant_bar['id']
+ self.controller = controllers.Ec2Controller()
+ self.blob, tmp_ref = unit.new_ec2_credential(
+ user_id=self.user_id,
+ project_id=self.project_id)
+
+ self.creds_ref = (controllers.Ec2Controller
+ ._convert_v3_to_ec2_credential(tmp_ref))
+
+ def test_signature_validate_no_host_port(self):
+ """Test signature validation with the access/secret provided."""
+ access = self.blob['access']
+ secret = self.blob['secret']
+ signer = ec2_utils.Ec2Signer(secret)
+ params = {'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2',
+ 'AWSAccessKeyId': access}
+ request = {'host': 'foo',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+ signature = signer.generate(request)
+
+ sig_ref = {'access': access,
+ 'signature': signature,
+ 'host': 'foo',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+
+ # Now validate the signature based on the dummy request
+ self.assertTrue(self.controller.check_signature(self.creds_ref,
+ sig_ref))
+
+ def test_signature_validate_with_host_port(self):
+ """Test signature validation when host is bound with port.
+
+ Host is bound with a port, generally, the port here is not the
+ standard port for the protocol, like '80' for HTTP and port 443
+ for HTTPS, the port is not omitted by the client library.
+ """
+ access = self.blob['access']
+ secret = self.blob['secret']
+ signer = ec2_utils.Ec2Signer(secret)
+ params = {'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2',
+ 'AWSAccessKeyId': access}
+ request = {'host': 'foo:8181',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+ signature = signer.generate(request)
+
+ sig_ref = {'access': access,
+ 'signature': signature,
+ 'host': 'foo:8181',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+
+ # Now validate the signature based on the dummy request
+ self.assertTrue(self.controller.check_signature(self.creds_ref,
+ sig_ref))
+
+ def test_signature_validate_with_missed_host_port(self):
+ """Test signature validation when host is bound with well-known port.
+
+ Host is bound with a port, but the port is well-know port like '80'
+ for HTTP and port 443 for HTTPS, sometimes, client library omit
+ the port but then make the request with the port.
+ see (How to create the string to sign): 'http://docs.aws.amazon.com/
+ general/latest/gr/signature-version-2.html'.
+
+ Since "credentials['host']" is not set by client library but is
+ taken from "req.host", so caused the differences.
+ """
+ access = self.blob['access']
+ secret = self.blob['secret']
+ signer = ec2_utils.Ec2Signer(secret)
+ params = {'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2',
+ 'AWSAccessKeyId': access}
+ # Omit the port to generate the signature.
+ cnt_req = {'host': 'foo',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+ signature = signer.generate(cnt_req)
+
+ sig_ref = {'access': access,
+ 'signature': signature,
+ 'host': 'foo:8080',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+
+ # Now validate the signature based on the dummy request
+ # Check the signature again after omitting the port.
+ self.assertTrue(self.controller.check_signature(self.creds_ref,
+ sig_ref))
+
+ def test_signature_validate_no_signature(self):
+ """Signature is not presented in signature reference data."""
+ access = self.blob['access']
+ params = {'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2',
+ 'AWSAccessKeyId': access}
+
+ sig_ref = {'access': access,
+ 'signature': None,
+ 'host': 'foo:8080',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+
+ # Now validate the signature based on the dummy request
+ self.assertRaises(exception.Unauthorized,
+ self.controller.check_signature,
+ self.creds_ref, sig_ref)
+
+ def test_signature_validate_invalid_signature(self):
+ """Signature is not signed on the correct data."""
+ access = self.blob['access']
+ secret = self.blob['secret']
+ signer = ec2_utils.Ec2Signer(secret)
+ params = {'SignatureMethod': 'HmacSHA256',
+ 'SignatureVersion': '2',
+ 'AWSAccessKeyId': access}
+ request = {'host': 'bar',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+ signature = signer.generate(request)
+
+ sig_ref = {'access': access,
+ 'signature': signature,
+ 'host': 'foo:8080',
+ 'verb': 'GET',
+ 'path': '/bar',
+ 'params': params}
+
+ # Now validate the signature based on the dummy request
+ self.assertRaises(exception.Unauthorized,
+ self.controller.check_signature,
+ self.creds_ref, sig_ref)
+
+ def test_check_non_admin_user(self):
+ """Checking if user is admin causes uncaught error.
+
+ When checking if a user is an admin, keystone.exception.Unauthorized
+ is raised but not caught if the user is not an admin.
+ """
+ # make a non-admin user
+ context = {'is_admin': False, 'token_id': uuid.uuid4().hex}
+
+ # check if user is admin
+ # no exceptions should be raised
+ self.controller._is_admin(context)