aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_backend_kvs.py
blob: 36af1c36d00f897328885eebd72abbea9b8773d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import uuid

from oslo_utils import timeutils
import six

from keystone.common import utils
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.token import test_backends as token_tests


class KvsToken(unit.TestCase, token_tests.TokenTests):
    """Token persistence tests that also inspect the per-user token index.

    The shared cases come from ``token_tests.TokenTests``; the tests defined
    here read and rewrite the driver's ``_store`` directly.
    """

    def setUp(self):
        """Run the base setup and then load the backends."""
        super(KvsToken, self).setUp()
        self.load_backends()

    def test_flush_expired_token(self):
        """Flushing expired tokens is expected to raise NotImplemented."""
        flush = self.token_provider_api._persistence.flush_expired_tokens
        self.assertRaises(exception.NotImplemented, flush)

    def _update_user_token_index_direct(self, user_key, token_id, new_data):
        """Replace one entry of a user's token index and store the list.

        :param user_key: store key under which the user's index is kept
        :param token_id: id of the entry to replace (compared with item 0
                         of every index entry)
        :param new_data: value written in place of the first matching entry

        The list is written back to the store even when nothing matched.
        """
        index_entries = (
            self.token_provider_api._persistence.driver
            ._get_user_token_list_with_expiry(user_key))
        # Only the index is rewritten: the expiry recorded there is what has
        # to look _actually_ expired, because no explicit get is done on the
        # token itself (saves extra round-trips to the kvs backend).
        for position, entry in enumerate(index_entries):
            if entry[0] != token_id:
                continue
            index_entries[position] = new_data
            break
        kvs_store = self.token_provider_api._persistence.driver._store
        kvs_store.set(user_key, index_entries)

    def test_cleanup_user_index_on_create(self):
        """Creating a token drops stale entries from the user's index.

        Checks the stored index after a plain create, after one entry has
        been back-dated in the index, and after a token has been deleted.
        """
        def index_entry(token_id, token_ref):
            # One index item: (token id, ISO formatted expiry).
            expires = utils.isotime(token_ref['expires'], subsecond=True)
            return (token_id, expires)

        def stored_index():
            # Raw read of the user's list straight from the store.
            return token_persistence.driver._store.get(user_key)

        one_day = datetime.timedelta(days=1)
        user_id = six.text_type(uuid.uuid4().hex)
        valid_token_id, _unused = self.create_token_sample_data(
            user_id=user_id)
        expired_token_id, _unused = self.create_token_sample_data(
            user_id=user_id)

        # NOTE(morganfainberg): Directly access the data cache since we need
        # to get expired tokens as well as valid tokens.
        token_persistence = self.token_provider_api._persistence
        user_key = token_persistence.driver._prefix_user_id(user_id)
        actual_index = stored_index()
        valid_token_ref = token_persistence.get_token(valid_token_id)
        expired_token_ref = token_persistence.get_token(expired_token_id)
        self.assertEqual(
            [index_entry(valid_token_id, valid_token_ref),
             index_entry(expired_token_id, expired_token_ref)],
            actual_index)

        # Back-date the second token in the index; the next create is
        # expected to remove it from the list.
        backdated_entry = (
            expired_token_id,
            utils.isotime(timeutils.utcnow() - one_day, subsecond=True))
        self._update_user_token_index_direct(user_key, expired_token_id,
                                             backdated_entry)
        valid_token_id_2, _unused = self.create_token_sample_data(
            user_id=user_id)
        valid_token_ref_2 = token_persistence.get_token(valid_token_id_2)
        expected_index = [
            index_entry(valid_token_id, valid_token_ref),
            index_entry(valid_token_id_2, valid_token_ref_2)]
        self.assertEqual(expected_index, stored_index())

        # Test that revoked tokens are removed from the list on create.
        token_persistence.delete_token(valid_token_id_2)
        new_token_id, _unused = self.create_token_sample_data(
            user_id=user_id)
        new_token_ref = token_persistence.get_token(new_token_id)
        expected_index = [
            index_entry(valid_token_id, valid_token_ref),
            index_entry(new_token_id, new_token_ref)]
        self.assertEqual(expected_index, stored_index())


class KvsTokenCacheInvalidation(unit.TestCase,
                                token_tests.TokenCacheInvalidation):
    """Token cache invalidation tests with the token driver set to 'kvs'."""

    def setUp(self):
        """Install the database fixture, load backends, create test data."""
        super(KvsTokenCacheInvalidation, self).setUp()
        db_fixture = database.Database(self.sql_driver_version_overrides)
        self.useFixture(db_fixture)
        self.load_backends()
        self._create_test_data()

    def config_overrides(self):
        """Apply the base overrides, then select the 'kvs' token driver."""
        super(KvsTokenCacheInvalidation, self).config_overrides()
        set_option = self.config_fixture.config
        set_option(group='token', driver='kvs')