diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3.py | 197 |
1 files changed, 178 insertions, 19 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py index f6d6ed93..9bbfa103 100644 --- a/keystone-moon/keystone/tests/unit/test_v3.py +++ b/keystone-moon/keystone/tests/unit/test_v3.py @@ -299,10 +299,11 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, ref = self.new_ref() return ref - def new_project_ref(self, domain_id, parent_id=None): + def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False): ref = self.new_ref() ref['domain_id'] = domain_id ref['parent_id'] = parent_id + ref['is_domain'] = is_domain return ref def new_user_ref(self, domain_id, project_id=None): @@ -362,9 +363,9 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, if isinstance(expires, six.string_types): ref['expires_at'] = expires elif isinstance(expires, dict): - ref['expires_at'] = timeutils.strtime( - timeutils.utcnow() + datetime.timedelta(**expires), - fmt=TIME_FORMAT) + ref['expires_at'] = ( + timeutils.utcnow() + datetime.timedelta(**expires) + ).strftime(TIME_FORMAT) elif expires is None: pass else: @@ -396,6 +397,29 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, return project + def get_unscoped_token(self): + """Convenience method so that we can test authenticated requests.""" + r = self.admin_request( + method='POST', + path='/v3/auth/tokens', + body={ + 'auth': { + 'identity': { + 'methods': ['password'], + 'password': { + 'user': { + 'name': self.user['name'], + 'password': self.user['password'], + 'domain': { + 'id': self.user['domain_id'] + } + } + } + } + } + }) + return r.headers.get('X-Subject-Token') + def get_scoped_token(self): """Convenience method so that we can test authenticated requests.""" r = self.admin_request( @@ -424,6 +448,34 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, }) return r.headers.get('X-Subject-Token') + def get_domain_scoped_token(self): + """Convenience method for requesting domain scoped token.""" + r = self.admin_request( + method='POST', + path='/v3/auth/tokens', + body={ + 'auth': { + 'identity': { + 'methods': ['password'], + 'password': { + 'user': { + 'name': self.user['name'], + 'password': self.user['password'], + 'domain': { + 'id': self.user['domain_id'] + } + } + } + }, + 'scope': { + 'domain': { + 'id': self.domain['id'], + } + } + } + }) + return r.headers.get('X-Subject-Token') + def get_requested_token(self, auth): """Request the specific token we want.""" @@ -593,20 +645,6 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, return entity - def assertDictContainsSubset(self, expected, actual): - """"Asserts if dictionary actual is a superset of expected. - - Tests whether the key/value pairs in dictionary actual are a superset - of those in expected. - - """ - for k, v in expected.iteritems(): - self.assertIn(k, actual) - if isinstance(v, dict): - self.assertDictContainsSubset(v, actual[k]) - else: - self.assertEqual(v, actual[k]) - # auth validation def assertValidISO8601ExtendedFormatDatetime(self, dt): @@ -752,7 +790,7 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, self.assertValidCatalog(resp.json['catalog']) self.assertIn('links', resp.json) self.assertIsInstance(resp.json['links'], dict) - self.assertEqual(['self'], resp.json['links'].keys()) + self.assertEqual(['self'], list(resp.json['links'].keys())) self.assertEqual( 'http://localhost/v3/auth/catalog', resp.json['links']['self']) @@ -1258,6 +1296,42 @@ class AuthContextMiddlewareTestCase(RestfulTestCase): self.assertDictEqual(req.environ.get(authorization.AUTH_CONTEXT_ENV), {}) + def test_unscoped_token_auth_context(self): + unscoped_token = self.get_unscoped_token() + req = self._mock_request_object(unscoped_token) + application = None + middleware.AuthContextMiddleware(application).process_request(req) + for key in ['project_id', 'domain_id', 'domain_name']: + self.assertNotIn( + key, + req.environ.get(authorization.AUTH_CONTEXT_ENV)) + + def test_project_scoped_token_auth_context(self): + project_scoped_token = self.get_scoped_token() + req = self._mock_request_object(project_scoped_token) + application = None + middleware.AuthContextMiddleware(application).process_request(req) + self.assertEqual( + self.project['id'], + req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id']) + + def test_domain_scoped_token_auth_context(self): + # grant the domain role to user + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + domain_scoped_token = self.get_domain_scoped_token() + req = self._mock_request_object(domain_scoped_token) + application = None + middleware.AuthContextMiddleware(application).process_request(req) + self.assertEqual( + self.domain['id'], + req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id']) + self.assertEqual( + self.domain['name'], + req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name']) + class JsonHomeTestMixin(object): """JSON Home test @@ -1281,3 +1355,88 @@ class JsonHomeTestMixin(object): for rel in self.JSON_HOME_DATA: self.assertThat(resp_data['resources'][rel], matchers.Equals(self.JSON_HOME_DATA[rel])) + + +class AssignmentTestMixin(object): + """To hold assignment helper functions.""" + + def build_role_assignment_query_url(self, effective=False, **filters): + """Build and return a role assignment query url with provided params. + + Available filters are: domain_id, project_id, user_id, group_id, + role_id and inherited_to_projects. + """ + + query_params = '?effective' if effective else '' + + for k, v in filters.items(): + query_params += '?' if not query_params else '&' + + if k == 'inherited_to_projects': + query_params += 'scope.OS-INHERIT:inherited_to=projects' + else: + if k in ['domain_id', 'project_id']: + query_params += 'scope.' + elif k not in ['user_id', 'group_id', 'role_id']: + raise ValueError( + 'Invalid key \'%s\' in provided filters.' % k) + + query_params += '%s=%s' % (k.replace('_', '.'), v) + + return '/role_assignments%s' % query_params + + def build_role_assignment_link(self, **attribs): + """Build and return a role assignment link with provided attributes. + + Provided attributes are expected to contain: domain_id or project_id, + user_id or group_id, role_id and, optionally, inherited_to_projects. + """ + + if attribs.get('domain_id'): + link = '/domains/' + attribs['domain_id'] + else: + link = '/projects/' + attribs['project_id'] + + if attribs.get('user_id'): + link += '/users/' + attribs['user_id'] + else: + link += '/groups/' + attribs['group_id'] + + link += '/roles/' + attribs['role_id'] + + if attribs.get('inherited_to_projects'): + return '/OS-INHERIT%s/inherited_to_projects' % link + + return link + + def build_role_assignment_entity(self, link=None, **attribs): + """Build and return a role assignment entity with provided attributes. + + Provided attributes are expected to contain: domain_id or project_id, + user_id or group_id, role_id and, optionally, inherited_to_projects. + """ + + entity = {'links': {'assignment': ( + link or self.build_role_assignment_link(**attribs))}} + + if attribs.get('domain_id'): + entity['scope'] = {'domain': {'id': attribs['domain_id']}} + else: + entity['scope'] = {'project': {'id': attribs['project_id']}} + + if attribs.get('user_id'): + entity['user'] = {'id': attribs['user_id']} + + if attribs.get('group_id'): + entity['links']['membership'] = ('/groups/%s/users/%s' % + (attribs['group_id'], + attribs['user_id'])) + else: + entity['group'] = {'id': attribs['group_id']} + + entity['role'] = {'id': attribs['role_id']} + + if attribs.get('inherited_to_projects'): + entity['scope']['OS-INHERIT:inherited_to'] = 'projects' + + return entity |