From 920a49cfa055733d575282973e23558c33087a4a Mon Sep 17 00:00:00 2001 From: RHE Date: Fri, 24 Nov 2017 13:54:26 +0100 Subject: remove keystone-moon Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE --- .../keystone/tests/unit/test_v3_oauth1.py | 907 --------------------- 1 file changed, 907 deletions(-) delete mode 100644 keystone-moon/keystone/tests/unit/test_v3_oauth1.py (limited to 'keystone-moon/keystone/tests/unit/test_v3_oauth1.py') diff --git a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py deleted file mode 100644 index 198dffb8..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py +++ /dev/null @@ -1,907 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from oslo_log import versionutils -from oslo_serialization import jsonutils -from pycadf import cadftaxonomy -from six.moves import http_client -from six.moves import urllib - -from keystone.contrib.oauth1 import routers -from keystone import exception -from keystone import oauth1 -from keystone.oauth1 import controllers -from keystone.oauth1 import core -from keystone.tests import unit -from keystone.tests.unit.common import test_notifications -from keystone.tests.unit import ksfixtures -from keystone.tests.unit.ksfixtures import temporaryfile -from keystone.tests.unit import test_v3 - - -class OAuth1ContribTests(test_v3.RestfulTestCase): - - @mock.patch.object(versionutils, 'report_deprecated_feature') - def test_exception_happens(self, mock_deprecator): - routers.OAuth1Extension(mock.ANY) - mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY) - args, _kwargs = mock_deprecator.call_args - self.assertIn("Remove oauth1_extension from", args[1]) - - -class OAuth1Tests(test_v3.RestfulTestCase): - - CONSUMER_URL = '/OS-OAUTH1/consumers' - - def setUp(self): - super(OAuth1Tests, self).setUp() - - # Now that the app has been served, we can query CONF values - self.base_url = 'http://localhost/v3' - self.controller = controllers.OAuthControllerV3() - - def _create_single_consumer(self): - ref = {'description': uuid.uuid4().hex} - resp = self.post( - self.CONSUMER_URL, - body={'consumer': ref}) - return resp.result['consumer'] - - def _create_request_token(self, consumer, project_id): - endpoint = '/OS-OAUTH1/request_token' - client = oauth1.Client(consumer['key'], - client_secret=consumer['secret'], - signature_method=oauth1.SIG_HMAC, - callback_uri="oob") - headers = {'requested_project_id': project_id} - url, headers, body = client.sign(self.base_url + endpoint, - http_method='POST', - headers=headers) - return endpoint, headers - - def _create_access_token(self, consumer, token): - endpoint = '/OS-OAUTH1/access_token' - client = oauth1.Client(consumer['key'], - client_secret=consumer['secret'], - resource_owner_key=token.key, - resource_owner_secret=token.secret, - signature_method=oauth1.SIG_HMAC, - verifier=token.verifier) - url, headers, body = client.sign(self.base_url + endpoint, - http_method='POST') - headers.update({'Content-Type': 'application/json'}) - return endpoint, headers - - def _get_oauth_token(self, consumer, token): - client = oauth1.Client(consumer['key'], - client_secret=consumer['secret'], - resource_owner_key=token.key, - resource_owner_secret=token.secret, - signature_method=oauth1.SIG_HMAC) - endpoint = '/auth/tokens' - url, headers, body = client.sign(self.base_url + endpoint, - http_method='POST') - headers.update({'Content-Type': 'application/json'}) - ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}} - return endpoint, headers, ref - - def _authorize_request_token(self, request_id): - return '/OS-OAUTH1/authorize/%s' % (request_id) - - -class ConsumerCRUDTests(OAuth1Tests): - - def _consumer_create(self, description=None, description_flag=True, - **kwargs): - if description_flag: - ref = {'description': description} - else: - ref = {} - if kwargs: - ref.update(kwargs) - resp = self.post( - self.CONSUMER_URL, - body={'consumer': ref}) - consumer = resp.result['consumer'] - consumer_id = consumer['id'] - self.assertEqual(description, consumer['description']) - self.assertIsNotNone(consumer_id) - self.assertIsNotNone(consumer['secret']) - return consumer - - def test_consumer_create(self): - description = uuid.uuid4().hex - self._consumer_create(description=description) - - def test_consumer_create_none_desc_1(self): - self._consumer_create() - - def test_consumer_create_none_desc_2(self): - self._consumer_create(description_flag=False) - - def test_consumer_create_normalize_field(self): - # If create a consumer with a field with : or - in the name, - # the name is normalized by converting those chars to _. - field_name = 'some:weird-field' - field_value = uuid.uuid4().hex - extra_fields = {field_name: field_value} - consumer = self._consumer_create(**extra_fields) - normalized_field_name = 'some_weird_field' - self.assertEqual(field_value, consumer[normalized_field_name]) - - def test_consumer_delete(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - resp = self.delete(self.CONSUMER_URL + '/%s' % consumer_id) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - - def test_consumer_get(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - resp = self.get(self.CONSUMER_URL + '/%s' % consumer_id) - self_url = ['http://localhost/v3', self.CONSUMER_URL, - '/', consumer_id] - self_url = ''.join(self_url) - self.assertEqual(self_url, resp.result['consumer']['links']['self']) - self.assertEqual(consumer_id, resp.result['consumer']['id']) - - def test_consumer_list(self): - self._consumer_create() - resp = self.get(self.CONSUMER_URL) - entities = resp.result['consumers'] - self.assertIsNotNone(entities) - self_url = ['http://localhost/v3', self.CONSUMER_URL] - self_url = ''.join(self_url) - self.assertEqual(self_url, resp.result['links']['self']) - self.assertValidListLinks(resp.result['links']) - - def test_consumer_update(self): - consumer = self._create_single_consumer() - original_id = consumer['id'] - original_description = consumer['description'] - update_description = original_description + '_new' - - update_ref = {'description': update_description} - update_resp = self.patch(self.CONSUMER_URL + '/%s' % original_id, - body={'consumer': update_ref}) - consumer = update_resp.result['consumer'] - self.assertEqual(update_description, consumer['description']) - self.assertEqual(original_id, consumer['id']) - - def test_consumer_update_bad_secret(self): - consumer = self._create_single_consumer() - original_id = consumer['id'] - update_ref = copy.deepcopy(consumer) - update_ref['description'] = uuid.uuid4().hex - update_ref['secret'] = uuid.uuid4().hex - self.patch(self.CONSUMER_URL + '/%s' % original_id, - body={'consumer': update_ref}, - expected_status=http_client.BAD_REQUEST) - - def test_consumer_update_bad_id(self): - consumer = self._create_single_consumer() - original_id = consumer['id'] - original_description = consumer['description'] - update_description = original_description + "_new" - - update_ref = copy.deepcopy(consumer) - update_ref['description'] = update_description - update_ref['id'] = update_description - self.patch(self.CONSUMER_URL + '/%s' % original_id, - body={'consumer': update_ref}, - expected_status=http_client.BAD_REQUEST) - - def test_consumer_update_normalize_field(self): - # If update a consumer with a field with : or - in the name, - # the name is normalized by converting those chars to _. - field1_name = 'some:weird-field' - field1_orig_value = uuid.uuid4().hex - - extra_fields = {field1_name: field1_orig_value} - consumer = self._consumer_create(**extra_fields) - consumer_id = consumer['id'] - - field1_new_value = uuid.uuid4().hex - - field2_name = 'weird:some-field' - field2_value = uuid.uuid4().hex - - update_ref = {field1_name: field1_new_value, - field2_name: field2_value} - - update_resp = self.patch(self.CONSUMER_URL + '/%s' % consumer_id, - body={'consumer': update_ref}) - consumer = update_resp.result['consumer'] - - normalized_field1_name = 'some_weird_field' - self.assertEqual(field1_new_value, consumer[normalized_field1_name]) - - normalized_field2_name = 'weird_some_field' - self.assertEqual(field2_value, consumer[normalized_field2_name]) - - def test_consumer_create_no_description(self): - resp = self.post(self.CONSUMER_URL, body={'consumer': {}}) - consumer = resp.result['consumer'] - consumer_id = consumer['id'] - self.assertIsNone(consumer['description']) - self.assertIsNotNone(consumer_id) - self.assertIsNotNone(consumer['secret']) - - def test_consumer_get_bad_id(self): - self.get(self.CONSUMER_URL + '/%(consumer_id)s' - % {'consumer_id': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - -class OAuthFlowTests(OAuth1Tests): - - def test_oauth_flow(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - self.consumer = {'key': consumer_id, 'secret': consumer_secret} - self.assertIsNotNone(self.consumer['secret']) - - url, headers = self._create_request_token(self.consumer, - self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - self.request_token = oauth1.Token(request_key, request_secret) - self.assertIsNotNone(self.request_token.key) - - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) - self.verifier = resp.result['token']['oauth_verifier'] - self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier)) - self.assertEqual(8, len(self.verifier)) - - self.request_token.set_verifier(self.verifier) - url, headers = self._create_access_token(self.consumer, - self.request_token) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - access_key = credentials['oauth_token'][0] - access_secret = credentials['oauth_token_secret'][0] - self.access_token = oauth1.Token(access_key, access_secret) - self.assertIsNotNone(self.access_token.key) - - url, headers, body = self._get_oauth_token(self.consumer, - self.access_token) - content = self.post(url, headers=headers, body=body) - self.keystone_token_id = content.headers['X-Subject-Token'] - self.keystone_token = content.result['token'] - self.assertIsNotNone(self.keystone_token_id) - - -class AccessTokenCRUDTests(OAuthFlowTests): - def test_delete_access_token_dne(self): - self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' - % {'user': self.user_id, - 'auth': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_list_no_access_tokens(self): - resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' - % {'user_id': self.user_id}) - entities = resp.result['access_tokens'] - self.assertEqual([], entities) - self.assertValidListLinks(resp.result['links']) - - def test_get_single_access_token(self): - self.test_oauth_flow() - url = '/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' % { - 'user_id': self.user_id, - 'key': self.access_token.key - } - resp = self.get(url) - entity = resp.result['access_token'] - self.assertEqual(self.access_token.key, entity['id']) - self.assertEqual(self.consumer['key'], entity['consumer_id']) - self.assertEqual('http://localhost/v3' + url, entity['links']['self']) - - def test_get_access_token_dne(self): - self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' - % {'user_id': self.user_id, - 'key': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_list_all_roles_in_access_token(self): - self.test_oauth_flow() - resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles' - % {'id': self.user_id, - 'key': self.access_token.key}) - entities = resp.result['roles'] - self.assertTrue(entities) - self.assertValidListLinks(resp.result['links']) - - def test_get_role_in_access_token(self): - self.test_oauth_flow() - url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' - % {'id': self.user_id, 'key': self.access_token.key, - 'role': self.role_id}) - resp = self.get(url) - entity = resp.result['role'] - self.assertEqual(self.role_id, entity['id']) - - def test_get_role_in_access_token_dne(self): - self.test_oauth_flow() - url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' - % {'id': self.user_id, 'key': self.access_token.key, - 'role': uuid.uuid4().hex}) - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_list_and_delete_access_tokens(self): - self.test_oauth_flow() - # List access_tokens should be > 0 - resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' - % {'user_id': self.user_id}) - entities = resp.result['access_tokens'] - self.assertTrue(entities) - self.assertValidListLinks(resp.result['links']) - - # Delete access_token - resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' - % {'user': self.user_id, - 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - - # List access_token should be 0 - resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' - % {'user_id': self.user_id}) - entities = resp.result['access_tokens'] - self.assertEqual([], entities) - self.assertValidListLinks(resp.result['links']) - - -class AuthTokenTests(OAuthFlowTests): - - def test_keystone_token_is_valid(self): - self.test_oauth_flow() - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - r = self.get('/auth/tokens', headers=headers) - self.assertValidTokenResponse(r, self.user) - - # now verify the oauth section - oauth_section = r.result['token']['OS-OAUTH1'] - self.assertEqual(self.access_token.key, - oauth_section['access_token_id']) - self.assertEqual(self.consumer['key'], oauth_section['consumer_id']) - - # verify the roles section - roles_list = r.result['token']['roles'] - # we can just verify the 0th role since we are only assigning one role - self.assertEqual(self.role_id, roles_list[0]['id']) - - # verify that the token can perform delegated tasks - ref = unit.new_user_ref(domain_id=self.domain_id) - r = self.admin_request(path='/v3/users', headers=headers, - method='POST', body={'user': ref}) - self.assertValidUserResponse(r, ref) - - def test_delete_access_token_also_revokes_token(self): - self.test_oauth_flow() - - # Delete access token - resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' - % {'user': self.user_id, - 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - - # Check Keystone Token no longer exists - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - self.get('/auth/tokens', headers=headers, - expected_status=http_client.NOT_FOUND) - - def test_deleting_consumer_also_deletes_tokens(self): - self.test_oauth_flow() - - # Delete consumer - consumer_id = self.consumer['key'] - resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' - % {'consumer_id': consumer_id}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - - # List access_token should be 0 - resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' - % {'user_id': self.user_id}) - entities = resp.result['access_tokens'] - self.assertEqual([], entities) - - # Check Keystone Token no longer exists - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - self.head('/auth/tokens', headers=headers, - expected_status=http_client.NOT_FOUND) - - def test_change_user_password_also_deletes_tokens(self): - self.test_oauth_flow() - - # delegated keystone token exists - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - r = self.get('/auth/tokens', headers=headers) - self.assertValidTokenResponse(r, self.user) - - user = {'password': uuid.uuid4().hex} - r = self.patch('/users/%(user_id)s' % { - 'user_id': self.user['id']}, - body={'user': user}) - - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - self.admin_request(path='/auth/tokens', headers=headers, - method='GET', expected_status=http_client.NOT_FOUND) - - def test_deleting_project_also_invalidates_tokens(self): - self.test_oauth_flow() - - # delegated keystone token exists - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - r = self.get('/auth/tokens', headers=headers) - self.assertValidTokenResponse(r, self.user) - - r = self.delete('/projects/%(project_id)s' % { - 'project_id': self.project_id}) - - headers = {'X-Subject-Token': self.keystone_token_id, - 'X-Auth-Token': self.keystone_token_id} - self.admin_request(path='/auth/tokens', headers=headers, - method='GET', expected_status=http_client.NOT_FOUND) - - def test_token_chaining_is_not_allowed(self): - self.test_oauth_flow() - - # attempt to re-authenticate (token chain) with the given token - path = '/v3/auth/tokens/' - auth_data = self.build_authentication_request( - token=self.keystone_token_id) - - self.admin_request( - path=path, - body=auth_data, - token=self.keystone_token_id, - method='POST', - expected_status=http_client.FORBIDDEN) - - def test_delete_keystone_tokens_by_consumer_id(self): - self.test_oauth_flow() - self.token_provider_api._persistence.get_token(self.keystone_token_id) - self.token_provider_api._persistence.delete_tokens( - self.user_id, - consumer_id=self.consumer['key']) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - self.keystone_token_id) - - def _create_trust_get_token(self): - ref = unit.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.user_id, - project_id=self.project_id, - impersonation=True, - expires=dict(minutes=1), - role_ids=[self.role_id]) - del ref['id'] - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - trust_id=trust['id']) - - return self.get_requested_token(auth_data) - - def _approve_request_token_url(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - self.consumer = {'key': consumer_id, 'secret': consumer_secret} - self.assertIsNotNone(self.consumer['secret']) - - url, headers = self._create_request_token(self.consumer, - self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - self.request_token = oauth1.Token(request_key, request_secret) - self.assertIsNotNone(self.request_token.key) - - url = self._authorize_request_token(request_key) - - return url - - def test_oauth_token_cannot_create_new_trust(self): - self.test_oauth_flow() - ref = unit.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.user_id, - project_id=self.project_id, - impersonation=True, - expires=dict(minutes=1), - role_ids=[self.role_id]) - del ref['id'] - - self.post('/OS-TRUST/trusts', - body={'trust': ref}, - token=self.keystone_token_id, - expected_status=http_client.FORBIDDEN) - - def test_oauth_token_cannot_authorize_request_token(self): - self.test_oauth_flow() - url = self._approve_request_token_url() - body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, token=self.keystone_token_id, - expected_status=http_client.FORBIDDEN) - - def test_oauth_token_cannot_list_request_tokens(self): - self._set_policy({"identity:list_access_tokens": [], - "identity:create_consumer": [], - "identity:authorize_request_token": []}) - self.test_oauth_flow() - url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id - self.get(url, token=self.keystone_token_id, - expected_status=http_client.FORBIDDEN) - - def _set_policy(self, new_policy): - self.tempfile = self.useFixture(temporaryfile.SecureTempFile()) - self.tmpfilename = self.tempfile.file_name - self.config_fixture.config(group='oslo_policy', - policy_file=self.tmpfilename) - with open(self.tmpfilename, "w") as policyfile: - policyfile.write(jsonutils.dumps(new_policy)) - - def test_trust_token_cannot_authorize_request_token(self): - trust_token = self._create_trust_get_token() - url = self._approve_request_token_url() - body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, token=trust_token, - expected_status=http_client.FORBIDDEN) - - def test_trust_token_cannot_list_request_tokens(self): - self._set_policy({"identity:list_access_tokens": [], - "identity:create_trust": []}) - trust_token = self._create_trust_get_token() - url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id - self.get(url, token=trust_token, - expected_status=http_client.FORBIDDEN) - - -class FernetAuthTokenTests(AuthTokenTests): - - def config_overrides(self): - super(FernetAuthTokenTests, self).config_overrides() - self.config_fixture.config(group='token', provider='fernet') - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - - def test_delete_keystone_tokens_by_consumer_id(self): - # NOTE(lbragstad): Fernet tokens are never persisted in the backend. - pass - - -class MaliciousOAuth1Tests(OAuth1Tests): - - def test_bad_consumer_secret(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer = {'key': consumer_id, 'secret': uuid.uuid4().hex} - url, headers = self._create_request_token(consumer, self.project_id) - self.post(url, headers=headers, - expected_status=http_client.UNAUTHORIZED) - - def test_bad_request_token_key(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - consumer = {'key': consumer_id, 'secret': consumer_secret} - url, headers = self._create_request_token(consumer, self.project_id) - self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - url = self._authorize_request_token(uuid.uuid4().hex) - body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, expected_status=http_client.NOT_FOUND) - - def test_bad_consumer_id(self): - consumer = self._create_single_consumer() - consumer_id = uuid.uuid4().hex - consumer_secret = consumer['secret'] - consumer = {'key': consumer_id, 'secret': consumer_secret} - url, headers = self._create_request_token(consumer, self.project_id) - self.post(url, headers=headers, expected_status=http_client.NOT_FOUND) - - def test_bad_requested_project_id(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - consumer = {'key': consumer_id, 'secret': consumer_secret} - project_id = uuid.uuid4().hex - url, headers = self._create_request_token(consumer, project_id) - self.post(url, headers=headers, expected_status=http_client.NOT_FOUND) - - def test_bad_verifier(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - consumer = {'key': consumer_id, 'secret': consumer_secret} - - url, headers = self._create_request_token(consumer, self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - request_token = oauth1.Token(request_key, request_secret) - - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) - verifier = resp.result['token']['oauth_verifier'] - self.assertIsNotNone(verifier) - - request_token.set_verifier(uuid.uuid4().hex) - url, headers = self._create_access_token(consumer, request_token) - self.post(url, headers=headers, - expected_status=http_client.UNAUTHORIZED) - - def test_bad_authorizing_roles(self): - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - consumer = {'key': consumer_id, 'secret': consumer_secret} - - url, headers = self._create_request_token(consumer, self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - - self.assignment_api.remove_role_from_user_and_project( - self.user_id, self.project_id, self.role_id) - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - self.admin_request(path=url, method='PUT', - body=body, expected_status=http_client.NOT_FOUND) - - def test_expired_authorizing_request_token(self): - self.config_fixture.config(group='oauth1', request_token_duration=-1) - - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - self.consumer = {'key': consumer_id, 'secret': consumer_secret} - self.assertIsNotNone(self.consumer['key']) - - url, headers = self._create_request_token(self.consumer, - self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - self.request_token = oauth1.Token(request_key, request_secret) - self.assertIsNotNone(self.request_token.key) - - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - self.put(url, body=body, expected_status=http_client.UNAUTHORIZED) - - def test_expired_creating_keystone_token(self): - self.config_fixture.config(group='oauth1', access_token_duration=-1) - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - self.consumer = {'key': consumer_id, 'secret': consumer_secret} - self.assertIsNotNone(self.consumer['key']) - - url, headers = self._create_request_token(self.consumer, - self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - self.request_token = oauth1.Token(request_key, request_secret) - self.assertIsNotNone(self.request_token.key) - - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) - self.verifier = resp.result['token']['oauth_verifier'] - - self.request_token.set_verifier(self.verifier) - url, headers = self._create_access_token(self.consumer, - self.request_token) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - access_key = credentials['oauth_token'][0] - access_secret = credentials['oauth_token_secret'][0] - self.access_token = oauth1.Token(access_key, access_secret) - self.assertIsNotNone(self.access_token.key) - - url, headers, body = self._get_oauth_token(self.consumer, - self.access_token) - self.post(url, headers=headers, body=body, - expected_status=http_client.UNAUTHORIZED) - - def test_missing_oauth_headers(self): - endpoint = '/OS-OAUTH1/request_token' - client = oauth1.Client(uuid.uuid4().hex, - client_secret=uuid.uuid4().hex, - signature_method=oauth1.SIG_HMAC, - callback_uri="oob") - headers = {'requested_project_id': uuid.uuid4().hex} - _url, headers, _body = client.sign(self.base_url + endpoint, - http_method='POST', - headers=headers) - - # NOTE(stevemar): To simulate this error, we remove the Authorization - # header from the post request. - del headers['Authorization'] - self.post(endpoint, headers=headers, - expected_status=http_client.INTERNAL_SERVER_ERROR) - - -class OAuthNotificationTests(OAuth1Tests, - test_notifications.BaseNotificationTest): - - def test_create_consumer(self): - consumer_ref = self._create_single_consumer() - self._assert_notify_sent(consumer_ref['id'], - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:consumer') - self._assert_last_audit(consumer_ref['id'], - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:consumer', - cadftaxonomy.SECURITY_ACCOUNT) - - def test_update_consumer(self): - consumer_ref = self._create_single_consumer() - update_ref = {'consumer': {'description': uuid.uuid4().hex}} - self.oauth_api.update_consumer(consumer_ref['id'], update_ref) - self._assert_notify_sent(consumer_ref['id'], - test_notifications.UPDATED_OPERATION, - 'OS-OAUTH1:consumer') - self._assert_last_audit(consumer_ref['id'], - test_notifications.UPDATED_OPERATION, - 'OS-OAUTH1:consumer', - cadftaxonomy.SECURITY_ACCOUNT) - - def test_delete_consumer(self): - consumer_ref = self._create_single_consumer() - self.oauth_api.delete_consumer(consumer_ref['id']) - self._assert_notify_sent(consumer_ref['id'], - test_notifications.DELETED_OPERATION, - 'OS-OAUTH1:consumer') - self._assert_last_audit(consumer_ref['id'], - test_notifications.DELETED_OPERATION, - 'OS-OAUTH1:consumer', - cadftaxonomy.SECURITY_ACCOUNT) - - def test_oauth_flow_notifications(self): - """Test to ensure notifications are sent for oauth tokens - - This test is very similar to test_oauth_flow, however - there are additional checks in this test for ensuring that - notifications for request token creation, and access token - creation/deletion are emitted. - """ - consumer = self._create_single_consumer() - consumer_id = consumer['id'] - consumer_secret = consumer['secret'] - self.consumer = {'key': consumer_id, 'secret': consumer_secret} - self.assertIsNotNone(self.consumer['secret']) - - url, headers = self._create_request_token(self.consumer, - self.project_id) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - request_key = credentials['oauth_token'][0] - request_secret = credentials['oauth_token_secret'][0] - self.request_token = oauth1.Token(request_key, request_secret) - self.assertIsNotNone(self.request_token.key) - - # Test to ensure the create request token notification is sent - self._assert_notify_sent(request_key, - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:request_token') - self._assert_last_audit(request_key, - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:request_token', - cadftaxonomy.SECURITY_CREDENTIAL) - - url = self._authorize_request_token(request_key) - body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) - self.verifier = resp.result['token']['oauth_verifier'] - self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier)) - self.assertEqual(8, len(self.verifier)) - - self.request_token.set_verifier(self.verifier) - url, headers = self._create_access_token(self.consumer, - self.request_token) - content = self.post( - url, headers=headers, - response_content_type='application/x-www-urlformencoded') - credentials = urllib.parse.parse_qs(content.result) - access_key = credentials['oauth_token'][0] - access_secret = credentials['oauth_token_secret'][0] - self.access_token = oauth1.Token(access_key, access_secret) - self.assertIsNotNone(self.access_token.key) - - # Test to ensure the create access token notification is sent - self._assert_notify_sent(access_key, - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:access_token') - self._assert_last_audit(access_key, - test_notifications.CREATED_OPERATION, - 'OS-OAUTH1:access_token', - cadftaxonomy.SECURITY_CREDENTIAL) - - resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' - % {'user': self.user_id, - 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - - # Test to ensure the delete access token notification is sent - self._assert_notify_sent(access_key, - test_notifications.DELETED_OPERATION, - 'OS-OAUTH1:access_token') - self._assert_last_audit(access_key, - test_notifications.DELETED_OPERATION, - 'OS-OAUTH1:access_token', - cadftaxonomy.SECURITY_CREDENTIAL) - - -class OAuthCADFNotificationTests(OAuthNotificationTests): - - def setUp(self): - """Repeat the tests for CADF notifications.""" - super(OAuthCADFNotificationTests, self).setUp() - self.config_fixture.config(notification_format='cadf') - - -class JsonHomeTests(OAuth1Tests, test_v3.JsonHomeTestMixin): - JSON_HOME_DATA = { - 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-OAUTH1/1.0/' - 'rel/consumers': { - 'href': '/OS-OAUTH1/consumers', - }, - } -- cgit 1.2.3-korg