aboutsummaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py184
1 files changed, 184 insertions, 0 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py b/keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py
new file mode 100644
index 00000000..77652868
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/auth_token/_memcache_pool.py
@@ -0,0 +1,184 @@
+# Copyright 2014 Mirantis Inc
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Thread-safe connection pool for python-memcached."""
+
+# NOTE(yorik-sar): this file is copied between keystone and keystonemiddleware
+# and should be kept in sync until we can use external library for this.
+
+import collections
+import contextlib
+import itertools
+import logging
+import time
+
+from six.moves import queue
+
+from keystonemiddleware.i18n import _LC
+
+
+_PoolItem = collections.namedtuple('_PoolItem', ['ttl', 'connection'])
+
+
+class ConnectionGetTimeoutException(Exception):
+ pass
+
+
+class ConnectionPool(queue.Queue):
+ """Base connection pool class
+
+ This class implements the basic connection pool logic as an abstract base
+ class.
+ """
+ def __init__(self, maxsize, unused_timeout, conn_get_timeout=None):
+ """Initialize the connection pool.
+
+ :param maxsize: maximum number of client connections for the pool
+ :type maxsize: int
+ :param unused_timeout: idle time to live for unused clients (in
+ seconds). If a client connection object has been
+ in the pool and idle for longer than the
+ unused_timeout, it will be reaped. This is to
+ ensure resources are released as utilization
+ goes down.
+ :type unused_timeout: int
+ :param conn_get_timeout: maximum time in seconds to wait for a
+ connection. If set to `None` timeout is
+ indefinite.
+ :type conn_get_timeout: int
+ """
+ queue.Queue.__init__(self, maxsize)
+ self._unused_timeout = unused_timeout
+ self._connection_get_timeout = conn_get_timeout
+ self._acquired = 0
+ self._LOG = logging.getLogger(__name__)
+
+ def _create_connection(self):
+ raise NotImplementedError
+
+ def _destroy_connection(self, conn):
+ raise NotImplementedError
+
+ @contextlib.contextmanager
+ def acquire(self):
+ try:
+ conn = self.get(timeout=self._connection_get_timeout)
+ except queue.Empty:
+ self._LOG.critical(_LC('Unable to get a connection from pool id '
+ '%(id)s after %(seconds)s seconds.'),
+ {'id': id(self),
+ 'seconds': self._connection_get_timeout})
+ raise ConnectionGetTimeoutException()
+ try:
+ yield conn
+ finally:
+ self.put(conn)
+
+ def _qsize(self):
+ return self.maxsize - self._acquired
+
+ if not hasattr(queue.Queue, '_qsize'):
+ qsize = _qsize
+
+ def _get(self):
+ if self.queue:
+ conn = self.queue.pop().connection
+ else:
+ conn = self._create_connection()
+ self._acquired += 1
+ return conn
+
+ def _put(self, conn):
+ self.queue.append(_PoolItem(
+ ttl=time.time() + self._unused_timeout,
+ connection=conn,
+ ))
+ self._acquired -= 1
+ # Drop all expired connections from the right end of the queue
+ now = time.time()
+ while self.queue and self.queue[0].ttl < now:
+ conn = self.queue.popleft().connection
+ self._destroy_connection(conn)
+
+
+class MemcacheClientPool(ConnectionPool):
+ def __init__(self, urls, arguments, **kwargs):
+ ConnectionPool.__init__(self, **kwargs)
+ self._urls = urls
+ self._arguments = arguments
+ # NOTE(morganfainberg): The host objects expect an int for the
+ # deaduntil value. Initialize this at 0 for each host with 0 indicating
+ # the host is not dead.
+ self._hosts_deaduntil = [0] * len(urls)
+
+ # NOTE(morganfainberg): Lazy import to allow middleware to work with
+ # python 3k even if memcache will not due to python 3k
+ # incompatibilities within the python-memcache library.
+ global memcache
+ import memcache
+
+ # This 'class' is taken from http://stackoverflow.com/a/22520633/238308
+ # Don't inherit client from threading.local so that we can reuse
+ # clients in different threads
+ MemcacheClient = type('_MemcacheClient', (object,),
+ dict(memcache.Client.__dict__))
+
+ self._memcache_client_class = MemcacheClient
+
+ def _create_connection(self):
+ return self._memcache_client_class(self._urls, **self._arguments)
+
+ def _destroy_connection(self, conn):
+ conn.disconnect_all()
+
+ def _get(self):
+ conn = ConnectionPool._get(self)
+ try:
+ # Propagate host state known to us to this client's list
+ now = time.time()
+ for deaduntil, host in zip(self._hosts_deaduntil, conn.servers):
+ if deaduntil > now and host.deaduntil <= now:
+ host.mark_dead('propagating death mark from the pool')
+ host.deaduntil = deaduntil
+ except Exception:
+ # We need to be sure that connection doesn't leak from the pool.
+ # This code runs before we enter context manager's try-finally
+ # block, so we need to explicitly release it here
+ ConnectionPool._put(self, conn)
+ raise
+ return conn
+
+ def _put(self, conn):
+ try:
+ # If this client found that one of the hosts is dead, mark it as
+ # such in our internal list
+ now = time.time()
+ for i, deaduntil, host in zip(itertools.count(),
+ self._hosts_deaduntil,
+ conn.servers):
+ # Do nothing if we already know this host is dead
+ if deaduntil <= now:
+ if host.deaduntil > now:
+ self._hosts_deaduntil[i] = host.deaduntil
+ else:
+ self._hosts_deaduntil[i] = 0
+ # If all hosts are dead we should forget that they're dead. This
+ # way we won't get completely shut off until dead_retry seconds
+ # pass, but will be checking servers as frequent as we can (over
+ # way smaller socket_timeout)
+ if all(deaduntil > now for deaduntil in self._hosts_deaduntil):
+ self._hosts_deaduntil[:] = [0] * len(self._hosts_deaduntil)
+ finally:
+ ConnectionPool._put(self, conn)