aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_federation.py562
1 files changed, 405 insertions, 157 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py
index 4d7dcaab..f4ec8e51 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_federation.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import os
import random
from testtools import matchers
@@ -19,7 +20,8 @@ import fixtures
from lxml import etree
import mock
from oslo_config import cfg
-from oslo_log import log
+from oslo_log import versionutils
+from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslotest import mockpatch
import saml2
@@ -33,22 +35,24 @@ if not xmldsig:
from keystone.auth import controllers as auth_controllers
from keystone.common import environment
-from keystone.contrib.federation import controllers as federation_controllers
-from keystone.contrib.federation import idp as keystone_idp
+from keystone.contrib.federation import routers
from keystone import exception
+from keystone.federation import controllers as federation_controllers
+from keystone.federation import idp as keystone_idp
from keystone import notifications
+from keystone.tests import unit
from keystone.tests.unit import core
from keystone.tests.unit import federation_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import mapping_fixtures
from keystone.tests.unit import test_v3
+from keystone.tests.unit import utils
from keystone.token.providers import common as token_common
subprocess = environment.subprocess
CONF = cfg.CONF
-LOG = log.getLogger(__name__)
ROOTDIR = os.path.dirname(os.path.abspath(__file__))
XMLDIR = os.path.join(ROOTDIR, 'saml2/')
@@ -59,8 +63,12 @@ def dummy_validator(*args, **kwargs):
class FederationTests(test_v3.RestfulTestCase):
- EXTENSION_NAME = 'federation'
- EXTENSION_TO_ADD = 'federation_extension'
+ @mock.patch.object(versionutils, 'report_deprecated_feature')
+ def test_exception_happens(self, mock_deprecator):
+ routers.FederationExtension(mock.ANY)
+ mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY)
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("Remove federation_extension from", args[1])
class FederatedSetupMixin(object):
@@ -137,7 +145,6 @@ class FederatedSetupMixin(object):
def assertValidMappedUser(self, token):
"""Check if user object meets all the criteria."""
-
user = token['user']
self.assertIn('id', user)
self.assertIn('name', user)
@@ -209,66 +216,62 @@ class FederatedSetupMixin(object):
def load_federation_sample_data(self):
"""Inject additional data."""
-
# Create and add domains
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'],
self.domainA)
- self.domainB = self.new_domain_ref()
+ self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'],
self.domainB)
- self.domainC = self.new_domain_ref()
+ self.domainC = unit.new_domain_ref()
self.resource_api.create_domain(self.domainC['id'],
self.domainC)
- self.domainD = self.new_domain_ref()
+ self.domainD = unit.new_domain_ref()
self.resource_api.create_domain(self.domainD['id'],
self.domainD)
# Create and add projects
- self.proj_employees = self.new_project_ref(
+ self.proj_employees = unit.new_project_ref(
domain_id=self.domainA['id'])
self.resource_api.create_project(self.proj_employees['id'],
self.proj_employees)
- self.proj_customers = self.new_project_ref(
+ self.proj_customers = unit.new_project_ref(
domain_id=self.domainA['id'])
self.resource_api.create_project(self.proj_customers['id'],
self.proj_customers)
- self.project_all = self.new_project_ref(
+ self.project_all = unit.new_project_ref(
domain_id=self.domainA['id'])
self.resource_api.create_project(self.project_all['id'],
self.project_all)
- self.project_inherited = self.new_project_ref(
+ self.project_inherited = unit.new_project_ref(
domain_id=self.domainD['id'])
self.resource_api.create_project(self.project_inherited['id'],
self.project_inherited)
# Create and add groups
- self.group_employees = self.new_group_ref(
- domain_id=self.domainA['id'])
+ self.group_employees = unit.new_group_ref(domain_id=self.domainA['id'])
self.group_employees = (
self.identity_api.create_group(self.group_employees))
- self.group_customers = self.new_group_ref(
- domain_id=self.domainA['id'])
+ self.group_customers = unit.new_group_ref(domain_id=self.domainA['id'])
self.group_customers = (
self.identity_api.create_group(self.group_customers))
- self.group_admins = self.new_group_ref(
- domain_id=self.domainA['id'])
+ self.group_admins = unit.new_group_ref(domain_id=self.domainA['id'])
self.group_admins = self.identity_api.create_group(self.group_admins)
# Create and add roles
- self.role_employee = self.new_role_ref()
+ self.role_employee = unit.new_role_ref()
self.role_api.create_role(self.role_employee['id'], self.role_employee)
- self.role_customer = self.new_role_ref()
+ self.role_customer = unit.new_role_ref()
self.role_api.create_role(self.role_customer['id'], self.role_customer)
- self.role_admin = self.new_role_ref()
+ self.role_admin = unit.new_role_ref()
self.role_api.create_role(self.role_admin['id'], self.role_admin)
# Employees can access
@@ -774,7 +777,7 @@ class FederatedSetupMixin(object):
self.domainC['id'])
-class FederatedIdentityProviderTests(FederationTests):
+class FederatedIdentityProviderTests(test_v3.RestfulTestCase):
"""A test class for Identity Providers."""
idp_keys = ['description', 'enabled']
@@ -815,7 +818,7 @@ class FederatedIdentityProviderTests(FederationTests):
if body is None:
body = self._http_idp_input()
resp = self.put(url, body={'identity_provider': body},
- expected_status=201)
+ expected_status=http_client.CREATED)
return resp
def _http_idp_input(self, **kwargs):
@@ -856,7 +859,6 @@ class FederatedIdentityProviderTests(FederationTests):
def test_create_idp(self):
"""Creates the IdentityProvider entity associated to remote_ids."""
-
keys_to_check = list(self.idp_keys)
body = self.default_body.copy()
body['description'] = uuid.uuid4().hex
@@ -867,7 +869,6 @@ class FederatedIdentityProviderTests(FederationTests):
def test_create_idp_remote(self):
"""Creates the IdentityProvider entity associated to remote_ids."""
-
keys_to_check = list(self.idp_keys)
keys_to_check.append('remote_ids')
body = self.default_body.copy()
@@ -886,10 +887,9 @@ class FederatedIdentityProviderTests(FederationTests):
A remote_id is the same for both so the second IdP is not
created because of the uniqueness of the remote_ids
- Expect HTTP 409 code for the latter call.
+ Expect HTTP 409 Conflict code for the latter call.
"""
-
body = self.default_body.copy()
repeated_remote_id = uuid.uuid4().hex
body['remote_ids'] = [uuid.uuid4().hex,
@@ -901,12 +901,15 @@ class FederatedIdentityProviderTests(FederationTests):
url = self.base_url(suffix=uuid.uuid4().hex)
body['remote_ids'] = [uuid.uuid4().hex,
repeated_remote_id]
- self.put(url, body={'identity_provider': body},
- expected_status=http_client.CONFLICT)
+ resp = self.put(url, body={'identity_provider': body},
+ expected_status=http_client.CONFLICT)
+
+ resp_data = jsonutils.loads(resp.body)
+ self.assertIn('Duplicate remote ID',
+ resp_data.get('error', {}).get('message'))
def test_create_idp_remote_empty(self):
"""Creates an IdP with empty remote_ids."""
-
keys_to_check = list(self.idp_keys)
keys_to_check.append('remote_ids')
body = self.default_body.copy()
@@ -919,7 +922,6 @@ class FederatedIdentityProviderTests(FederationTests):
def test_create_idp_remote_none(self):
"""Creates an IdP with a None remote_ids."""
-
keys_to_check = list(self.idp_keys)
keys_to_check.append('remote_ids')
body = self.default_body.copy()
@@ -986,6 +988,37 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertEqual(sorted(body['remote_ids']),
sorted(returned_idp.get('remote_ids')))
+ def test_update_idp_remote_repeated(self):
+ """Update an IdentityProvider entity reusing a remote_id.
+
+ A remote_id is the same for both so the second IdP is not
+ updated because of the uniqueness of the remote_ids.
+
+ Expect HTTP 409 Conflict code for the latter call.
+
+ """
+ # Create first identity provider
+ body = self.default_body.copy()
+ repeated_remote_id = uuid.uuid4().hex
+ body['remote_ids'] = [uuid.uuid4().hex,
+ repeated_remote_id]
+ self._create_default_idp(body=body)
+
+ # Create second identity provider (without remote_ids)
+ body = self.default_body.copy()
+ default_resp = self._create_default_idp(body=body)
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ url = self.base_url(suffix=idp_id)
+
+ body['remote_ids'] = [repeated_remote_id]
+ resp = self.patch(url, body={'identity_provider': body},
+ expected_status=http_client.CONFLICT)
+ resp_data = jsonutils.loads(resp.body)
+ self.assertIn('Duplicate remote ID',
+ resp_data['error']['message'])
+
def test_list_idps(self, iterations=5):
"""Lists all available IdentityProviders.
@@ -1018,18 +1051,73 @@ class FederatedIdentityProviderTests(FederationTests):
ids_intersection = entities_ids.intersection(ids)
self.assertEqual(ids_intersection, ids)
+ def test_filter_list_idp_by_id(self):
+ def get_id(resp):
+ r = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ return r.get('id')
+
+ idp1_id = get_id(self._create_default_idp())
+ idp2_id = get_id(self._create_default_idp())
+
+ # list the IdP, should get two IdP.
+ url = self.base_url()
+ resp = self.get(url)
+ entities = self._fetch_attribute_from_response(resp,
+ 'identity_providers')
+ entities_ids = [e['id'] for e in entities]
+ self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
+
+ # filter the IdP by ID.
+ url = self.base_url() + '?id=' + idp1_id
+ resp = self.get(url)
+ filtered_service_list = resp.json['identity_providers']
+ self.assertThat(filtered_service_list, matchers.HasLength(1))
+ self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
+
+ def test_filter_list_idp_by_enabled(self):
+ def get_id(resp):
+ r = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ return r.get('id')
+
+ idp1_id = get_id(self._create_default_idp())
+
+ body = self.default_body.copy()
+ body['enabled'] = False
+ idp2_id = get_id(self._create_default_idp(body=body))
+
+ # list the IdP, should get two IdP.
+ url = self.base_url()
+ resp = self.get(url)
+ entities = self._fetch_attribute_from_response(resp,
+ 'identity_providers')
+ entities_ids = [e['id'] for e in entities]
+ self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
+
+ # filter the IdP by 'enabled'.
+ url = self.base_url() + '?enabled=True'
+ resp = self.get(url)
+ filtered_service_list = resp.json['identity_providers']
+ self.assertThat(filtered_service_list, matchers.HasLength(1))
+ self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
+
def test_check_idp_uniqueness(self):
"""Add same IdP twice.
- Expect HTTP 409 code for the latter call.
+ Expect HTTP 409 Conflict code for the latter call.
"""
url = self.base_url(suffix=uuid.uuid4().hex)
body = self._http_idp_input()
self.put(url, body={'identity_provider': body},
- expected_status=201)
- self.put(url, body={'identity_provider': body},
- expected_status=http_client.CONFLICT)
+ expected_status=http_client.CREATED)
+ resp = self.put(url, body={'identity_provider': body},
+ expected_status=http_client.CONFLICT)
+
+ resp_data = jsonutils.loads(resp.body)
+ self.assertIn('Duplicate entry',
+ resp_data.get('error', {}).get('message'))
def test_get_idp(self):
"""Create and later fetch IdP."""
@@ -1047,7 +1135,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_get_nonexisting_idp(self):
"""Fetch nonexisting IdP entity.
- Expected HTTP 404 status code.
+ Expected HTTP 404 Not Found status code.
"""
idp_id = uuid.uuid4().hex
@@ -1059,7 +1147,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_delete_existing_idp(self):
"""Create and later delete IdP.
- Expect HTTP 404 for the GET IdP call.
+ Expect HTTP 404 Not Found for the GET IdP call.
"""
default_resp = self._create_default_idp()
default_idp = self._fetch_attribute_from_response(default_resp,
@@ -1072,7 +1160,6 @@ class FederatedIdentityProviderTests(FederationTests):
def test_delete_idp_also_deletes_assigned_protocols(self):
"""Deleting an IdP will delete its assigned protocol."""
-
# create default IdP
default_resp = self._create_default_idp()
default_idp = self._fetch_attribute_from_response(default_resp,
@@ -1084,7 +1171,7 @@ class FederatedIdentityProviderTests(FederationTests):
idp_url = self.base_url(suffix=idp_id)
# assign protocol to IdP
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp(
url=url,
idp_id=idp_id,
@@ -1100,7 +1187,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_delete_nonexisting_idp(self):
"""Delete nonexisting IdP.
- Expect HTTP 404 for the GET IdP call.
+ Expect HTTP 404 Not Found for the GET IdP call.
"""
idp_id = uuid.uuid4().hex
url = self.base_url(suffix=idp_id)
@@ -1145,7 +1232,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_idp_immutable_attributes(self):
"""Update IdP's immutable parameters.
- Expect HTTP FORBIDDEN.
+ Expect HTTP BAD REQUEST.
"""
default_resp = self._create_default_idp()
@@ -1160,12 +1247,12 @@ class FederatedIdentityProviderTests(FederationTests):
url = self.base_url(suffix=idp_id)
self.patch(url, body={'identity_provider': body},
- expected_status=http_client.FORBIDDEN)
+ expected_status=http_client.BAD_REQUEST)
def test_update_nonexistent_idp(self):
"""Update nonexistent IdP
- Expect HTTP 404 code.
+ Expect HTTP 404 Not Found code.
"""
idp_id = uuid.uuid4().hex
@@ -1178,12 +1265,13 @@ class FederatedIdentityProviderTests(FederationTests):
def test_assign_protocol_to_idp(self):
"""Assign a protocol to existing IdP."""
-
- self._assign_protocol_to_idp(expected_status=201)
+ self._assign_protocol_to_idp(expected_status=http_client.CREATED)
def test_protocol_composite_pk(self):
- """Test whether Keystone let's add two entities with identical
- names, however attached to different IdPs.
+ """Test that Keystone can add two entities.
+
+ The entities have identical names, however, attached to different
+ IdPs.
1. Add IdP and assign it protocol with predefined name
2. Add another IdP and assign it a protocol with same name.
@@ -1193,7 +1281,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
@@ -1204,12 +1292,12 @@ class FederatedIdentityProviderTests(FederationTests):
"""Test whether Keystone checks for unique idp/protocol values.
Add same protocol twice, expect Keystone to reject a latter call and
- return HTTP 409 code.
+ return HTTP 409 Conflict code.
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': 201}
+ kwargs = {'expected_status': http_client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
kwargs = {'expected_status': http_client.CONFLICT}
@@ -1221,10 +1309,9 @@ class FederatedIdentityProviderTests(FederationTests):
def test_assign_protocol_to_nonexistent_idp(self):
"""Assign protocol to IdP that doesn't exist.
- Expect HTTP 404 code.
+ Expect HTTP 404 Not Found code.
"""
-
idp_id = uuid.uuid4().hex
kwargs = {'expected_status': http_client.NOT_FOUND}
self._assign_protocol_to_idp(proto='saml2',
@@ -1234,8 +1321,8 @@ class FederatedIdentityProviderTests(FederationTests):
def test_get_protocol(self):
"""Create and later fetch protocol tied to IdP."""
-
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
url = "%s/protocols/%s" % (idp_id, proto_id)
url = self.base_url(suffix=url)
@@ -1254,12 +1341,14 @@ class FederatedIdentityProviderTests(FederationTests):
Compare input and output id sets.
"""
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
iterations = random.randint(0, 16)
protocol_ids = []
for _ in range(iterations):
- resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id,
- expected_status=201)
+ resp, _, proto = self._assign_protocol_to_idp(
+ idp_id=idp_id,
+ expected_status=http_client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')
proto_id = proto_id['id']
protocol_ids.append(proto_id)
@@ -1277,8 +1366,8 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_protocols_attribute(self):
"""Update protocol's attribute."""
-
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
new_mapping_id = uuid.uuid4().hex
url = "%s/protocols/%s" % (idp_id, proto)
@@ -1294,19 +1383,21 @@ class FederatedIdentityProviderTests(FederationTests):
def test_delete_protocol(self):
"""Delete protocol.
- Expect HTTP 404 code for the GET call after the protocol is deleted.
+ Expect HTTP 404 Not Found code for the GET call after the protocol is
+ deleted.
"""
url = self.base_url(suffix='/%(idp_id)s/'
'protocols/%(protocol_id)s')
- resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ expected_status=http_client.CREATED)
url = url % {'idp_id': idp_id,
'protocol_id': proto}
self.delete(url)
self.get(url, expected_status=http_client.NOT_FOUND)
-class MappingCRUDTests(FederationTests):
+class MappingCRUDTests(test_v3.RestfulTestCase):
"""A class for testing CRUD operations for Mappings."""
MAPPING_URL = '/OS-FEDERATION/mappings/'
@@ -1340,7 +1431,7 @@ class MappingCRUDTests(FederationTests):
url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put(url,
body={'mapping': mapping_fixtures.MAPPING_LARGE},
- expected_status=201)
+ expected_status=http_client.CREATED)
return resp
def _get_id_from_response(self, resp):
@@ -1357,7 +1448,7 @@ class MappingCRUDTests(FederationTests):
resp = self.get(url)
entities = resp.result.get('mappings')
self.assertIsNotNone(entities)
- self.assertResponseStatus(resp, 200)
+ self.assertResponseStatus(resp, http_client.OK)
self.assertValidListLinks(resp.result.get('links'))
self.assertEqual(1, len(entities))
@@ -1367,7 +1458,7 @@ class MappingCRUDTests(FederationTests):
mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': str(mapping_id)}
resp = self.delete(url)
- self.assertResponseStatus(resp, 204)
+ self.assertResponseStatus(resp, http_client.NO_CONTENT)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_mapping_get(self):
@@ -1463,8 +1554,8 @@ class MappingCRUDTests(FederationTests):
def test_create_mapping_with_blacklist_and_whitelist(self):
"""Test for adding whitelist and blacklist in the rule
- Server should respond with HTTP 400 error upon discovering both
- ``whitelist`` and ``blacklist`` keywords in the same rule.
+ Server should respond with HTTP 400 Bad Request error upon discovering
+ both ``whitelist`` and ``blacklist`` keywords in the same rule.
"""
url = self.MAPPING_URL + uuid.uuid4().hex
@@ -1472,8 +1563,37 @@ class MappingCRUDTests(FederationTests):
self.put(url, expected_status=http_client.BAD_REQUEST,
body={'mapping': mapping})
+ def test_create_mapping_with_local_user_and_local_domain(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ resp = self.put(
+ url,
+ body={
+ 'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN
+ },
+ expected_status=http_client.CREATED)
+ self.assertValidMappingResponse(
+ resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN)
+
+ def test_create_mapping_with_ephemeral(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ resp = self.put(
+ url,
+ body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER},
+ expected_status=http_client.CREATED)
+ self.assertValidMappingResponse(
+ resp, mapping_fixtures.MAPPING_EPHEMERAL_USER)
+
+ def test_create_mapping_with_bad_user_type(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ # get a copy of a known good map
+ bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER)
+ # now sabotage the user type
+ bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex
+ self.put(url, expected_status=http_client.BAD_REQUEST,
+ body={'mapping': bad_mapping})
+
-class FederatedTokenTests(FederationTests, FederatedSetupMixin):
+class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
def auth_plugin_config_override(self):
methods = ['saml2']
@@ -1510,7 +1630,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.assertTrue(note['send_notification_called'])
def load_fixtures(self, fixtures):
- super(FederationTests, self).load_fixtures(fixtures)
+ super(FederatedTokenTests, self).load_fixtures(fixtures)
self.load_federation_sample_data()
def test_issue_unscoped_token_notify(self):
@@ -1609,7 +1729,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_issue_unscoped_token_with_remote_unavailable(self):
self.config_fixture.config(group='federation',
remote_id_attribute=self.REMOTE_ID_ATTR)
- self.assertRaises(exception.ValidationError,
+ self.assertRaises(exception.Unauthorized,
self._issue_unscoped_token,
idp=self.IDP_WITH_REMOTE,
environment={
@@ -1649,13 +1769,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.assertIsNotNone(r.headers.get('X-Subject-Token'))
def test_scope_to_project_once_notify(self):
- r = self.v3_authenticate_token(
+ r = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
user_id = r.json['token']['user']['id']
self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id)
def test_scope_to_project_once(self):
- r = self.v3_authenticate_token(
+ r = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
token_resp = r.result['token']
project_id = token_resp['project']['id']
@@ -1685,14 +1805,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
enabled_false = {'enabled': False}
self.federation_api.update_idp(self.IDP, enabled_false)
- self.v3_authenticate_token(
+ self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=http_client.FORBIDDEN)
def test_scope_to_bad_project(self):
"""Scope unscoped token with a project we don't have access to."""
-
- self.v3_authenticate_token(
+ self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=http_client.UNAUTHORIZED)
@@ -1705,13 +1824,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
* Employees' project
"""
-
bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN,
self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN)
project_ids = (self.proj_employees['id'],
self.proj_customers['id'])
for body, project_id_ref in zip(bodies, project_ids):
- r = self.v3_authenticate_token(body)
+ r = self.v3_create_token(body)
token_resp = r.result['token']
self._check_project_scoped_token_attributes(token_resp,
project_id_ref)
@@ -1719,7 +1837,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_scope_to_project_with_only_inherited_roles(self):
"""Try to scope token whose only roles are inherited."""
self.config_fixture.config(group='os_inherit', enabled=True)
- r = self.v3_authenticate_token(
+ r = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER)
token_resp = r.result['token']
self._check_project_scoped_token_attributes(
@@ -1731,7 +1849,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_scope_token_from_nonexistent_unscoped_token(self):
"""Try to scope token from non-existent unscoped token."""
- self.v3_authenticate_token(
+ self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
expected_status=http_client.NOT_FOUND)
@@ -1755,7 +1873,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
assertion='CONTRACTOR_ASSERTION')
def test_scope_to_domain_once(self):
- r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
+ r = self.v3_create_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
token_resp = r.result['token']
self._check_domain_scoped_token_attributes(token_resp,
self.domainA['id'])
@@ -1778,14 +1896,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.domainC['id'])
for body, domain_id_ref in zip(bodies, domain_ids):
- r = self.v3_authenticate_token(body)
+ r = self.v3_create_token(body)
token_resp = r.result['token']
self._check_domain_scoped_token_attributes(token_resp,
domain_id_ref)
def test_scope_to_domain_with_only_inherited_roles_fails(self):
"""Try to scope to a domain that has no direct roles."""
- self.v3_authenticate_token(
+ self.v3_create_token(
self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
expected_status=http_client.UNAUTHORIZED)
@@ -1816,14 +1934,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
# TODO(samueldmq): Create another test class for role inheritance tests.
# The advantage would be to reduce the complexity of this test class and
- # have tests specific to this fuctionality grouped, easing readability and
+ # have tests specific to this functionality grouped, easing readability and
# maintenability.
def test_list_projects_for_inherited_project_assignment(self):
# Enable os_inherit extension
self.config_fixture.config(group='os_inherit', enabled=True)
# Create a subproject
- subproject_inherited = self.new_project_ref(
+ subproject_inherited = unit.new_project_ref(
domain_id=self.domainD['id'],
parent_id=self.project_inherited['id'])
self.resource_api.create_project(subproject_inherited['id'],
@@ -1878,6 +1996,9 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.assertEqual(domains_ref, domains,
'match failed for url %s' % url)
+ @utils.wip('This will fail because of bug #1501032. The returned method'
+ 'list should contain "saml2". This is documented in bug '
+ '1501032.')
def test_full_workflow(self):
"""Test 'standard' workflow for granting access tokens.
@@ -1886,9 +2007,10 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
* Scope token to one of available projects
"""
-
r = self._issue_unscoped_token()
token_resp = r.json_body['token']
+ # NOTE(lbragstad): Ensure only 'saml2' is in the method list.
+ self.assertListEqual(['saml2'], token_resp['methods'])
self.assertValidMappedUser(token_resp)
employee_unscoped_token_id = r.headers.get('X-Subject-Token')
r = self.get('/auth/projects', token=employee_unscoped_token_id)
@@ -1899,8 +2021,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
v3_scope_request = self._scope_request(employee_unscoped_token_id,
'project', project['id'])
- r = self.v3_authenticate_token(v3_scope_request)
+ r = self.v3_create_token(v3_scope_request)
token_resp = r.result['token']
+ # FIXME(lbragstad): 'token' should be in the list of methods returned
+ # but it isn't. This is documented in bug 1501032.
+ self.assertIn('token', token_resp['methods'])
+ self.assertIn('saml2', token_resp['methods'])
self._check_project_scoped_token_attributes(token_resp, project['id'])
def test_workflow_with_groups_deletion(self):
@@ -1917,10 +2043,9 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
# create group and role
- group = self.new_group_ref(
- domain_id=self.domainA['id'])
+ group = unit.new_group_ref(domain_id=self.domainA['id'])
group = self.identity_api.create_group(group)
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# assign role to group and project_admins
@@ -1971,7 +2096,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
token_id, 'project',
self.project_all['id'])
- self.v3_authenticate_token(scoped_token, expected_status=500)
+ self.v3_create_token(
+ scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR)
def test_lists_with_missing_group_in_backend(self):
"""Test a mapping that points to a group that does not exist
@@ -1990,8 +2116,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
domain_id = self.domainA['id']
domain_name = self.domainA['name']
- group = self.new_group_ref(domain_id=domain_id)
- group['name'] = 'EXISTS'
+ group = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
group = self.identity_api.create_group(group)
rules = {
'rules': [
@@ -2047,18 +2172,16 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
assigned
"""
-
domain_id = self.domainA['id']
domain_name = self.domainA['name']
# Add a group "EXISTS"
- group_exists = self.new_group_ref(domain_id=domain_id)
- group_exists['name'] = 'EXISTS'
+ group_exists = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
group_exists = self.identity_api.create_group(group_exists)
# Add a group "NO_EXISTS"
- group_no_exists = self.new_group_ref(domain_id=domain_id)
- group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = unit.new_group_ref(domain_id=domain_id,
+ name='NO_EXISTS')
group_no_exists = self.identity_api.create_group(group_no_exists)
group_ids = set([group_exists['id'], group_no_exists['id']])
@@ -2122,18 +2245,17 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
assigned
"""
-
domain_id = self.domainA['id']
domain_name = self.domainA['name']
# Add a group "EXISTS"
- group_exists = self.new_group_ref(domain_id=domain_id)
- group_exists['name'] = 'EXISTS'
+ group_exists = unit.new_group_ref(domain_id=domain_id,
+ name='EXISTS')
group_exists = self.identity_api.create_group(group_exists)
# Add a group "NO_EXISTS"
- group_no_exists = self.new_group_ref(domain_id=domain_id)
- group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = unit.new_group_ref(domain_id=domain_id,
+ name='NO_EXISTS')
group_no_exists = self.identity_api.create_group(group_no_exists)
group_ids = set([group_exists['id'], group_no_exists['id']])
@@ -2198,8 +2320,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"""
domain_id = self.domainA['id']
domain_name = self.domainA['name']
- group = self.new_group_ref(domain_id=domain_id)
- group['name'] = 'EXISTS'
+ group = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
group = self.identity_api.create_group(group)
rules = {
'rules': [
@@ -2262,13 +2383,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
domain_name = self.domainA['name']
# Add a group "EXISTS"
- group_exists = self.new_group_ref(domain_id=domain_id)
- group_exists['name'] = 'EXISTS'
+ group_exists = unit.new_group_ref(domain_id=domain_id,
+ name='EXISTS')
group_exists = self.identity_api.create_group(group_exists)
# Add a group "NO_EXISTS"
- group_no_exists = self.new_group_ref(domain_id=domain_id)
- group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = unit.new_group_ref(domain_id=domain_id,
+ name='NO_EXISTS')
group_no_exists = self.identity_api.create_group(group_no_exists)
group_ids = set([group_exists['id'], group_no_exists['id']])
@@ -2362,7 +2483,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self._check_domains_are_valid(r.json_body['token'])
def test_scoped_token_has_user_domain(self):
- r = self.v3_authenticate_token(
+ r = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
self._check_domains_are_valid(r.result['token'])
@@ -2383,7 +2504,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
assertion='ANOTHER_LOCAL_USER_ASSERTION')
-class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
+class FernetFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
AUTH_METHOD = 'token'
def load_fixtures(self, fixtures):
@@ -2436,7 +2557,7 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
v3_scope_request = self._scope_request(unscoped_token,
'project', project['id'])
- resp = self.v3_authenticate_token(v3_scope_request)
+ resp = self.v3_create_token(v3_scope_request)
token_resp = resp.result['token']
self._check_project_scoped_token_attributes(token_resp, project['id'])
@@ -2448,6 +2569,7 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests):
way for scoping all the tokens.
"""
+
AUTH_METHOD = 'token'
def auth_plugin_config_override(self):
@@ -2455,8 +2577,67 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests):
super(FederatedTokenTests,
self).auth_plugin_config_override(methods)
+ @utils.wip('This will fail because of bug #1501032. The returned method'
+ 'list should contain "saml2". This is documented in bug '
+ '1501032.')
+ def test_full_workflow(self):
+ """Test 'standard' workflow for granting access tokens.
+
+ * Issue unscoped token
+ * List available projects based on groups
+ * Scope token to one of available projects
+
+ """
+ r = self._issue_unscoped_token()
+ token_resp = r.json_body['token']
+ # NOTE(lbragstad): Ensure only 'saml2' is in the method list.
+ self.assertListEqual(['saml2'], token_resp['methods'])
+ self.assertValidMappedUser(token_resp)
+ employee_unscoped_token_id = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/projects', token=employee_unscoped_token_id)
+ projects = r.result['projects']
+ random_project = random.randint(0, len(projects)) - 1
+ project = projects[random_project]
+
+ v3_scope_request = self._scope_request(employee_unscoped_token_id,
+ 'project', project['id'])
+
+ r = self.v3_authenticate_token(v3_scope_request)
+ token_resp = r.result['token']
+ self.assertIn('token', token_resp['methods'])
+ self.assertIn('saml2', token_resp['methods'])
+ self._check_project_scoped_token_attributes(token_resp, project['id'])
+
+
+class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin):
+ """Tests for federated users
+
+ Tests new shadow users functionality
+
+ """
+
+ def auth_plugin_config_override(self):
+ methods = ['saml2']
+ super(FederatedUserTests, self).auth_plugin_config_override(methods)
+
+ def setUp(self):
+ super(FederatedUserTests, self).setUp()
+
+ def load_fixtures(self, fixtures):
+ super(FederatedUserTests, self).load_fixtures(fixtures)
+ self.load_federation_sample_data()
+
+ def test_user_id_persistense(self):
+ """Ensure user_id is persistend for multiple federated authn calls."""
+ r = self._issue_unscoped_token()
+ user_id = r.json_body['token']['user']['id']
-class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin):
+ r = self._issue_unscoped_token()
+ user_id2 = r.json_body['token']['user']['id']
+ self.assertEqual(user_id, user_id2)
+
+
+class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
JSON_HOME_DATA = {
'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/'
'1.0/rel/identity_provider': {
@@ -2484,7 +2665,7 @@ def _load_xml(filename):
return xml.read()
-class SAMLGenerationTests(FederationTests):
+class SAMLGenerationTests(test_v3.RestfulTestCase):
SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers'
'/BETA/protocols/saml2/auth')
@@ -2523,7 +2704,7 @@ class SAMLGenerationTests(FederationTests):
self.sp = self.sp_ref()
url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
self.put(url, body={'service_provider': self.sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
def test_samlize_token_values(self):
"""Test the SAML generator produces a SAML object.
@@ -2665,7 +2846,7 @@ class SAMLGenerationTests(FederationTests):
"""
if not _is_xmlsec1_installed():
- self.skip('xmlsec1 is not installed')
+ self.skipTest('xmlsec1 is not installed')
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
@@ -2709,7 +2890,7 @@ class SAMLGenerationTests(FederationTests):
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
- resp = self.v3_authenticate_token(auth_data)
+ resp = self.v3_create_token(auth_data)
token_id = resp.headers.get('X-Subject-Token')
return token_id
@@ -2718,7 +2899,7 @@ class SAMLGenerationTests(FederationTests):
user_id=self.user['id'],
password=self.user['password'],
user_domain_id=self.domain['id'])
- resp = self.v3_authenticate_token(auth_data)
+ resp = self.v3_create_token(auth_data)
token_id = resp.headers.get('X-Subject-Token')
return token_id
@@ -2757,7 +2938,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=200)
+ expected_status=http_client.OK)
response = etree.fromstring(http_response.result)
issuer = response[0]
@@ -2789,10 +2970,9 @@ class SAMLGenerationTests(FederationTests):
def test_invalid_scope_body(self):
"""Test that missing the scope in request body raises an exception.
- Raises exception.SchemaValidationError() - error code 400
+ Raises exception.SchemaValidationError() - error 400 Bad Request
"""
-
token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
@@ -2804,10 +2984,9 @@ class SAMLGenerationTests(FederationTests):
def test_invalid_token_body(self):
"""Test that missing the token in request body raises an exception.
- Raises exception.SchemaValidationError() - error code 400
+ Raises exception.SchemaValidationError() - error 400 Bad Request
"""
-
token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
@@ -2819,7 +2998,7 @@ class SAMLGenerationTests(FederationTests):
def test_sp_not_found(self):
"""Test SAML generation with an invalid service provider ID.
- Raises exception.ServiceProviderNotFound() - error code 404
+ Raises exception.ServiceProviderNotFound() - error Not Found 404
"""
sp_id = uuid.uuid4().hex
@@ -2830,7 +3009,6 @@ class SAMLGenerationTests(FederationTests):
def test_sp_disabled(self):
"""Try generating assertion for disabled Service Provider."""
-
# Disable Service Provider
sp_ref = {'enabled': False}
self.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref)
@@ -2844,10 +3022,9 @@ class SAMLGenerationTests(FederationTests):
def test_token_not_found(self):
"""Test that an invalid token in the request body raises an exception.
- Raises exception.TokenNotFound() - error code 404
+ Raises exception.TokenNotFound() - error Not Found 404
"""
-
token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID)
@@ -2863,7 +3040,6 @@ class SAMLGenerationTests(FederationTests):
The controller should return a SAML assertion that is wrapped in a
SOAP envelope.
"""
-
self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id,
@@ -2873,7 +3049,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=200)
+ expected_status=http_client.OK)
env_response = etree.fromstring(http_response.result)
header = env_response[0]
@@ -2956,7 +3132,7 @@ class SAMLGenerationTests(FederationTests):
self.assertEqual(expected_log, logger_fixture.output)
-class IdPMetadataGenerationTests(FederationTests):
+class IdPMetadataGenerationTests(test_v3.RestfulTestCase):
"""A class for testing Identity Provider Metadata generation."""
METADATA_URL = '/OS-FEDERATION/saml2/metadata'
@@ -3073,20 +3249,20 @@ class IdPMetadataGenerationTests(FederationTests):
self.generator.generate_metadata)
def test_get_metadata_with_no_metadata_file_configured(self):
- self.get(self.METADATA_URL, expected_status=500)
+ self.get(self.METADATA_URL,
+ expected_status=http_client.INTERNAL_SERVER_ERROR)
def test_get_metadata(self):
self.config_fixture.config(
group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
- r = self.get(self.METADATA_URL, response_content_type='text/xml',
- expected_status=200)
+ r = self.get(self.METADATA_URL, response_content_type='text/xml')
self.assertEqual('text/xml', r.headers.get('Content-Type'))
reference_file = _load_xml('idp_saml2_metadata.xml')
self.assertEqual(reference_file, r.result)
-class ServiceProviderTests(FederationTests):
+class ServiceProviderTests(test_v3.RestfulTestCase):
"""A test class for Service Providers."""
MEMBER_NAME = 'service_provider'
@@ -3096,13 +3272,13 @@ class ServiceProviderTests(FederationTests):
'relay_state_prefix', 'sp_url']
def setUp(self):
- super(FederationTests, self).setUp()
+ super(ServiceProviderTests, self).setUp()
# Add a Service Provider
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.SP_REF = self.sp_ref()
self.SERVICE_PROVIDER = self.put(
url, body={'service_provider': self.SP_REF},
- expected_status=201).result
+ expected_status=http_client.CREATED).result
def sp_ref(self):
ref = {
@@ -3119,9 +3295,18 @@ class ServiceProviderTests(FederationTests):
return '/OS-FEDERATION/service_providers/' + str(suffix)
return '/OS-FEDERATION/service_providers'
+ def _create_default_sp(self, body=None):
+ """Create default Service Provider."""
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ if body is None:
+ body = self.sp_ref()
+ resp = self.put(url, body={'service_provider': body},
+ expected_status=http_client.CREATED)
+ return resp
+
def test_get_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.get(url, expected_status=200)
+ resp = self.get(url)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
@@ -3133,7 +3318,7 @@ class ServiceProviderTests(FederationTests):
url = self.base_url(suffix=uuid.uuid4().hex)
sp = self.sp_ref()
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
@@ -3143,7 +3328,7 @@ class ServiceProviderTests(FederationTests):
sp = self.sp_ref()
del sp['relay_state_prefix']
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
sp_result = resp.result['service_provider']
self.assertEqual(CONF.saml.relay_state_prefix,
sp_result['relay_state_prefix'])
@@ -3155,7 +3340,7 @@ class ServiceProviderTests(FederationTests):
non_default_prefix = uuid.uuid4().hex
sp['relay_state_prefix'] = non_default_prefix
resp = self.put(url, body={'service_provider': sp},
- expected_status=201)
+ expected_status=http_client.CREATED)
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
@@ -3182,7 +3367,8 @@ class ServiceProviderTests(FederationTests):
}
for id, sp in ref_service_providers.items():
url = self.base_url(suffix=id)
- self.put(url, body={'service_provider': sp}, expected_status=201)
+ self.put(url, body={'service_provider': sp},
+ expected_status=http_client.CREATED)
# Insert ids into service provider object, we will compare it with
# responses from server and those include 'id' attribute.
@@ -3209,15 +3395,14 @@ class ServiceProviderTests(FederationTests):
"""
new_sp_ref = self.sp_ref()
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=200)
+ resp = self.patch(url, body={'service_provider': new_sp_ref})
patch_result = resp.result
new_sp_ref['id'] = self.SERVICE_PROVIDER_ID
self.assertValidEntity(patch_result['service_provider'],
ref=new_sp_ref,
keys_to_check=self.SP_KEYS)
- resp = self.get(url, expected_status=200)
+ resp = self.get(url)
get_result = resp.result
self.assertDictEqual(patch_result['service_provider'],
@@ -3227,7 +3412,7 @@ class ServiceProviderTests(FederationTests):
"""Update immutable attributes in service provider.
In this particular case the test will try to change ``id`` attribute.
- The server should return an HTTP 403 error code.
+ The server should return an HTTP 403 Forbidden error code.
"""
new_sp_ref = {'id': uuid.uuid4().hex}
@@ -3242,7 +3427,7 @@ class ServiceProviderTests(FederationTests):
self.patch(url, body={'service_provider': new_sp_ref},
expected_status=http_client.BAD_REQUEST)
- def test_update_service_provider_404(self):
+ def test_update_service_provider_returns_not_found(self):
new_sp_ref = self.sp_ref()
new_sp_ref['description'] = uuid.uuid4().hex
url = self.base_url(suffix=uuid.uuid4().hex)
@@ -3250,25 +3435,74 @@ class ServiceProviderTests(FederationTests):
expected_status=http_client.NOT_FOUND)
def test_update_sp_relay_state(self):
- """Update an SP with custome relay state."""
+ """Update an SP with custom relay state."""
new_sp_ref = self.sp_ref()
non_default_prefix = uuid.uuid4().hex
new_sp_ref['relay_state_prefix'] = non_default_prefix
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref},
- expected_status=200)
+ resp = self.patch(url, body={'service_provider': new_sp_ref})
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
def test_delete_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- self.delete(url, expected_status=204)
+ self.delete(url)
- def test_delete_service_provider_404(self):
+ def test_delete_service_provider_returns_not_found(self):
url = self.base_url(suffix=uuid.uuid4().hex)
self.delete(url, expected_status=http_client.NOT_FOUND)
+ def test_filter_list_sp_by_id(self):
+ def get_id(resp):
+ sp = resp.result.get('service_provider')
+ return sp.get('id')
+
+ sp1_id = get_id(self._create_default_sp())
+ sp2_id = get_id(self._create_default_sp())
+
+ # list the SP, should get SPs.
+ url = self.base_url()
+ resp = self.get(url)
+ sps = resp.result.get('service_providers')
+ entities_ids = [e['id'] for e in sps]
+ self.assertIn(sp1_id, entities_ids)
+ self.assertIn(sp2_id, entities_ids)
+
+ # filter the SP by 'id'. Only SP1 should appear.
+ url = self.base_url() + '?id=' + sp1_id
+ resp = self.get(url)
+ sps = resp.result.get('service_providers')
+ entities_ids = [e['id'] for e in sps]
+ self.assertIn(sp1_id, entities_ids)
+ self.assertNotIn(sp2_id, entities_ids)
+
+ def test_filter_list_sp_by_enabled(self):
+ def get_id(resp):
+ sp = resp.result.get('service_provider')
+ return sp.get('id')
+
+ sp1_id = get_id(self._create_default_sp())
+ sp2_ref = self.sp_ref()
+ sp2_ref['enabled'] = False
+ sp2_id = get_id(self._create_default_sp(body=sp2_ref))
+
+ # list the SP, should get two SPs.
+ url = self.base_url()
+ resp = self.get(url)
+ sps = resp.result.get('service_providers')
+ entities_ids = [e['id'] for e in sps]
+ self.assertIn(sp1_id, entities_ids)
+ self.assertIn(sp2_id, entities_ids)
+
+ # filter the SP by 'enabled'. Only SP1 should appear.
+ url = self.base_url() + '?enabled=True'
+ resp = self.get(url)
+ sps = resp.result.get('service_providers')
+ entities_ids = [e['id'] for e in sps]
+ self.assertIn(sp1_id, entities_ids)
+ self.assertNotIn(sp2_id, entities_ids)
+
class WebSSOTests(FederatedTokenTests):
"""A class for testing Web SSO."""
@@ -3306,6 +3540,21 @@ class WebSSOTests(FederatedTokenTests):
resp = self.api.federated_sso_auth(context, self.PROTOCOL)
self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
+ def test_get_sso_origin_host_case_insensitive(self):
+ # test lowercase hostname in trusted_dashboard
+ context = {
+ 'query_string': {
+ 'origin': "http://horizon.com",
+ },
+ }
+ host = self.api._get_sso_origin_host(context)
+ self.assertEqual("http://horizon.com", host)
+ # test uppercase hostname in trusted_dashboard
+ self.config_fixture.config(group='federation',
+ trusted_dashboard=['http://Horizon.com'])
+ host = self.api._get_sso_origin_host(context)
+ self.assertEqual("http://horizon.com", host)
+
def test_federated_sso_auth_with_protocol_specific_remote_id(self):
self.config_fixture.config(
group=self.PROTOCOL,
@@ -3380,7 +3629,7 @@ class WebSSOTests(FederatedTokenTests):
self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
-class K2KServiceCatalogTests(FederationTests):
+class K2KServiceCatalogTests(test_v3.RestfulTestCase):
SP1 = 'SP1'
SP2 = 'SP2'
SP3 = 'SP3'
@@ -3429,11 +3678,10 @@ class K2KServiceCatalogTests(FederationTests):
for entity in service_providers:
id = entity.get('id')
ref_entity = self.sp_response(id, ref.get(id))
- self.assertDictEqual(ref_entity, entity)
+ self.assertDictEqual(entity, ref_entity)
def test_service_providers_in_token(self):
"""Check if service providers are listed in service catalog."""
-
token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
ref = {}
for r in (self.sp_alpha, self.sp_beta, self.sp_gamma):