summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_federation.py
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-09-24 16:27:16 +0200
committerasteroide <thomas.duval@orange.com>2015-09-24 16:27:16 +0200
commit92d11d139e9f76d4fd76859aea78643fc32ef36b (patch)
treebd5a2e7b50853498074ab55bdaee4452c460010b /keystone-moon/keystone/tests/unit/test_v3_federation.py
parent49325d99acfadaadfad99c596c4ada6b5ec849de (diff)
Update Keystone code from repository.
Change-Id: Ib3d0a06b10902fcc6d520f58e85aa617bc326d00
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_federation.py293
1 files changed, 170 insertions, 123 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py
index e646bc0a..5717e67b 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_federation.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py
@@ -12,7 +12,6 @@
import os
import random
-import subprocess
from testtools import matchers
import uuid
@@ -26,12 +25,14 @@ from oslotest import mockpatch
import saml2
from saml2 import saml
from saml2 import sigver
+from six.moves import http_client
from six.moves import range, urllib, zip
xmldsig = importutils.try_import("saml2.xmldsig")
if not xmldsig:
xmldsig = importutils.try_import("xmldsig")
from keystone.auth import controllers as auth_controllers
+from keystone.common import environment
from keystone.contrib.federation import controllers as federation_controllers
from keystone.contrib.federation import idp as keystone_idp
from keystone import exception
@@ -44,6 +45,8 @@ from keystone.tests.unit import test_v3
from keystone.token.providers import common as token_common
+subprocess = environment.subprocess
+
CONF = cfg.CONF
LOG = log.getLogger(__name__)
ROOTDIR = os.path.dirname(os.path.abspath(__file__))
@@ -109,25 +112,43 @@ class FederatedSetupMixin(object):
self.assertEqual(token_projects, projects_ref)
def _check_scoped_token_attributes(self, token):
- def xor_project_domain(token_keys):
- return sum(('project' in token_keys, 'domain' in token_keys)) % 2
for obj in ('user', 'catalog', 'expires_at', 'issued_at',
'methods', 'roles'):
self.assertIn(obj, token)
- # Check for either project or domain
- if not xor_project_domain(list(token.keys())):
- raise AssertionError("You must specify either"
- "project or domain.")
- self.assertIn('OS-FEDERATION', token['user'])
os_federation = token['user']['OS-FEDERATION']
+
+ self.assertIn('groups', os_federation)
+ self.assertIn('identity_provider', os_federation)
+ self.assertIn('protocol', os_federation)
+ self.assertThat(os_federation, matchers.HasLength(3))
+
self.assertEqual(self.IDP, os_federation['identity_provider']['id'])
self.assertEqual(self.PROTOCOL, os_federation['protocol']['id'])
- self.assertListEqual(sorted(['groups',
- 'identity_provider',
- 'protocol']),
- sorted(os_federation.keys()))
+
+ def _check_project_scoped_token_attributes(self, token, project_id):
+ self.assertEqual(project_id, token['project']['id'])
+ self._check_scoped_token_attributes(token)
+
+ def _check_domain_scoped_token_attributes(self, token, domain_id):
+ self.assertEqual(domain_id, token['domain']['id'])
+ self._check_scoped_token_attributes(token)
+
+ def assertValidMappedUser(self, token):
+ """Check if user object meets all the criteria."""
+
+ user = token['user']
+ self.assertIn('id', user)
+ self.assertIn('name', user)
+ self.assertIn('domain', user)
+
+ self.assertIn('groups', user['OS-FEDERATION'])
+ self.assertIn('identity_provider', user['OS-FEDERATION'])
+ self.assertIn('protocol', user['OS-FEDERATION'])
+
+ # Make sure user_id is url safe
+ self.assertEqual(urllib.parse.quote(user['name']), user['id'])
def _issue_unscoped_token(self,
idp=None,
@@ -794,7 +815,7 @@ class FederatedIdentityProviderTests(FederationTests):
if body is None:
body = self._http_idp_input()
resp = self.put(url, body={'identity_provider': body},
- expected_status=201)
+ expected_status=http_client.CREATED)
return resp
def _http_idp_input(self, **kwargs):
@@ -881,7 +902,7 @@ class FederatedIdentityProviderTests(FederationTests):
body['remote_ids'] = [uuid.uuid4().hex,
repeated_remote_id]
self.put(url, body={'identity_provider': body},
- expected_status=409)
+ expected_status=http_client.CONFLICT)
def test_create_idp_remote_empty(self):
"""Creates an IdP with empty remote_ids."""
@@ -1006,9 +1027,9 @@ class FederatedIdentityProviderTests(FederationTests):
url = self.base_url(suffix=uuid.uuid4().hex)
body = self._http_idp_input()
self.put(url, body={'identity_provider': body},
- expected_status=201)
+ expected_status=http_client.CREATED)
self.put(url, body={'identity_provider': body},
- expected_status=409)
+ expected_status=http_client.CONFLICT)
def test_get_idp(self):
"""Create and later fetch IdP."""
@@ -1033,7 +1054,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id)
- self.get(url, expected_status=404)
+ self.get(url, expected_status=http_client.NOT_FOUND)
def test_delete_existing_idp(self):
"""Create and later delete IdP.
@@ -1047,7 +1068,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id)
self.delete(url)
- self.get(url, expected_status=404)
+ self.get(url, expected_status=http_client.NOT_FOUND)
def test_delete_idp_also_deletes_assigned_protocols(self):
"""Deleting an IdP will delete its assigned protocol."""
@@ -1063,7 +1084,7 @@ class FederatedIdentityProviderTests(FederationTests):
idp_url = self.base_url(suffix=idp_id)
# assign protocol to IdP
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp(
url=url,
idp_id=idp_id,
@@ -1073,7 +1094,7 @@ class FederatedIdentityProviderTests(FederationTests):
# removing IdP will remove the assigned protocol as well
self.assertEqual(1, len(self.federation_api.list_protocols(idp_id)))
self.delete(idp_url)
- self.get(idp_url, expected_status=404)
+ self.get(idp_url, expected_status=http_client.NOT_FOUND)
self.assertEqual(0, len(self.federation_api.list_protocols(idp_id)))
def test_delete_nonexisting_idp(self):
@@ -1083,7 +1104,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
idp_id = uuid.uuid4().hex
url = self.base_url(suffix=idp_id)
- self.delete(url, expected_status=404)
+ self.delete(url, expected_status=http_client.NOT_FOUND)
def test_update_idp_mutable_attributes(self):
"""Update IdP's mutable parameters."""
@@ -1124,7 +1145,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_idp_immutable_attributes(self):
"""Update IdP's immutable parameters.
- Expect HTTP 403 code.
+ Expect HTTP FORBIDDEN.
"""
default_resp = self._create_default_idp()
@@ -1138,7 +1159,8 @@ class FederatedIdentityProviderTests(FederationTests):
body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex]
url = self.base_url(suffix=idp_id)
- self.patch(url, body={'identity_provider': body}, expected_status=403)
+ self.patch(url, body={'identity_provider': body},
+ expected_status=http_client.FORBIDDEN)
def test_update_nonexistent_idp(self):
"""Update nonexistent IdP
@@ -1152,12 +1174,12 @@ class FederatedIdentityProviderTests(FederationTests):
body['enabled'] = False
body = {'identity_provider': body}
- self.patch(url, body=body, expected_status=404)
+ self.patch(url, body=body, expected_status=http_client.NOT_FOUND)
def test_assign_protocol_to_idp(self):
"""Assign a protocol to existing IdP."""
- self._assign_protocol_to_idp(expected_status=201)
+ self._assign_protocol_to_idp(expected_status=http_client.CREATED)
def test_protocol_composite_pk(self):
"""Test whether Keystone let's add two entities with identical
@@ -1171,7 +1193,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
@@ -1187,10 +1209,10 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
- kwargs = {'expected_status': 409}
+ kwargs = {'expected_status': http_client.CONFLICT}
resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id,
proto='saml2',
validate=False,
@@ -1204,7 +1226,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
idp_id = uuid.uuid4().hex
- kwargs = {'expected_status': 404}
+ kwargs = {'expected_status': http_client.NOT_FOUND}
self._assign_protocol_to_idp(proto='saml2',
idp_id=idp_id,
validate=False,
@@ -1213,7 +1235,8 @@ class FederatedIdentityProviderTests(FederationTests):
def test_get_protocol(self):
"""Create and later fetch protocol tied to IdP."""
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
url = "%s/protocols/%s" % (idp_id, proto_id)
url = self.base_url(suffix=url)
@@ -1232,12 +1255,14 @@ class FederatedIdentityProviderTests(FederationTests):
Compare input and output id sets.
"""
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
iterations = random.randint(0, 16)
protocol_ids = []
for _ in range(iterations):
- resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id,
- expected_status=201)
+ resp, _, proto = self._assign_protocol_to_idp(
+ idp_id=idp_id,
+ expected_status=http_client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')
proto_id = proto_id['id']
protocol_ids.append(proto_id)
@@ -1256,7 +1281,8 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_protocols_attribute(self):
"""Update protocol's attribute."""
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
new_mapping_id = uuid.uuid4().hex
url = "%s/protocols/%s" % (idp_id, proto)
@@ -1277,11 +1303,12 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='/%(idp_id)s/'
'protocols/%(protocol_id)s')
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
url = url % {'idp_id': idp_id,
'protocol_id': proto}
self.delete(url)
- self.get(url, expected_status=404)
+ self.get(url, expected_status=http_client.NOT_FOUND)
class MappingCRUDTests(FederationTests):
@@ -1318,7 +1345,7 @@ class MappingCRUDTests(FederationTests):
url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put(url,
body={'mapping': mapping_fixtures.MAPPING_LARGE},
- expected_status=201)
+ expected_status=http_client.CREATED)
return resp
def _get_id_from_response(self, resp):
@@ -1335,7 +1362,7 @@ class MappingCRUDTests(FederationTests):
resp = self.get(url)
entities = resp.result.get('mappings')
self.assertIsNotNone(entities)
- self.assertResponseStatus(resp, 200)
+ self.assertResponseStatus(resp, http_client.OK)
self.assertValidListLinks(resp.result.get('links'))
self.assertEqual(1, len(entities))
@@ -1345,8 +1372,8 @@ class MappingCRUDTests(FederationTests):
mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': str(mapping_id)}
resp = self.delete(url)
- self.assertResponseStatus(resp, 204)
- self.get(url, expected_status=404)
+ self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.get(url, expected_status=http_client.NOT_FOUND)
def test_mapping_get(self):
url = self.MAPPING_URL + '%(mapping_id)s'
@@ -1369,70 +1396,73 @@ class MappingCRUDTests(FederationTests):
def test_delete_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.delete(url, expected_status=404)
+ self.delete(url, expected_status=http_client.NOT_FOUND)
def test_get_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.get(url, expected_status=404)
+ self.get(url, expected_status=http_client.NOT_FOUND)
def test_create_mapping_bad_requirements(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_REQ})
def test_create_mapping_no_rules(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_RULES})
def test_create_mapping_no_remote_objects(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE})
def test_create_mapping_bad_value(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE})
def test_create_mapping_missing_local(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL})
def test_create_mapping_missing_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE})
def test_create_mapping_wrong_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE})
def test_create_mapping_extra_remote_properties_not_any_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF
- self.put(url, expected_status=400, body={'mapping': mapping})
+ self.put(url, expected_status=http_client.BAD_REQUEST,
+ body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_any_one_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF
- self.put(url, expected_status=400, body={'mapping': mapping})
+ self.put(url, expected_status=http_client.BAD_REQUEST,
+ body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_just_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE
- self.put(url, expected_status=400, body={'mapping': mapping})
+ self.put(url, expected_status=http_client.BAD_REQUEST,
+ body={'mapping': mapping})
def test_create_mapping_empty_map(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': {}})
def test_create_mapping_extra_rules_properties(self):
url = self.MAPPING_URL + uuid.uuid4().hex
- self.put(url, expected_status=400,
+ self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS})
def test_create_mapping_with_blacklist_and_whitelist(self):
@@ -1444,7 +1474,8 @@ class MappingCRUDTests(FederationTests):
"""
url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST
- self.put(url, expected_status=400, body={'mapping': mapping})
+ self.put(url, expected_status=http_client.BAD_REQUEST,
+ body={'mapping': mapping})
class FederatedTokenTests(FederationTests, FederatedSetupMixin):
@@ -1494,6 +1525,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_issue_unscoped_token(self):
r = self._issue_unscoped_token()
self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+ self.assertValidMappedUser(r.json['token'])
def test_issue_unscoped_token_disabled_idp(self):
"""Checks if authentication works with disabled identity providers.
@@ -1632,11 +1664,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
token_resp = r.result['token']
project_id = token_resp['project']['id']
- self.assertEqual(project_id, self.proj_employees['id'])
- self._check_scoped_token_attributes(token_resp)
+ self._check_project_scoped_token_attributes(token_resp, project_id)
roles_ref = [self.role_employee]
+
projects_ref = self.proj_employees
self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
+ self.assertValidMappedUser(token_resp)
def test_scope_token_with_idp_disabled(self):
"""Scope token issued by disabled IdP.
@@ -1659,14 +1692,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.federation_api.update_idp(self.IDP, enabled_false)
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
- expected_status=403)
+ expected_status=http_client.FORBIDDEN)
def test_scope_to_bad_project(self):
"""Scope unscoped token with a project we don't have access to."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
- expected_status=401)
+ expected_status=http_client.UNAUTHORIZED)
def test_scope_to_project_multiple_times(self):
"""Try to scope the unscoped token multiple times.
@@ -1685,9 +1718,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
for body, project_id_ref in zip(bodies, project_ids):
r = self.v3_authenticate_token(body)
token_resp = r.result['token']
- project_id = token_resp['project']['id']
- self.assertEqual(project_id, project_id_ref)
- self._check_scoped_token_attributes(token_resp)
+ self._check_project_scoped_token_attributes(token_resp,
+ project_id_ref)
def test_scope_to_project_with_only_inherited_roles(self):
"""Try to scope token whose only roles are inherited."""
@@ -1695,18 +1727,18 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
r = self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER)
token_resp = r.result['token']
- project_id = token_resp['project']['id']
- self.assertEqual(project_id, self.project_inherited['id'])
- self._check_scoped_token_attributes(token_resp)
+ self._check_project_scoped_token_attributes(
+ token_resp, self.project_inherited['id'])
roles_ref = [self.role_customer]
projects_ref = self.project_inherited
self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
+ self.assertValidMappedUser(token_resp)
def test_scope_token_from_nonexistent_unscoped_token(self):
"""Try to scope token from non-existent unscoped token."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
- expected_status=404)
+ expected_status=http_client.NOT_FOUND)
def test_issue_token_from_rules_without_user(self):
api = auth_controllers.Auth()
@@ -1730,9 +1762,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_scope_to_domain_once(self):
r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
token_resp = r.result['token']
- domain_id = token_resp['domain']['id']
- self.assertEqual(self.domainA['id'], domain_id)
- self._check_scoped_token_attributes(token_resp)
+ self._check_domain_scoped_token_attributes(token_resp,
+ self.domainA['id'])
def test_scope_to_domain_multiple_tokens(self):
"""Issue multiple tokens scoping to different domains.
@@ -1754,15 +1785,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
for body, domain_id_ref in zip(bodies, domain_ids):
r = self.v3_authenticate_token(body)
token_resp = r.result['token']
- domain_id = token_resp['domain']['id']
- self.assertEqual(domain_id_ref, domain_id)
- self._check_scoped_token_attributes(token_resp)
+ self._check_domain_scoped_token_attributes(token_resp,
+ domain_id_ref)
def test_scope_to_domain_with_only_inherited_roles_fails(self):
"""Try to scope to a domain that has no direct roles."""
self.v3_authenticate_token(
self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
- expected_status=401)
+ expected_status=http_client.UNAUTHORIZED)
def test_list_projects(self):
urls = ('/OS-FEDERATION/projects', '/auth/projects')
@@ -1863,9 +1893,10 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
r = self._issue_unscoped_token()
+ token_resp = r.json_body['token']
+ self.assertValidMappedUser(token_resp)
employee_unscoped_token_id = r.headers.get('X-Subject-Token')
- r = self.get('/OS-FEDERATION/projects',
- token=employee_unscoped_token_id)
+ r = self.get('/auth/projects', token=employee_unscoped_token_id)
projects = r.result['projects']
random_project = random.randint(0, len(projects)) - 1
project = projects[random_project]
@@ -1875,9 +1906,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
r = self.v3_authenticate_token(v3_scope_request)
token_resp = r.result['token']
- project_id = token_resp['project']['id']
- self.assertEqual(project['id'], project_id)
- self._check_scoped_token_attributes(token_resp)
+ self._check_project_scoped_token_attributes(token_resp, project['id'])
def test_workflow_with_groups_deletion(self):
"""Test full workflow with groups deletion before token scoping.
@@ -1947,7 +1976,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
token_id, 'project',
self.project_all['id'])
- self.v3_authenticate_token(scoped_token, expected_status=500)
+ self.v3_authenticate_token(
+ scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR)
def test_lists_with_missing_group_in_backend(self):
"""Test a mapping that points to a group that does not exist
@@ -2379,11 +2409,13 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_federated_unscoped_token(self):
resp = self._issue_unscoped_token()
self.assertEqual(204, len(resp.headers['X-Subject-Token']))
+ self.assertValidMappedUser(resp.json_body['token'])
def test_federated_unscoped_token_with_multiple_groups(self):
assertion = 'ANOTHER_CUSTOMER_ASSERTION'
resp = self._issue_unscoped_token(assertion=assertion)
- self.assertEqual(232, len(resp.headers['X-Subject-Token']))
+ self.assertEqual(226, len(resp.headers['X-Subject-Token']))
+ self.assertValidMappedUser(resp.json_body['token'])
def test_validate_federated_unscoped_token(self):
resp = self._issue_unscoped_token()
@@ -2400,9 +2432,9 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
resp = self._issue_unscoped_token()
+ self.assertValidMappedUser(resp.json_body['token'])
unscoped_token = resp.headers.get('X-Subject-Token')
- resp = self.get('/OS-FEDERATION/projects',
- token=unscoped_token)
+ resp = self.get('/auth/projects', token=unscoped_token)
projects = resp.result['projects']
random_project = random.randint(0, len(projects)) - 1
project = projects[random_project]
@@ -2412,9 +2444,7 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
resp = self.v3_authenticate_token(v3_scope_request)
token_resp = resp.result['token']
- project_id = token_resp['project']['id']
- self.assertEqual(project['id'], project_id)
- self._check_scoped_token_attributes(token_resp)
+ self._check_project_scoped_token_attributes(token_resp, project['id'])
class FederatedTokenTestsMethodToken(FederatedTokenTests):
@@ -2499,7 +2529,7 @@ class SAMLGenerationTests(FederationTests):
self.sp = self.sp_ref()
url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
self.put(url, body={'service_provider': self.sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
def test_samlize_token_values(self):
"""Test the SAML generator produces a SAML object.
@@ -2615,8 +2645,8 @@ class SAMLGenerationTests(FederationTests):
# the assertion as is without signing it
return assertion_content
- with mock.patch('subprocess.check_output',
- side_effect=mocked_subprocess_check_output):
+ with mock.patch.object(subprocess, 'check_output',
+ side_effect=mocked_subprocess_check_output):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT,
@@ -2711,7 +2741,7 @@ class SAMLGenerationTests(FederationTests):
with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion):
self.post(self.SAML_GENERATION_ROUTE, body=body,
- expected_status=403)
+ expected_status=http_client.FORBIDDEN)
def test_generate_saml_route(self):
"""Test that the SAML generation endpoint produces XML.
@@ -2733,7 +2763,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=200)
+ expected_status=http_client.OK)
response = etree.fromstring(http_response.result)
issuer = response[0]
@@ -2774,7 +2804,8 @@ class SAMLGenerationTests(FederationTests):
self.SERVICE_PROVDIER_ID)
del body['auth']['scope']
- self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=http_client.BAD_REQUEST)
def test_invalid_token_body(self):
"""Test that missing the token in request body raises an exception.
@@ -2788,7 +2819,8 @@ class SAMLGenerationTests(FederationTests):
self.SERVICE_PROVDIER_ID)
del body['auth']['identity']['token']
- self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=http_client.BAD_REQUEST)
def test_sp_not_found(self):
"""Test SAML generation with an invalid service provider ID.
@@ -2799,7 +2831,8 @@ class SAMLGenerationTests(FederationTests):
sp_id = uuid.uuid4().hex
token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, sp_id)
- self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=http_client.NOT_FOUND)
def test_sp_disabled(self):
"""Try generating assertion for disabled Service Provider."""
@@ -2811,7 +2844,8 @@ class SAMLGenerationTests(FederationTests):
token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
- self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=403)
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=http_client.FORBIDDEN)
def test_token_not_found(self):
"""Test that an invalid token in the request body raises an exception.
@@ -2823,7 +2857,8 @@ class SAMLGenerationTests(FederationTests):
token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
- self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=http_client.NOT_FOUND)
def test_generate_ecp_route(self):
"""Test that the ECP generation endpoint produces XML.
@@ -2844,7 +2879,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=200)
+ expected_status=http_client.OK)
env_response = etree.fromstring(http_response.result)
header = env_response[0]
@@ -2879,7 +2914,7 @@ class SAMLGenerationTests(FederationTests):
@mock.patch('saml2.create_class_from_xml_string')
@mock.patch('oslo_utils.fileutils.write_to_tempfile')
- @mock.patch('subprocess.check_output')
+ @mock.patch.object(subprocess, 'check_output')
def test__sign_assertion(self, check_output_mock,
write_to_tempfile_mock, create_class_mock):
write_to_tempfile_mock.return_value = 'tmp_path'
@@ -2890,7 +2925,7 @@ class SAMLGenerationTests(FederationTests):
create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput')
@mock.patch('oslo_utils.fileutils.write_to_tempfile')
- @mock.patch('subprocess.check_output')
+ @mock.patch.object(subprocess, 'check_output')
def test__sign_assertion_exc(self, check_output_mock,
write_to_tempfile_mock):
# If the command fails the command output is logged.
@@ -2903,12 +2938,15 @@ class SAMLGenerationTests(FederationTests):
returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary,
output=sample_output)
- # FIXME(blk-u): This should raise exception.SAMLSigningError instead,
- # but fails with TypeError due to concatenating string to Message, see
- # bug 1484735.
- self.assertRaises(TypeError,
+ logger_fixture = self.useFixture(fixtures.LoggerFixture())
+ self.assertRaises(exception.SAMLSigningError,
keystone_idp._sign_assertion,
self.signed_assertion)
+ expected_log = (
+ "Error when signing assertion, reason: Command '%s' returned "
+ "non-zero exit status %s %s\n" %
+ (CONF.saml.xmlsec1_binary, sample_returncode, sample_output))
+ self.assertEqual(expected_log, logger_fixture.output)
@mock.patch('oslo_utils.fileutils.write_to_tempfile')
def test__sign_assertion_fileutils_exc(self, write_to_tempfile_mock):
@@ -3041,13 +3079,13 @@ class IdPMetadataGenerationTests(FederationTests):
self.generator.generate_metadata)
def test_get_metadata_with_no_metadata_file_configured(self):
- self.get(self.METADATA_URL, expected_status=500)
+ self.get(self.METADATA_URL,
+ expected_status=http_client.INTERNAL_SERVER_ERROR)
def test_get_metadata(self):
self.config_fixture.config(
group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
- r = self.get(self.METADATA_URL, response_content_type='text/xml',
- expected_status=200)
+ r = self.get(self.METADATA_URL, response_content_type='text/xml')
self.assertEqual('text/xml', r.headers.get('Content-Type'))
reference_file = _load_xml('idp_saml2_metadata.xml')
@@ -3070,7 +3108,7 @@ class ServiceProviderTests(FederationTests):
self.SP_REF = self.sp_ref()
self.SERVICE_PROVIDER = self.put(
url, body={'service_provider': self.SP_REF},
- expected_status=201).result
+ expected_status=http_client.CREATED).result
def sp_ref(self):
ref = {
@@ -3089,19 +3127,19 @@ class ServiceProviderTests(FederationTests):
def test_get_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.get(url, expected_status=200)
+ resp = self.get(url)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
def test_get_service_provider_fail(self):
url = self.base_url(suffix=uuid.uuid4().hex)
- self.get(url, expected_status=404)
+ self.get(url, expected_status=http_client.NOT_FOUND)
def test_create_service_provider(self):
url = self.base_url(suffix=uuid.uuid4().hex)
sp = self.sp_ref()
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
@@ -3111,7 +3149,7 @@ class ServiceProviderTests(FederationTests):
sp = self.sp_ref()
del sp['relay_state_prefix']
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
sp_result = resp.result['service_provider']
self.assertEqual(CONF.saml.relay_state_prefix,
sp_result['relay_state_prefix'])
@@ -3123,7 +3161,7 @@ class ServiceProviderTests(FederationTests):
non_default_prefix = uuid.uuid4().hex
sp['relay_state_prefix'] = non_default_prefix
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
@@ -3134,7 +3172,7 @@ class ServiceProviderTests(FederationTests):
sp = self.sp_ref()
sp[uuid.uuid4().hex] = uuid.uuid4().hex
self.put(url, body={'service_provider': sp},
- expected_status=400)
+ expected_status=http_client.BAD_REQUEST)
def test_list_service_providers(self):
"""Test listing of service provider objects.
@@ -3150,7 +3188,8 @@ class ServiceProviderTests(FederationTests):
}
for id, sp in ref_service_providers.items():
url = self.base_url(suffix=id)
- self.put(url, body={'service_provider': sp}, expected_status=201)
+ self.put(url, body={'service_provider': sp},
+ expected_status=http_client.CREATED)
# Insert ids into service provider object, we will compare it with
# responses from server and those include 'id' attribute.
@@ -3177,15 +3216,14 @@ class ServiceProviderTests(FederationTests):
"""
new_sp_ref = self.sp_ref()
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=200)
+ resp = self.patch(url, body={'service_provider': new_sp_ref})
patch_result = resp.result
new_sp_ref['id'] = self.SERVICE_PROVIDER_ID
self.assertValidEntity(patch_result['service_provider'],
ref=new_sp_ref,
keys_to_check=self.SP_KEYS)
- resp = self.get(url, expected_status=200)
+ resp = self.get(url)
get_result = resp.result
self.assertDictEqual(patch_result['service_provider'],
@@ -3201,21 +3239,21 @@ class ServiceProviderTests(FederationTests):
new_sp_ref = {'id': uuid.uuid4().hex}
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=400)
+ expected_status=http_client.BAD_REQUEST)
def test_update_service_provider_unknown_parameter(self):
new_sp_ref = self.sp_ref()
new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=400)
+ expected_status=http_client.BAD_REQUEST)
def test_update_service_provider_404(self):
new_sp_ref = self.sp_ref()
new_sp_ref['description'] = uuid.uuid4().hex
url = self.base_url(suffix=uuid.uuid4().hex)
self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=404)
+ expected_status=http_client.NOT_FOUND)
def test_update_sp_relay_state(self):
"""Update an SP with custome relay state."""
@@ -3223,19 +3261,18 @@ class ServiceProviderTests(FederationTests):
non_default_prefix = uuid.uuid4().hex
new_sp_ref['relay_state_prefix'] = non_default_prefix
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=200)
+ resp = self.patch(url, body={'service_provider': new_sp_ref})
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
def test_delete_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- self.delete(url, expected_status=204)
+ self.delete(url)
def test_delete_service_provider_404(self):
url = self.base_url(suffix=uuid.uuid4().hex)
- self.delete(url, expected_status=404)
+ self.delete(url, expected_status=http_client.NOT_FOUND)
class WebSSOTests(FederatedTokenTests):
@@ -3337,6 +3374,16 @@ class WebSSOTests(FederatedTokenTests):
self.api.federated_sso_auth,
context, self.PROTOCOL)
+ def test_identity_provider_specific_federated_authentication(self):
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
+ context = {'environment': environment}
+ query_string = {'origin': self.ORIGIN}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ resp = self.api.federated_idp_specific_sso_auth(context,
+ self.idp['id'],
+ self.PROTOCOL)
+ self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
+
class K2KServiceCatalogTests(FederationTests):
SP1 = 'SP1'