1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid
from oslo_utils import timeutils
import six
from testtools import matchers
from keystone.contrib.revoke import model
from keystone.tests.unit import test_v3
from keystone.token import provider
def _future_time_string():
expire_delta = datetime.timedelta(seconds=1000)
future_time = timeutils.utcnow() + expire_delta
return timeutils.isotime(future_time)
class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
EXTENSION_NAME = 'revoke'
EXTENSION_TO_ADD = 'revoke_extension'
JSON_HOME_DATA = {
'http://docs.openstack.org/api/openstack-identity/3/ext/OS-REVOKE/1.0/'
'rel/events': {
'href': '/OS-REVOKE/events',
},
}
def test_get_empty_list(self):
resp = self.get('/OS-REVOKE/events')
self.assertEqual([], resp.json_body['events'])
def _blank_event(self):
return {}
# The two values will be the same with the exception of
# 'issued_before' which is set when the event is recorded.
def assertReportedEventMatchesRecorded(self, event, sample, before_time):
after_time = timeutils.utcnow()
event_issued_before = timeutils.normalize_time(
timeutils.parse_isotime(event['issued_before']))
self.assertTrue(
before_time <= event_issued_before,
'invalid event issued_before time; %s is not later than %s.' % (
timeutils.isotime(event_issued_before, subsecond=True),
timeutils.isotime(before_time, subsecond=True)))
self.assertTrue(
event_issued_before <= after_time,
'invalid event issued_before time; %s is not earlier than %s.' % (
timeutils.isotime(event_issued_before, subsecond=True),
timeutils.isotime(after_time, subsecond=True)))
del (event['issued_before'])
self.assertEqual(sample, event)
def test_revoked_list_self_url(self):
revoked_list_url = '/OS-REVOKE/events'
resp = self.get(revoked_list_url)
links = resp.json_body['links']
self.assertThat(links['self'], matchers.EndsWith(revoked_list_url))
def test_revoked_token_in_list(self):
user_id = uuid.uuid4().hex
expires_at = provider.default_expire_time()
sample = self._blank_event()
sample['user_id'] = six.text_type(user_id)
sample['expires_at'] = six.text_type(timeutils.isotime(expires_at))
before_time = timeutils.utcnow()
self.revoke_api.revoke_by_expiration(user_id, expires_at)
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_disabled_project_in_list(self):
project_id = uuid.uuid4().hex
sample = dict()
sample['project_id'] = six.text_type(project_id)
before_time = timeutils.utcnow()
self.revoke_api.revoke(
model.RevokeEvent(project_id=project_id))
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_disabled_domain_in_list(self):
domain_id = uuid.uuid4().hex
sample = dict()
sample['domain_id'] = six.text_type(domain_id)
before_time = timeutils.utcnow()
self.revoke_api.revoke(
model.RevokeEvent(domain_id=domain_id))
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_list_since_invalid(self):
self.get('/OS-REVOKE/events?since=blah', expected_status=400)
def test_list_since_valid(self):
resp = self.get('/OS-REVOKE/events?since=2013-02-27T18:30:59.999999Z')
events = resp.json_body['events']
self.assertEqual(0, len(events))
def test_since_future_time_no_events(self):
domain_id = uuid.uuid4().hex
sample = dict()
sample['domain_id'] = six.text_type(domain_id)
self.revoke_api.revoke(
model.RevokeEvent(domain_id=domain_id))
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(1, len(events))
resp = self.get('/OS-REVOKE/events?since=%s' % _future_time_string())
events = resp.json_body['events']
self.assertEqual([], events)
|