aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3.py197
1 files changed, 178 insertions, 19 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py
index f6d6ed93..9bbfa103 100644
--- a/keystone-moon/keystone/tests/unit/test_v3.py
+++ b/keystone-moon/keystone/tests/unit/test_v3.py
@@ -299,10 +299,11 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
ref = self.new_ref()
return ref
- def new_project_ref(self, domain_id, parent_id=None):
+ def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False):
ref = self.new_ref()
ref['domain_id'] = domain_id
ref['parent_id'] = parent_id
+ ref['is_domain'] = is_domain
return ref
def new_user_ref(self, domain_id, project_id=None):
@@ -362,9 +363,9 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
if isinstance(expires, six.string_types):
ref['expires_at'] = expires
elif isinstance(expires, dict):
- ref['expires_at'] = timeutils.strtime(
- timeutils.utcnow() + datetime.timedelta(**expires),
- fmt=TIME_FORMAT)
+ ref['expires_at'] = (
+ timeutils.utcnow() + datetime.timedelta(**expires)
+ ).strftime(TIME_FORMAT)
elif expires is None:
pass
else:
@@ -396,6 +397,29 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
return project
+ def get_unscoped_token(self):
+ """Convenience method so that we can test authenticated requests."""
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body={
+ 'auth': {
+ 'identity': {
+ 'methods': ['password'],
+ 'password': {
+ 'user': {
+ 'name': self.user['name'],
+ 'password': self.user['password'],
+ 'domain': {
+ 'id': self.user['domain_id']
+ }
+ }
+ }
+ }
+ }
+ })
+ return r.headers.get('X-Subject-Token')
+
def get_scoped_token(self):
"""Convenience method so that we can test authenticated requests."""
r = self.admin_request(
@@ -424,6 +448,34 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
})
return r.headers.get('X-Subject-Token')
+ def get_domain_scoped_token(self):
+ """Convenience method for requesting domain scoped token."""
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body={
+ 'auth': {
+ 'identity': {
+ 'methods': ['password'],
+ 'password': {
+ 'user': {
+ 'name': self.user['name'],
+ 'password': self.user['password'],
+ 'domain': {
+ 'id': self.user['domain_id']
+ }
+ }
+ }
+ },
+ 'scope': {
+ 'domain': {
+ 'id': self.domain['id'],
+ }
+ }
+ }
+ })
+ return r.headers.get('X-Subject-Token')
+
def get_requested_token(self, auth):
"""Request the specific token we want."""
@@ -593,20 +645,6 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
return entity
- def assertDictContainsSubset(self, expected, actual):
- """"Asserts if dictionary actual is a superset of expected.
-
- Tests whether the key/value pairs in dictionary actual are a superset
- of those in expected.
-
- """
- for k, v in expected.iteritems():
- self.assertIn(k, actual)
- if isinstance(v, dict):
- self.assertDictContainsSubset(v, actual[k])
- else:
- self.assertEqual(v, actual[k])
-
# auth validation
def assertValidISO8601ExtendedFormatDatetime(self, dt):
@@ -752,7 +790,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase,
self.assertValidCatalog(resp.json['catalog'])
self.assertIn('links', resp.json)
self.assertIsInstance(resp.json['links'], dict)
- self.assertEqual(['self'], resp.json['links'].keys())
+ self.assertEqual(['self'], list(resp.json['links'].keys()))
self.assertEqual(
'http://localhost/v3/auth/catalog',
resp.json['links']['self'])
@@ -1258,6 +1296,42 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
self.assertDictEqual(req.environ.get(authorization.AUTH_CONTEXT_ENV),
{})
+ def test_unscoped_token_auth_context(self):
+ unscoped_token = self.get_unscoped_token()
+ req = self._mock_request_object(unscoped_token)
+ application = None
+ middleware.AuthContextMiddleware(application).process_request(req)
+ for key in ['project_id', 'domain_id', 'domain_name']:
+ self.assertNotIn(
+ key,
+ req.environ.get(authorization.AUTH_CONTEXT_ENV))
+
+ def test_project_scoped_token_auth_context(self):
+ project_scoped_token = self.get_scoped_token()
+ req = self._mock_request_object(project_scoped_token)
+ application = None
+ middleware.AuthContextMiddleware(application).process_request(req)
+ self.assertEqual(
+ self.project['id'],
+ req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id'])
+
+ def test_domain_scoped_token_auth_context(self):
+ # grant the domain role to user
+ path = '/domains/%s/users/%s/roles/%s' % (
+ self.domain['id'], self.user['id'], self.role['id'])
+ self.put(path=path)
+
+ domain_scoped_token = self.get_domain_scoped_token()
+ req = self._mock_request_object(domain_scoped_token)
+ application = None
+ middleware.AuthContextMiddleware(application).process_request(req)
+ self.assertEqual(
+ self.domain['id'],
+ req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id'])
+ self.assertEqual(
+ self.domain['name'],
+ req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name'])
+
class JsonHomeTestMixin(object):
"""JSON Home test
@@ -1281,3 +1355,88 @@ class JsonHomeTestMixin(object):
for rel in self.JSON_HOME_DATA:
self.assertThat(resp_data['resources'][rel],
matchers.Equals(self.JSON_HOME_DATA[rel]))
+
+
+class AssignmentTestMixin(object):
+ """To hold assignment helper functions."""
+
+ def build_role_assignment_query_url(self, effective=False, **filters):
+ """Build and return a role assignment query url with provided params.
+
+ Available filters are: domain_id, project_id, user_id, group_id,
+ role_id and inherited_to_projects.
+ """
+
+ query_params = '?effective' if effective else ''
+
+ for k, v in filters.items():
+ query_params += '?' if not query_params else '&'
+
+ if k == 'inherited_to_projects':
+ query_params += 'scope.OS-INHERIT:inherited_to=projects'
+ else:
+ if k in ['domain_id', 'project_id']:
+ query_params += 'scope.'
+ elif k not in ['user_id', 'group_id', 'role_id']:
+ raise ValueError(
+ 'Invalid key \'%s\' in provided filters.' % k)
+
+ query_params += '%s=%s' % (k.replace('_', '.'), v)
+
+ return '/role_assignments%s' % query_params
+
+ def build_role_assignment_link(self, **attribs):
+ """Build and return a role assignment link with provided attributes.
+
+ Provided attributes are expected to contain: domain_id or project_id,
+ user_id or group_id, role_id and, optionally, inherited_to_projects.
+ """
+
+ if attribs.get('domain_id'):
+ link = '/domains/' + attribs['domain_id']
+ else:
+ link = '/projects/' + attribs['project_id']
+
+ if attribs.get('user_id'):
+ link += '/users/' + attribs['user_id']
+ else:
+ link += '/groups/' + attribs['group_id']
+
+ link += '/roles/' + attribs['role_id']
+
+ if attribs.get('inherited_to_projects'):
+ return '/OS-INHERIT%s/inherited_to_projects' % link
+
+ return link
+
+ def build_role_assignment_entity(self, link=None, **attribs):
+ """Build and return a role assignment entity with provided attributes.
+
+ Provided attributes are expected to contain: domain_id or project_id,
+ user_id or group_id, role_id and, optionally, inherited_to_projects.
+ """
+
+ entity = {'links': {'assignment': (
+ link or self.build_role_assignment_link(**attribs))}}
+
+ if attribs.get('domain_id'):
+ entity['scope'] = {'domain': {'id': attribs['domain_id']}}
+ else:
+ entity['scope'] = {'project': {'id': attribs['project_id']}}
+
+ if attribs.get('user_id'):
+ entity['user'] = {'id': attribs['user_id']}
+
+ if attribs.get('group_id'):
+ entity['links']['membership'] = ('/groups/%s/users/%s' %
+ (attribs['group_id'],
+ attribs['user_id']))
+ else:
+ entity['group'] = {'id': attribs['group_id']}
+
+ entity['role'] = {'id': attribs['role_id']}
+
+ if attribs.get('inherited_to_projects'):
+ entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
+
+ return entity