aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_oauth1.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_oauth1.py907
1 files changed, 0 insertions, 907 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
deleted file mode 100644
index 198dffb8..00000000
--- a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
+++ /dev/null
@@ -1,907 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import uuid
-
-import mock
-from oslo_log import versionutils
-from oslo_serialization import jsonutils
-from pycadf import cadftaxonomy
-from six.moves import http_client
-from six.moves import urllib
-
-from keystone.contrib.oauth1 import routers
-from keystone import exception
-from keystone import oauth1
-from keystone.oauth1 import controllers
-from keystone.oauth1 import core
-from keystone.tests import unit
-from keystone.tests.unit.common import test_notifications
-from keystone.tests.unit import ksfixtures
-from keystone.tests.unit.ksfixtures import temporaryfile
-from keystone.tests.unit import test_v3
-
-
-class OAuth1ContribTests(test_v3.RestfulTestCase):
-
- @mock.patch.object(versionutils, 'report_deprecated_feature')
- def test_exception_happens(self, mock_deprecator):
- routers.OAuth1Extension(mock.ANY)
- mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY)
- args, _kwargs = mock_deprecator.call_args
- self.assertIn("Remove oauth1_extension from", args[1])
-
-
-class OAuth1Tests(test_v3.RestfulTestCase):
-
- CONSUMER_URL = '/OS-OAUTH1/consumers'
-
- def setUp(self):
- super(OAuth1Tests, self).setUp()
-
- # Now that the app has been served, we can query CONF values
- self.base_url = 'http://localhost/v3'
- self.controller = controllers.OAuthControllerV3()
-
- def _create_single_consumer(self):
- ref = {'description': uuid.uuid4().hex}
- resp = self.post(
- self.CONSUMER_URL,
- body={'consumer': ref})
- return resp.result['consumer']
-
- def _create_request_token(self, consumer, project_id):
- endpoint = '/OS-OAUTH1/request_token'
- client = oauth1.Client(consumer['key'],
- client_secret=consumer['secret'],
- signature_method=oauth1.SIG_HMAC,
- callback_uri="oob")
- headers = {'requested_project_id': project_id}
- url, headers, body = client.sign(self.base_url + endpoint,
- http_method='POST',
- headers=headers)
- return endpoint, headers
-
- def _create_access_token(self, consumer, token):
- endpoint = '/OS-OAUTH1/access_token'
- client = oauth1.Client(consumer['key'],
- client_secret=consumer['secret'],
- resource_owner_key=token.key,
- resource_owner_secret=token.secret,
- signature_method=oauth1.SIG_HMAC,
- verifier=token.verifier)
- url, headers, body = client.sign(self.base_url + endpoint,
- http_method='POST')
- headers.update({'Content-Type': 'application/json'})
- return endpoint, headers
-
- def _get_oauth_token(self, consumer, token):
- client = oauth1.Client(consumer['key'],
- client_secret=consumer['secret'],
- resource_owner_key=token.key,
- resource_owner_secret=token.secret,
- signature_method=oauth1.SIG_HMAC)
- endpoint = '/auth/tokens'
- url, headers, body = client.sign(self.base_url + endpoint,
- http_method='POST')
- headers.update({'Content-Type': 'application/json'})
- ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
- return endpoint, headers, ref
-
- def _authorize_request_token(self, request_id):
- return '/OS-OAUTH1/authorize/%s' % (request_id)
-
-
-class ConsumerCRUDTests(OAuth1Tests):
-
- def _consumer_create(self, description=None, description_flag=True,
- **kwargs):
- if description_flag:
- ref = {'description': description}
- else:
- ref = {}
- if kwargs:
- ref.update(kwargs)
- resp = self.post(
- self.CONSUMER_URL,
- body={'consumer': ref})
- consumer = resp.result['consumer']
- consumer_id = consumer['id']
- self.assertEqual(description, consumer['description'])
- self.assertIsNotNone(consumer_id)
- self.assertIsNotNone(consumer['secret'])
- return consumer
-
- def test_consumer_create(self):
- description = uuid.uuid4().hex
- self._consumer_create(description=description)
-
- def test_consumer_create_none_desc_1(self):
- self._consumer_create()
-
- def test_consumer_create_none_desc_2(self):
- self._consumer_create(description_flag=False)
-
- def test_consumer_create_normalize_field(self):
- # If create a consumer with a field with : or - in the name,
- # the name is normalized by converting those chars to _.
- field_name = 'some:weird-field'
- field_value = uuid.uuid4().hex
- extra_fields = {field_name: field_value}
- consumer = self._consumer_create(**extra_fields)
- normalized_field_name = 'some_weird_field'
- self.assertEqual(field_value, consumer[normalized_field_name])
-
- def test_consumer_delete(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- resp = self.delete(self.CONSUMER_URL + '/%s' % consumer_id)
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
-
- def test_consumer_get(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- resp = self.get(self.CONSUMER_URL + '/%s' % consumer_id)
- self_url = ['http://localhost/v3', self.CONSUMER_URL,
- '/', consumer_id]
- self_url = ''.join(self_url)
- self.assertEqual(self_url, resp.result['consumer']['links']['self'])
- self.assertEqual(consumer_id, resp.result['consumer']['id'])
-
- def test_consumer_list(self):
- self._consumer_create()
- resp = self.get(self.CONSUMER_URL)
- entities = resp.result['consumers']
- self.assertIsNotNone(entities)
- self_url = ['http://localhost/v3', self.CONSUMER_URL]
- self_url = ''.join(self_url)
- self.assertEqual(self_url, resp.result['links']['self'])
- self.assertValidListLinks(resp.result['links'])
-
- def test_consumer_update(self):
- consumer = self._create_single_consumer()
- original_id = consumer['id']
- original_description = consumer['description']
- update_description = original_description + '_new'
-
- update_ref = {'description': update_description}
- update_resp = self.patch(self.CONSUMER_URL + '/%s' % original_id,
- body={'consumer': update_ref})
- consumer = update_resp.result['consumer']
- self.assertEqual(update_description, consumer['description'])
- self.assertEqual(original_id, consumer['id'])
-
- def test_consumer_update_bad_secret(self):
- consumer = self._create_single_consumer()
- original_id = consumer['id']
- update_ref = copy.deepcopy(consumer)
- update_ref['description'] = uuid.uuid4().hex
- update_ref['secret'] = uuid.uuid4().hex
- self.patch(self.CONSUMER_URL + '/%s' % original_id,
- body={'consumer': update_ref},
- expected_status=http_client.BAD_REQUEST)
-
- def test_consumer_update_bad_id(self):
- consumer = self._create_single_consumer()
- original_id = consumer['id']
- original_description = consumer['description']
- update_description = original_description + "_new"
-
- update_ref = copy.deepcopy(consumer)
- update_ref['description'] = update_description
- update_ref['id'] = update_description
- self.patch(self.CONSUMER_URL + '/%s' % original_id,
- body={'consumer': update_ref},
- expected_status=http_client.BAD_REQUEST)
-
- def test_consumer_update_normalize_field(self):
- # If update a consumer with a field with : or - in the name,
- # the name is normalized by converting those chars to _.
- field1_name = 'some:weird-field'
- field1_orig_value = uuid.uuid4().hex
-
- extra_fields = {field1_name: field1_orig_value}
- consumer = self._consumer_create(**extra_fields)
- consumer_id = consumer['id']
-
- field1_new_value = uuid.uuid4().hex
-
- field2_name = 'weird:some-field'
- field2_value = uuid.uuid4().hex
-
- update_ref = {field1_name: field1_new_value,
- field2_name: field2_value}
-
- update_resp = self.patch(self.CONSUMER_URL + '/%s' % consumer_id,
- body={'consumer': update_ref})
- consumer = update_resp.result['consumer']
-
- normalized_field1_name = 'some_weird_field'
- self.assertEqual(field1_new_value, consumer[normalized_field1_name])
-
- normalized_field2_name = 'weird_some_field'
- self.assertEqual(field2_value, consumer[normalized_field2_name])
-
- def test_consumer_create_no_description(self):
- resp = self.post(self.CONSUMER_URL, body={'consumer': {}})
- consumer = resp.result['consumer']
- consumer_id = consumer['id']
- self.assertIsNone(consumer['description'])
- self.assertIsNotNone(consumer_id)
- self.assertIsNotNone(consumer['secret'])
-
- def test_consumer_get_bad_id(self):
- self.get(self.CONSUMER_URL + '/%(consumer_id)s'
- % {'consumer_id': uuid.uuid4().hex},
- expected_status=http_client.NOT_FOUND)
-
-
-class OAuthFlowTests(OAuth1Tests):
-
- def test_oauth_flow(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- self.consumer = {'key': consumer_id, 'secret': consumer_secret}
- self.assertIsNotNone(self.consumer['secret'])
-
- url, headers = self._create_request_token(self.consumer,
- self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- self.request_token = oauth1.Token(request_key, request_secret)
- self.assertIsNotNone(self.request_token.key)
-
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
- self.verifier = resp.result['token']['oauth_verifier']
- self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier))
- self.assertEqual(8, len(self.verifier))
-
- self.request_token.set_verifier(self.verifier)
- url, headers = self._create_access_token(self.consumer,
- self.request_token)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- access_key = credentials['oauth_token'][0]
- access_secret = credentials['oauth_token_secret'][0]
- self.access_token = oauth1.Token(access_key, access_secret)
- self.assertIsNotNone(self.access_token.key)
-
- url, headers, body = self._get_oauth_token(self.consumer,
- self.access_token)
- content = self.post(url, headers=headers, body=body)
- self.keystone_token_id = content.headers['X-Subject-Token']
- self.keystone_token = content.result['token']
- self.assertIsNotNone(self.keystone_token_id)
-
-
-class AccessTokenCRUDTests(OAuthFlowTests):
- def test_delete_access_token_dne(self):
- self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
- % {'user': self.user_id,
- 'auth': uuid.uuid4().hex},
- expected_status=http_client.NOT_FOUND)
-
- def test_list_no_access_tokens(self):
- resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
- % {'user_id': self.user_id})
- entities = resp.result['access_tokens']
- self.assertEqual([], entities)
- self.assertValidListLinks(resp.result['links'])
-
- def test_get_single_access_token(self):
- self.test_oauth_flow()
- url = '/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' % {
- 'user_id': self.user_id,
- 'key': self.access_token.key
- }
- resp = self.get(url)
- entity = resp.result['access_token']
- self.assertEqual(self.access_token.key, entity['id'])
- self.assertEqual(self.consumer['key'], entity['consumer_id'])
- self.assertEqual('http://localhost/v3' + url, entity['links']['self'])
-
- def test_get_access_token_dne(self):
- self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s'
- % {'user_id': self.user_id,
- 'key': uuid.uuid4().hex},
- expected_status=http_client.NOT_FOUND)
-
- def test_list_all_roles_in_access_token(self):
- self.test_oauth_flow()
- resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles'
- % {'id': self.user_id,
- 'key': self.access_token.key})
- entities = resp.result['roles']
- self.assertTrue(entities)
- self.assertValidListLinks(resp.result['links'])
-
- def test_get_role_in_access_token(self):
- self.test_oauth_flow()
- url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
- % {'id': self.user_id, 'key': self.access_token.key,
- 'role': self.role_id})
- resp = self.get(url)
- entity = resp.result['role']
- self.assertEqual(self.role_id, entity['id'])
-
- def test_get_role_in_access_token_dne(self):
- self.test_oauth_flow()
- url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
- % {'id': self.user_id, 'key': self.access_token.key,
- 'role': uuid.uuid4().hex})
- self.get(url, expected_status=http_client.NOT_FOUND)
-
- def test_list_and_delete_access_tokens(self):
- self.test_oauth_flow()
- # List access_tokens should be > 0
- resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
- % {'user_id': self.user_id})
- entities = resp.result['access_tokens']
- self.assertTrue(entities)
- self.assertValidListLinks(resp.result['links'])
-
- # Delete access_token
- resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
- % {'user': self.user_id,
- 'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
-
- # List access_token should be 0
- resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
- % {'user_id': self.user_id})
- entities = resp.result['access_tokens']
- self.assertEqual([], entities)
- self.assertValidListLinks(resp.result['links'])
-
-
-class AuthTokenTests(OAuthFlowTests):
-
- def test_keystone_token_is_valid(self):
- self.test_oauth_flow()
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- r = self.get('/auth/tokens', headers=headers)
- self.assertValidTokenResponse(r, self.user)
-
- # now verify the oauth section
- oauth_section = r.result['token']['OS-OAUTH1']
- self.assertEqual(self.access_token.key,
- oauth_section['access_token_id'])
- self.assertEqual(self.consumer['key'], oauth_section['consumer_id'])
-
- # verify the roles section
- roles_list = r.result['token']['roles']
- # we can just verify the 0th role since we are only assigning one role
- self.assertEqual(self.role_id, roles_list[0]['id'])
-
- # verify that the token can perform delegated tasks
- ref = unit.new_user_ref(domain_id=self.domain_id)
- r = self.admin_request(path='/v3/users', headers=headers,
- method='POST', body={'user': ref})
- self.assertValidUserResponse(r, ref)
-
- def test_delete_access_token_also_revokes_token(self):
- self.test_oauth_flow()
-
- # Delete access token
- resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
- % {'user': self.user_id,
- 'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
-
- # Check Keystone Token no longer exists
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- self.get('/auth/tokens', headers=headers,
- expected_status=http_client.NOT_FOUND)
-
- def test_deleting_consumer_also_deletes_tokens(self):
- self.test_oauth_flow()
-
- # Delete consumer
- consumer_id = self.consumer['key']
- resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s'
- % {'consumer_id': consumer_id})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
-
- # List access_token should be 0
- resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
- % {'user_id': self.user_id})
- entities = resp.result['access_tokens']
- self.assertEqual([], entities)
-
- # Check Keystone Token no longer exists
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.NOT_FOUND)
-
- def test_change_user_password_also_deletes_tokens(self):
- self.test_oauth_flow()
-
- # delegated keystone token exists
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- r = self.get('/auth/tokens', headers=headers)
- self.assertValidTokenResponse(r, self.user)
-
- user = {'password': uuid.uuid4().hex}
- r = self.patch('/users/%(user_id)s' % {
- 'user_id': self.user['id']},
- body={'user': user})
-
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- self.admin_request(path='/auth/tokens', headers=headers,
- method='GET', expected_status=http_client.NOT_FOUND)
-
- def test_deleting_project_also_invalidates_tokens(self):
- self.test_oauth_flow()
-
- # delegated keystone token exists
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- r = self.get('/auth/tokens', headers=headers)
- self.assertValidTokenResponse(r, self.user)
-
- r = self.delete('/projects/%(project_id)s' % {
- 'project_id': self.project_id})
-
- headers = {'X-Subject-Token': self.keystone_token_id,
- 'X-Auth-Token': self.keystone_token_id}
- self.admin_request(path='/auth/tokens', headers=headers,
- method='GET', expected_status=http_client.NOT_FOUND)
-
- def test_token_chaining_is_not_allowed(self):
- self.test_oauth_flow()
-
- # attempt to re-authenticate (token chain) with the given token
- path = '/v3/auth/tokens/'
- auth_data = self.build_authentication_request(
- token=self.keystone_token_id)
-
- self.admin_request(
- path=path,
- body=auth_data,
- token=self.keystone_token_id,
- method='POST',
- expected_status=http_client.FORBIDDEN)
-
- def test_delete_keystone_tokens_by_consumer_id(self):
- self.test_oauth_flow()
- self.token_provider_api._persistence.get_token(self.keystone_token_id)
- self.token_provider_api._persistence.delete_tokens(
- self.user_id,
- consumer_id=self.consumer['key'])
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- self.keystone_token_id)
-
- def _create_trust_get_token(self):
- ref = unit.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.user_id,
- project_id=self.project_id,
- impersonation=True,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
- del ref['id']
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.user['id'],
- password=self.user['password'],
- trust_id=trust['id'])
-
- return self.get_requested_token(auth_data)
-
- def _approve_request_token_url(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- self.consumer = {'key': consumer_id, 'secret': consumer_secret}
- self.assertIsNotNone(self.consumer['secret'])
-
- url, headers = self._create_request_token(self.consumer,
- self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- self.request_token = oauth1.Token(request_key, request_secret)
- self.assertIsNotNone(self.request_token.key)
-
- url = self._authorize_request_token(request_key)
-
- return url
-
- def test_oauth_token_cannot_create_new_trust(self):
- self.test_oauth_flow()
- ref = unit.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.user_id,
- project_id=self.project_id,
- impersonation=True,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
- del ref['id']
-
- self.post('/OS-TRUST/trusts',
- body={'trust': ref},
- token=self.keystone_token_id,
- expected_status=http_client.FORBIDDEN)
-
- def test_oauth_token_cannot_authorize_request_token(self):
- self.test_oauth_flow()
- url = self._approve_request_token_url()
- body = {'roles': [{'id': self.role_id}]}
- self.put(url, body=body, token=self.keystone_token_id,
- expected_status=http_client.FORBIDDEN)
-
- def test_oauth_token_cannot_list_request_tokens(self):
- self._set_policy({"identity:list_access_tokens": [],
- "identity:create_consumer": [],
- "identity:authorize_request_token": []})
- self.test_oauth_flow()
- url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
- self.get(url, token=self.keystone_token_id,
- expected_status=http_client.FORBIDDEN)
-
- def _set_policy(self, new_policy):
- self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
- self.tmpfilename = self.tempfile.file_name
- self.config_fixture.config(group='oslo_policy',
- policy_file=self.tmpfilename)
- with open(self.tmpfilename, "w") as policyfile:
- policyfile.write(jsonutils.dumps(new_policy))
-
- def test_trust_token_cannot_authorize_request_token(self):
- trust_token = self._create_trust_get_token()
- url = self._approve_request_token_url()
- body = {'roles': [{'id': self.role_id}]}
- self.put(url, body=body, token=trust_token,
- expected_status=http_client.FORBIDDEN)
-
- def test_trust_token_cannot_list_request_tokens(self):
- self._set_policy({"identity:list_access_tokens": [],
- "identity:create_trust": []})
- trust_token = self._create_trust_get_token()
- url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
- self.get(url, token=trust_token,
- expected_status=http_client.FORBIDDEN)
-
-
-class FernetAuthTokenTests(AuthTokenTests):
-
- def config_overrides(self):
- super(FernetAuthTokenTests, self).config_overrides()
- self.config_fixture.config(group='token', provider='fernet')
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
-
- def test_delete_keystone_tokens_by_consumer_id(self):
- # NOTE(lbragstad): Fernet tokens are never persisted in the backend.
- pass
-
-
-class MaliciousOAuth1Tests(OAuth1Tests):
-
- def test_bad_consumer_secret(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer = {'key': consumer_id, 'secret': uuid.uuid4().hex}
- url, headers = self._create_request_token(consumer, self.project_id)
- self.post(url, headers=headers,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_bad_request_token_key(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- consumer = {'key': consumer_id, 'secret': consumer_secret}
- url, headers = self._create_request_token(consumer, self.project_id)
- self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- url = self._authorize_request_token(uuid.uuid4().hex)
- body = {'roles': [{'id': self.role_id}]}
- self.put(url, body=body, expected_status=http_client.NOT_FOUND)
-
- def test_bad_consumer_id(self):
- consumer = self._create_single_consumer()
- consumer_id = uuid.uuid4().hex
- consumer_secret = consumer['secret']
- consumer = {'key': consumer_id, 'secret': consumer_secret}
- url, headers = self._create_request_token(consumer, self.project_id)
- self.post(url, headers=headers, expected_status=http_client.NOT_FOUND)
-
- def test_bad_requested_project_id(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- consumer = {'key': consumer_id, 'secret': consumer_secret}
- project_id = uuid.uuid4().hex
- url, headers = self._create_request_token(consumer, project_id)
- self.post(url, headers=headers, expected_status=http_client.NOT_FOUND)
-
- def test_bad_verifier(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- consumer = {'key': consumer_id, 'secret': consumer_secret}
-
- url, headers = self._create_request_token(consumer, self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- request_token = oauth1.Token(request_key, request_secret)
-
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
- verifier = resp.result['token']['oauth_verifier']
- self.assertIsNotNone(verifier)
-
- request_token.set_verifier(uuid.uuid4().hex)
- url, headers = self._create_access_token(consumer, request_token)
- self.post(url, headers=headers,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_bad_authorizing_roles(self):
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- consumer = {'key': consumer_id, 'secret': consumer_secret}
-
- url, headers = self._create_request_token(consumer, self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
-
- self.assignment_api.remove_role_from_user_and_project(
- self.user_id, self.project_id, self.role_id)
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- self.admin_request(path=url, method='PUT',
- body=body, expected_status=http_client.NOT_FOUND)
-
- def test_expired_authorizing_request_token(self):
- self.config_fixture.config(group='oauth1', request_token_duration=-1)
-
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- self.consumer = {'key': consumer_id, 'secret': consumer_secret}
- self.assertIsNotNone(self.consumer['key'])
-
- url, headers = self._create_request_token(self.consumer,
- self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- self.request_token = oauth1.Token(request_key, request_secret)
- self.assertIsNotNone(self.request_token.key)
-
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- self.put(url, body=body, expected_status=http_client.UNAUTHORIZED)
-
- def test_expired_creating_keystone_token(self):
- self.config_fixture.config(group='oauth1', access_token_duration=-1)
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- self.consumer = {'key': consumer_id, 'secret': consumer_secret}
- self.assertIsNotNone(self.consumer['key'])
-
- url, headers = self._create_request_token(self.consumer,
- self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- self.request_token = oauth1.Token(request_key, request_secret)
- self.assertIsNotNone(self.request_token.key)
-
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
- self.verifier = resp.result['token']['oauth_verifier']
-
- self.request_token.set_verifier(self.verifier)
- url, headers = self._create_access_token(self.consumer,
- self.request_token)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- access_key = credentials['oauth_token'][0]
- access_secret = credentials['oauth_token_secret'][0]
- self.access_token = oauth1.Token(access_key, access_secret)
- self.assertIsNotNone(self.access_token.key)
-
- url, headers, body = self._get_oauth_token(self.consumer,
- self.access_token)
- self.post(url, headers=headers, body=body,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_missing_oauth_headers(self):
- endpoint = '/OS-OAUTH1/request_token'
- client = oauth1.Client(uuid.uuid4().hex,
- client_secret=uuid.uuid4().hex,
- signature_method=oauth1.SIG_HMAC,
- callback_uri="oob")
- headers = {'requested_project_id': uuid.uuid4().hex}
- _url, headers, _body = client.sign(self.base_url + endpoint,
- http_method='POST',
- headers=headers)
-
- # NOTE(stevemar): To simulate this error, we remove the Authorization
- # header from the post request.
- del headers['Authorization']
- self.post(endpoint, headers=headers,
- expected_status=http_client.INTERNAL_SERVER_ERROR)
-
-
-class OAuthNotificationTests(OAuth1Tests,
- test_notifications.BaseNotificationTest):
-
- def test_create_consumer(self):
- consumer_ref = self._create_single_consumer()
- self._assert_notify_sent(consumer_ref['id'],
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:consumer')
- self._assert_last_audit(consumer_ref['id'],
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:consumer',
- cadftaxonomy.SECURITY_ACCOUNT)
-
- def test_update_consumer(self):
- consumer_ref = self._create_single_consumer()
- update_ref = {'consumer': {'description': uuid.uuid4().hex}}
- self.oauth_api.update_consumer(consumer_ref['id'], update_ref)
- self._assert_notify_sent(consumer_ref['id'],
- test_notifications.UPDATED_OPERATION,
- 'OS-OAUTH1:consumer')
- self._assert_last_audit(consumer_ref['id'],
- test_notifications.UPDATED_OPERATION,
- 'OS-OAUTH1:consumer',
- cadftaxonomy.SECURITY_ACCOUNT)
-
- def test_delete_consumer(self):
- consumer_ref = self._create_single_consumer()
- self.oauth_api.delete_consumer(consumer_ref['id'])
- self._assert_notify_sent(consumer_ref['id'],
- test_notifications.DELETED_OPERATION,
- 'OS-OAUTH1:consumer')
- self._assert_last_audit(consumer_ref['id'],
- test_notifications.DELETED_OPERATION,
- 'OS-OAUTH1:consumer',
- cadftaxonomy.SECURITY_ACCOUNT)
-
- def test_oauth_flow_notifications(self):
- """Test to ensure notifications are sent for oauth tokens
-
- This test is very similar to test_oauth_flow, however
- there are additional checks in this test for ensuring that
- notifications for request token creation, and access token
- creation/deletion are emitted.
- """
- consumer = self._create_single_consumer()
- consumer_id = consumer['id']
- consumer_secret = consumer['secret']
- self.consumer = {'key': consumer_id, 'secret': consumer_secret}
- self.assertIsNotNone(self.consumer['secret'])
-
- url, headers = self._create_request_token(self.consumer,
- self.project_id)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- request_key = credentials['oauth_token'][0]
- request_secret = credentials['oauth_token_secret'][0]
- self.request_token = oauth1.Token(request_key, request_secret)
- self.assertIsNotNone(self.request_token.key)
-
- # Test to ensure the create request token notification is sent
- self._assert_notify_sent(request_key,
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:request_token')
- self._assert_last_audit(request_key,
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:request_token',
- cadftaxonomy.SECURITY_CREDENTIAL)
-
- url = self._authorize_request_token(request_key)
- body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
- self.verifier = resp.result['token']['oauth_verifier']
- self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier))
- self.assertEqual(8, len(self.verifier))
-
- self.request_token.set_verifier(self.verifier)
- url, headers = self._create_access_token(self.consumer,
- self.request_token)
- content = self.post(
- url, headers=headers,
- response_content_type='application/x-www-urlformencoded')
- credentials = urllib.parse.parse_qs(content.result)
- access_key = credentials['oauth_token'][0]
- access_secret = credentials['oauth_token_secret'][0]
- self.access_token = oauth1.Token(access_key, access_secret)
- self.assertIsNotNone(self.access_token.key)
-
- # Test to ensure the create access token notification is sent
- self._assert_notify_sent(access_key,
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:access_token')
- self._assert_last_audit(access_key,
- test_notifications.CREATED_OPERATION,
- 'OS-OAUTH1:access_token',
- cadftaxonomy.SECURITY_CREDENTIAL)
-
- resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
- % {'user': self.user_id,
- 'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
-
- # Test to ensure the delete access token notification is sent
- self._assert_notify_sent(access_key,
- test_notifications.DELETED_OPERATION,
- 'OS-OAUTH1:access_token')
- self._assert_last_audit(access_key,
- test_notifications.DELETED_OPERATION,
- 'OS-OAUTH1:access_token',
- cadftaxonomy.SECURITY_CREDENTIAL)
-
-
-class OAuthCADFNotificationTests(OAuthNotificationTests):
-
- def setUp(self):
- """Repeat the tests for CADF notifications."""
- super(OAuthCADFNotificationTests, self).setUp()
- self.config_fixture.config(notification_format='cadf')
-
-
-class JsonHomeTests(OAuth1Tests, test_v3.JsonHomeTestMixin):
- JSON_HOME_DATA = {
- 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-OAUTH1/1.0/'
- 'rel/consumers': {
- 'href': '/OS-OAUTH1/consumers',
- },
- }