# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import sys
import time

from oslo_log import log
import six

from networking_odl._i18n import _LW


LOG = log.getLogger(__name__)


class CacheEntry(collections.namedtuple('CacheEntry', ['timeout', 'values'])):

    error = None

    @classmethod
    def create(cls, timeout, *values):
        return CacheEntry(timeout, list(values))

    def add_value(self, value):
        self.values.append(value)

    def is_expired(self, current_clock):
        return self.timeout <= current_clock

    # Entries hold a mutable list of values, so hashing and equality are
    # based on object identity rather than on the tuple contents.
    def __hash__(self):
        return id(self)

    def __eq__(self, other):
        return self is other


class Cache(object):
    '''Generic mapping class used to cache mappings.

    Examples of use:
        - host name to IP addresses mapping
        - IP addresses to ODL networking topology elements mapping
    '''
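
    # A minimal usage sketch (illustrative only: the fetch callable, the key
    # and connect_to() below are assumptions, not part of this module). The
    # cache wraps a callable that yields (key, value) pairs for the requested
    # keys; fetch_all() re-fetches entries that are missing or expired.
    #
    #     import socket
    #
    #     def _fetch_addresses(keys):
    #         for key in keys:
    #             yield key, socket.gethostbyname(key)
    #
    #     cache = Cache(_fetch_addresses)
    #     for host, address in cache.fetch_all(['controller-1'], timeout=60.0):
    #         connect_to(host, address)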

    # TODO(Federico Ressi) after Mitaka: this class should store cached data
    # in a place shared among multiple hosts, using a caching mechanism
    # consistent with other OpenStack libraries. This is especially relevant
    # for reliability when there are multiple Neutron instances and the direct
    # connection to ODL is broken.

    create_new_entry = CacheEntry.create

    def __init__(self, fetch_all_func):
        if not callable(fetch_all_func):
            message = 'Expected callable as parameter, got {!r}.'.format(
                fetch_all_func)
            raise TypeError(message)
        self._fetch_all = fetch_all_func
        self.clear()

    def clear(self):
        self._entries = collections.OrderedDict()

    def fetch(self, key, timeout):
        __, value = self.fetch_any([key], timeout=timeout)
        return value

    def fetch_any(self, keys, timeout):
        return next(self.fetch_all(keys=keys, timeout=timeout))

    def fetch_all(self, keys, timeout):
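        # Generator yielding (key, value) pairs for the given keys. Entries
        # that are missing, expired or marked with an error are re-fetched
        # with self._fetch_all and cached until `timeout` seconds from now.
        # A CacheFetchError is raised for keys that still have no value once
        # all pairs have been yielded.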
        # "now" as a number; time.time() is used rather than time.clock()
        # because the latter measures CPU time on some platforms, which would
        # break wall-clock based expiry
        current_clock = time.time()
        # the moment in the future at which new entries will expire
        new_entries_timeout = current_clock + timeout
        # entries to be fetched because they are missing or expired
        new_entries = collections.OrderedDict()
        # all entries missing or expired
        missing = collections.OrderedDict()
        # exception info captured in case a problem has to be reported
        cause_exc_info = None

        for key in keys:
            entry = self._entries.get(key)
            if entry is None or entry.is_expired(current_clock) or entry.error:
                # this entry has to be fetched
                entry = self.create_new_entry(new_entries_timeout)
                new_entries[key] = missing[key] = entry
            elif entry.values:
                # Yield existing entry
                for value in entry.values:
                    yield key, value
            else:
                # This entry has not expired and no error occurred when it was
                # fetched, so we accept that there are no values for the given
                # key until the entry expires. This will produce a
                # CacheFetchError (a KeyError) if the key is still missing at
                # the end of this function.
                missing[key] = entry

        if missing:
            if new_entries:
                # Fetch some entries and update the cache
                try:
                    new_entry_keys = tuple(new_entries)
                    for key, value in self._fetch_all(new_entry_keys):
                        entry = new_entries.get(key)
                        if entry:
                            # Add the freshly fetched value
                            entry.add_value(value)
                        else:
                            # This key was not requested, but we keep it
                            # anyway. "Noli equi dentes inspicere donati."
                            new_entries[key] = entry = self.create_new_entry(
                                new_entries_timeout, value)

                # pylint: disable=broad-except
                except Exception:
                    # Something has gone wrong: update the cache and yield
                    # what we got so far before raising any error
                    cause_exc_info = sys.exc_info()
                    LOG.warning(
                        _LW('Error fetching values for keys: %s'),
                        ', '.join(repr(k) for k in new_entry_keys),
                        exc_info=cause_exc_info)

                # update the cache with the freshly created entries
                self._entries.update(new_entries)

            missing_keys = []
            for key, entry in six.iteritems(missing):
                if entry.values:
                    # yield entries that were missing before
                    for value in entry.values:
                        # Yield just fetched entry
                        yield key, value
                else:
                    if cause_exc_info:
                        # mark this entry as failed
                        entry.error = cause_exc_info
                    # after all that, this entry still has no value
                    missing_keys.append(key)

            if missing_keys:
                # Some entries are still missing, probably because their keys
                # were invalid. It's time to raise an error.
                missing_keys = tuple(missing_keys)
                if not cause_exc_info:
                    # Search for the error cause in the missing entries
                    for key in missing_keys:
                        error = self._entries[key].error
                        if error:
                            # A cached entry whose fetch produced an error
                            # keeps reporting the same error (stored as
                            # exc_info) until it expires, even if a later
                            # fetch completes without any error.
                            # Is this what we want?
                            cause_exc_info = error
                            break
                    else:
                        # If the cause of the problem is not known then the
                        # keys themselves were probably invalid
                        message = 'Invalid keys: {}'.format(
                            ', '.join(repr(key) for key in missing_keys))
                        try:
                            raise KeyError(message)
                        except KeyError:
                            cause_exc_info = sys.exc_info()

                raise CacheFetchError(
                    missing_keys=missing_keys, cause_exc_info=cause_exc_info)


class CacheFetchError(KeyError):

    def __init__(self, missing_keys, cause_exc_info):
        super(CacheFetchError, self).__init__(str(cause_exc_info[1]))
        self.cause_exc_info = cause_exc_info
        self.missing_keys = missing_keys

    def reraise_cause(self):
        six.reraise(*self.cause_exc_info)
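

# A hedged usage example (illustrative only: `cache`, `keys` and `process()`
# are assumptions, not part of this module) showing how a caller can consume
# fetch_all() and re-raise the original failure carried by CacheFetchError:
#
#     try:
#         for key, value in cache.fetch_all(keys, timeout=60.0):
#             process(key, value)
#     except CacheFetchError as error:
#         LOG.debug('No values found for keys: %s', error.missing_keys)
#         error.reraise_cause()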