aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/common/cache/_memcache_pool.py
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
committerWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
commitb8c756ecdd7cced1db4300935484e8c83701c82e (patch)
tree87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/common/cache/_memcache_pool.py
parentc304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff)
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/common/cache/_memcache_pool.py')
-rw-r--r--keystone-moon/keystone/common/cache/_memcache_pool.py233
1 files changed, 233 insertions, 0 deletions
diff --git a/keystone-moon/keystone/common/cache/_memcache_pool.py b/keystone-moon/keystone/common/cache/_memcache_pool.py
new file mode 100644
index 00000000..b15332db
--- /dev/null
+++ b/keystone-moon/keystone/common/cache/_memcache_pool.py
@@ -0,0 +1,233 @@
+# Copyright 2014 Mirantis Inc
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Thread-safe connection pool for python-memcached."""
+
+# NOTE(yorik-sar): this file is copied between keystone and keystonemiddleware
+# and should be kept in sync until we can use external library for this.
+
+import collections
+import contextlib
+import itertools
+import logging
+import threading
+import time
+
+import memcache
+from oslo_log import log
+from six.moves import queue
+
+from keystone import exception
+from keystone.i18n import _
+
+
+LOG = log.getLogger(__name__)
+
+# This 'class' is taken from http://stackoverflow.com/a/22520633/238308
+# Don't inherit client from threading.local so that we can reuse clients in
+# different threads
+_MemcacheClient = type('_MemcacheClient', (object,),
+ dict(memcache.Client.__dict__))
+
+_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
+
+
+class ConnectionPool(queue.Queue):
+ """Base connection pool class
+
+ This class implements the basic connection pool logic as an abstract base
+ class.
+ """
+ def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
+ """Initialize the connection pool.
+
+ :param maxsize: maximum number of client connections for the pool
+ :type maxsize: int
+ :param unused_timeout: idle time to live for unused clients (in
+ seconds). If a client connection object has been
+ in the pool and idle for longer than the
+ unused_timeout, it will be reaped. This is to
+ ensure resources are released as utilization
+ goes down.
+ :type unused_timeout: int
+ :param conn_get_timeout: maximum time in seconds to wait for a
+ connection. If set to `None` timeout is
+ indefinite.
+ :type conn_get_timeout: int
+ """
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ queue.Queue.__init__(self, maxsize)
+ self._unused_timeout = unused_timeout
+ self._connection_get_timeout = conn_get_timeout
+ self._acquired = 0
+
+ def _create_connection(self):
+ """Returns a connection instance.
+
+ This is called when the pool needs another instance created.
+
+ :returns: a new connection instance
+
+ """
+ raise NotImplementedError
+
+ def _destroy_connection(self, conn):
+ """Destroy and cleanup a connection instance.
+
+ This is called when the pool wishes to get rid of an existing
+ connection. This is the opportunity for a subclass to free up
+ resources and cleaup after itself.
+
+ :param conn: the connection object to destroy
+
+ """
+ raise NotImplementedError
+
+ def _debug_logger(self, msg, *args, **kwargs):
+ if LOG.isEnabledFor(logging.DEBUG):
+ thread_id = threading.current_thread().ident
+ args = (id(self), thread_id) + args
+ prefix = 'Memcached pool %s, thread %s: '
+ LOG.debug(prefix + msg, *args, **kwargs)
+
+ @contextlib.contextmanager
+ def acquire(self):
+ self._debug_logger('Acquiring connection')
+ try:
+ conn = self.get(timeout=self._connection_get_timeout)
+ except queue.Empty:
+ raise exception.UnexpectedError(
+ _('Unable to get a connection from pool id %(id)s after '
+ '%(seconds)s seconds.') %
+ {'id': id(self), 'seconds': self._connection_get_timeout})
+ self._debug_logger('Acquired connection %s', id(conn))
+ try:
+ yield conn
+ finally:
+ self._debug_logger('Releasing connection %s', id(conn))
+ self._drop_expired_connections()
+ try:
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ queue.Queue.put(self, conn, block=False)
+ except queue.Full:
+ self._debug_logger('Reaping exceeding connection %s', id(conn))
+ self._destroy_connection(conn)
+
+ def _qsize(self):
+ if self.maxsize:
+ return self.maxsize - self._acquired
+ else:
+ # A value indicating there is always a free connection
+ # if maxsize is None or 0
+ return 1
+
+ # NOTE(dstanek): stdlib and eventlet Queue implementations
+ # have different names for the qsize method. This ensures
+ # that we override both of them.
+ if not hasattr(queue.Queue, '_qsize'):
+ qsize = _qsize
+
+ def _get(self):
+ if self.queue:
+ conn = self.queue.pop().connection
+ else:
+ conn = self._create_connection()
+ self._acquired += 1
+ return conn
+
+ def _drop_expired_connections(self):
+ """Drop all expired connections from the right end of the queue."""
+ now = time.time()
+ while self.queue and self.queue[0].ttl < now:
+ conn = self.queue.popleft().connection
+ self._debug_logger('Reaping connection %s', id(conn))
+ self._destroy_connection(conn)
+
+ def _put(self, conn):
+ self.queue.append(_PoolItem(
+ ttl=time.time() + self._unused_timeout,
+ connection=conn,
+ ))
+ self._acquired -= 1
+
+
+class MemcacheClientPool(ConnectionPool):
+ def __init__(self, urls, arguments, **kwargs):
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ ConnectionPool.__init__(self, **kwargs)
+ self.urls = urls
+ self._arguments = arguments
+ # NOTE(morganfainberg): The host objects expect an int for the
+ # deaduntil value. Initialize this at 0 for each host with 0 indicating
+ # the host is not dead.
+ self._hosts_deaduntil = [0] * len(urls)
+
+ def _create_connection(self):
+ return _MemcacheClient(self.urls, **self._arguments)
+
+ def _destroy_connection(self, conn):
+ conn.disconnect_all()
+
+ def _get(self):
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ conn = ConnectionPool._get(self)
+ try:
+ # Propagate host state known to us to this client's list
+ now = time.time()
+ for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
+ if deaduntil > now and host.deaduntil <= now:
+ host.mark_dead('propagating death mark from the pool')
+ host.deaduntil = deaduntil
+ except Exception:
+ # We need to be sure that connection doesn't leak from the pool.
+ # This code runs before we enter context manager's try-finally
+ # block, so we need to explicitly release it here.
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ ConnectionPool._put(self, conn)
+ raise
+ return conn
+
+ def _put(self, conn):
+ try:
+ # If this client found that one of the hosts is dead, mark it as
+ # such in our internal list
+ now = time.time()
+ for i, host in zip(itertools.count(), conn.servers):
+ deaduntil = self._hosts_deaduntil[i]
+ # Do nothing if we already know this host is dead
+ if deaduntil <= now:
+ if host.deaduntil > now:
+ self._hosts_deaduntil[i] = host.deaduntil
+ self._debug_logger(
+ 'Marked host %s dead until %s',
+ self.urls[i], host.deaduntil)
+ else:
+ self._hosts_deaduntil[i] = 0
+ # If all hosts are dead we should forget that they're dead. This
+ # way we won't get completely shut off until dead_retry seconds
+ # pass, but will be checking servers as frequent as we can (over
+ # way smaller socket_timeout)
+ if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
+ self._debug_logger('All hosts are dead. Marking them as live.')
+ self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
+ finally:
+ # super() cannot be used here because Queue in stdlib is an
+ # old-style class
+ ConnectionPool._put(self, conn)