# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """TZlibTransport provides a compressed transport and transport factory class, using the python standard library zlib module to implement data compression. """ from __future__ import division import zlib from cStringIO import StringIO from TTransport import TTransportBase, CReadableTransport class TZlibTransportFactory(object): """Factory transport that builds zlib compressed transports. This factory caches the last single client/transport that it was passed and returns the same TZlibTransport object that was created. This caching means the TServer class will get the _same_ transport object for both input and output transports from this factory. (For non-threaded scenarios only, since the cache only holds one object) The purpose of this caching is to allocate only one TZlibTransport where only one is really needed (since it must have separate read/write buffers), and makes the statistics from getCompSavings() and getCompRatio() easier to understand. """ # class scoped cache of last transport given and zlibtransport returned _last_trans = None _last_z = None def getTransport(self, trans, compresslevel=9): """Wrap a transport, trans, with the TZlibTransport compressed transport class, returning a new transport to the caller. @param compresslevel: The zlib compression level, ranging from 0 (no compression) to 9 (best compression). Defaults to 9. @type compresslevel: int This method returns a TZlibTransport which wraps the passed C{trans} TTransport derived instance. """ if trans == self._last_trans: return self._last_z ztrans = TZlibTransport(trans, compresslevel) self._last_trans = trans self._last_z = ztrans return ztrans class TZlibTransport(TTransportBase, CReadableTransport): """Class that wraps a transport with zlib, compressing writes and decompresses reads, using the python standard library zlib module. """ # Read buffer size for the python fastbinary C extension, # the TBinaryProtocolAccelerated class. DEFAULT_BUFFSIZE = 4096 def __init__(self, trans, compresslevel=9): """Create a new TZlibTransport, wrapping C{trans}, another TTransport derived object. @param trans: A thrift transport object, i.e. a TSocket() object. @type trans: TTransport @param compresslevel: The zlib compression level, ranging from 0 (no compression) to 9 (best compression). Default is 9. @type compresslevel: int """ self.__trans = trans self.compresslevel = compresslevel self.__rbuf = StringIO() self.__wbuf = StringIO() self._init_zlib() self._init_stats() def _reinit_buffers(self): """Internal method to initialize/reset the internal StringIO objects for read and write buffers. """ self.__rbuf = StringIO() self.__wbuf = StringIO() def _init_stats(self): """Internal method to reset the internal statistics counters for compression ratios and bandwidth savings. """ self.bytes_in = 0 self.bytes_out = 0 self.bytes_in_comp = 0 self.bytes_out_comp = 0 def _init_zlib(self): """Internal method for setting up the zlib compression and decompression objects. """ self._zcomp_read = zlib.decompressobj() self._zcomp_write = zlib.compressobj(self.compresslevel) def getCompRatio(self): """Get the current measured compression ratios (in,out) from this transport. Returns a tuple of: (inbound_compression_ratio, outbound_compression_ratio) The compression ratios are computed as: compressed / uncompressed E.g., data that compresses by 10x will have a ratio of: 0.10 and data that compresses to half of ts original size will have a ratio of 0.5 None is returned if no bytes have yet been processed in a particular direction. """ r_percent, w_percent = (None, None) if self.bytes_in > 0: r_percent = self.bytes_in_comp / self.bytes_in if self.bytes_out > 0: w_percent = self.bytes_out_comp / self.bytes_out return (r_percent, w_percent) def getCompSavings(self): """Get the current count of saved bytes due to data compression. Returns a tuple of: (inbound_saved_bytes, outbound_saved_bytes) Note: if compression is actually expanding your data (only likely with very tiny thrift objects), then the values returned will be negative. """ r_saved = self.bytes_in - self.bytes_in_comp w_saved = self.bytes_out - self.bytes_out_comp return (r_saved, w_saved) def isOpen(self): """Return the underlying transport's open status""" return self.__trans.isOpen() def open(self): """Open the underlying transport""" self._init_stats() return self.__trans.open() def listen(self): """Invoke the underlying transport's listen() method""" self.__trans.listen() def accept(self): """Accept connections on the underlying transport""" return self.__trans.accept() def close(self): """Close the underlying transport,""" self._reinit_buffers() self._init_zlib() return self.__trans.close() def read(self, sz): """Read up to sz bytes from the decompressed bytes buffer, and read from the underlying transport if the decompression buffer is empty. """ ret = self.__rbuf.read(sz) if len(ret) > 0: return ret # keep reading from transport until something comes back while True: if self.readComp(sz): break ret = self.__rbuf.read(sz) return ret def readComp(self, sz): """Read compressed data from the underlying transport, then decompress it and append it to the internal StringIO read buffer """ zbuf = self.__trans.read(sz) zbuf = self._zcomp_read.unconsumed_tail + zbuf buf = self._zcomp_read.decompress(zbuf) self.bytes_in += len(zbuf) self.bytes_in_comp += len(buf) old = self.__rbuf.read() self.__rbuf = StringIO(old + buf) if len(old) + len(buf) == 0: return False return True def write(self, buf): """Write some bytes, putting them into the internal write buffer for eventual compression. """ self.__wbuf.write(buf) def flush(self): """Flush any queued up data in the write buffer and ensure the compression buffer is flushed out to the underlying transport """ wout = self.__wbuf.getvalue() if len(wout) > 0: zbuf = self._zcomp_write.compress(wout) self.bytes_out += len(wout) self.bytes_out_comp += len(zbuf) else: zbuf = '' ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) self.bytes_out_comp += len(ztail) if (len(zbuf) + len(ztail)) > 0: self.__wbuf = StringIO() self.__trans.write(zbuf + ztail) self.__trans.flush() @property def cstringio_buf(self): """Implement the CReadableTransport interface""" return self.__rbuf def cstringio_refill(self, partialread, reqlen): """Implement the CReadableTransport interface for refill""" retstring = partialread if reqlen < self.DEFAULT_BUFFSIZE: retstring += self.read(self.DEFAULT_BUFFSIZE) while len(retstring) < reqlen: retstring += self.read(reqlen - len(retstring)) self.__rbuf = StringIO(retstring) return self.__rbuf