diff options
Diffstat (limited to 'lib')
63 files changed, 7819 insertions, 0 deletions
diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/__init__.py diff --git a/lib/dominoRPC/Communication-remote b/lib/dominoRPC/Communication-remote new file mode 100755 index 0000000..77a96d7 --- /dev/null +++ b/lib/dominoRPC/Communication-remote @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# +# Autogenerated by Thrift Compiler (0.9.3) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +import sys +import pprint +from urlparse import urlparse +from thrift.transport import TTransport +from thrift.transport import TSocket +from thrift.transport import TSSLSocket +from thrift.transport import THttpClient +from thrift.protocol import TBinaryProtocol + +from domino import Communication +from domino.ttypes import * + +if len(sys.argv) <= 1 or sys.argv[1] == '--help': + print('') + print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] function [arg1 [arg2...]]') + print('') + print('Functions:') + print(' HeartBeatMessage d_heartbeat(HeartBeatMessage msg)') + print(' RegisterResponseMessage d_register(RegisterMessage msg)') + print(' SubscribeResponseMessage d_subscribe(SubscribeMessage msg)') + print(' PublishResponseMessage d_publish(PublishMessage msg)') + print(' PushResponseMessage d_push(PushMessage msg)') + print(' QueryResponseMessage d_query(QueryMessage msg)') + print('') + sys.exit(0) + +pp = pprint.PrettyPrinter(indent = 2) +host = 'localhost' +port = 9090 +uri = '' +framed = False +ssl = False +http = False +argi = 1 + +if sys.argv[argi] == '-h': + parts = sys.argv[argi+1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + argi += 2 + +if sys.argv[argi] == '-u': + url = urlparse(sys.argv[argi+1]) + parts = url[1].split(':') + host = parts[0] + if len(parts) > 1: + port = int(parts[1]) + else: + port = 80 + uri = url[2] + if url[4]: + uri += '?%s' % url[4] + http = True + argi += 2 + +if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed': + framed = True + argi += 1 + +if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl': + ssl = True + argi += 1 + +cmd = sys.argv[argi] +args = sys.argv[argi+1:] + +if http: + transport = THttpClient.THttpClient(host, port, uri) +else: + socket = TSSLSocket.TSSLSocket(host, port, validate=False) if ssl else TSocket.TSocket(host, port) + if framed: + transport = TTransport.TFramedTransport(socket) + else: + transport = TTransport.TBufferedTransport(socket) +protocol = TBinaryProtocol.TBinaryProtocol(transport) +client = Communication.Client(protocol) +transport.open() + +if cmd == 'd_heartbeat': + if len(args) != 1: + print('d_heartbeat requires 1 args') + sys.exit(1) + pp.pprint(client.d_heartbeat(eval(args[0]),)) + +elif cmd == 'd_register': + if len(args) != 1: + print('d_register requires 1 args') + sys.exit(1) + pp.pprint(client.d_register(eval(args[0]),)) + +elif cmd == 'd_subscribe': + if len(args) != 1: + print('d_subscribe requires 1 args') + sys.exit(1) + pp.pprint(client.d_subscribe(eval(args[0]),)) + +elif cmd == 'd_publish': + if len(args) != 1: + print('d_publish requires 1 args') + sys.exit(1) + pp.pprint(client.d_publish(eval(args[0]),)) + +elif cmd == 'd_push': + if len(args) != 1: + print('d_push requires 1 args') + sys.exit(1) + pp.pprint(client.d_push(eval(args[0]),)) + +elif cmd == 'd_query': + if len(args) != 1: + print('d_query requires 1 args') + sys.exit(1) + pp.pprint(client.d_query(eval(args[0]),)) + +else: + print('Unrecognized method %s' % cmd) + sys.exit(1) + +transport.close() diff --git a/lib/dominoRPC/Communication.py b/lib/dominoRPC/Communication.py new file mode 100644 index 0000000..2252bf7 --- /dev/null +++ b/lib/dominoRPC/Communication.py @@ -0,0 +1,1196 @@ +# +# Autogenerated by Thrift Compiler (0.9.3) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TException, TApplicationException +import logging +from ttypes import * +from thrift.Thrift import TProcessor +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol, TProtocol +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + +class Iface: + def d_heartbeat(self, msg): + """ + A method definition looks like C code. It has a return type, arguments, + and optionally a list of exceptions that it may throw. Note that argument + lists and exception lists are specified using the exact same syntax as + field lists in struct or exception definitions. + + Parameters: + - msg + """ + pass + + def d_register(self, msg): + """ + Parameters: + - msg + """ + pass + + def d_subscribe(self, msg): + """ + Parameters: + - msg + """ + pass + + def d_publish(self, msg): + """ + Parameters: + - msg + """ + pass + + def d_push(self, msg): + """ + Parameters: + - msg + """ + pass + + def d_query(self, msg): + """ + Parameters: + - msg + """ + pass + + +class Client(Iface): + def __init__(self, iprot, oprot=None): + self._iprot = self._oprot = iprot + if oprot is not None: + self._oprot = oprot + self._seqid = 0 + + def d_heartbeat(self, msg): + """ + A method definition looks like C code. It has a return type, arguments, + and optionally a list of exceptions that it may throw. Note that argument + lists and exception lists are specified using the exact same syntax as + field lists in struct or exception definitions. + + Parameters: + - msg + """ + self.send_d_heartbeat(msg) + return self.recv_d_heartbeat() + + def send_d_heartbeat(self, msg): + self._oprot.writeMessageBegin('d_heartbeat', TMessageType.CALL, self._seqid) + args = d_heartbeat_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_heartbeat(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_heartbeat_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_heartbeat failed: unknown result") + + def d_register(self, msg): + """ + Parameters: + - msg + """ + self.send_d_register(msg) + return self.recv_d_register() + + def send_d_register(self, msg): + self._oprot.writeMessageBegin('d_register', TMessageType.CALL, self._seqid) + args = d_register_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_register(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_register_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_register failed: unknown result") + + def d_subscribe(self, msg): + """ + Parameters: + - msg + """ + self.send_d_subscribe(msg) + return self.recv_d_subscribe() + + def send_d_subscribe(self, msg): + self._oprot.writeMessageBegin('d_subscribe', TMessageType.CALL, self._seqid) + args = d_subscribe_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_subscribe(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_subscribe_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_subscribe failed: unknown result") + + def d_publish(self, msg): + """ + Parameters: + - msg + """ + self.send_d_publish(msg) + return self.recv_d_publish() + + def send_d_publish(self, msg): + self._oprot.writeMessageBegin('d_publish', TMessageType.CALL, self._seqid) + args = d_publish_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_publish(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_publish_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_publish failed: unknown result") + + def d_push(self, msg): + """ + Parameters: + - msg + """ + self.send_d_push(msg) + return self.recv_d_push() + + def send_d_push(self, msg): + self._oprot.writeMessageBegin('d_push', TMessageType.CALL, self._seqid) + args = d_push_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_push(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_push_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_push failed: unknown result") + + def d_query(self, msg): + """ + Parameters: + - msg + """ + self.send_d_query(msg) + return self.recv_d_query() + + def send_d_query(self, msg): + self._oprot.writeMessageBegin('d_query', TMessageType.CALL, self._seqid) + args = d_query_args() + args.msg = msg + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_d_query(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = d_query_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + raise TApplicationException(TApplicationException.MISSING_RESULT, "d_query failed: unknown result") + + +class Processor(Iface, TProcessor): + def __init__(self, handler): + self._handler = handler + self._processMap = {} + self._processMap["d_heartbeat"] = Processor.process_d_heartbeat + self._processMap["d_register"] = Processor.process_d_register + self._processMap["d_subscribe"] = Processor.process_d_subscribe + self._processMap["d_publish"] = Processor.process_d_publish + self._processMap["d_push"] = Processor.process_d_push + self._processMap["d_query"] = Processor.process_d_query + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin() + if name not in self._processMap: + iprot.skip(TType.STRUCT) + iprot.readMessageEnd() + x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name)) + oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid) + x.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + return + else: + self._processMap[name](self, seqid, iprot, oprot) + return True + + def process_d_heartbeat(self, seqid, iprot, oprot): + args = d_heartbeat_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_heartbeat_result() + try: + result.success = self._handler.d_heartbeat(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_heartbeat", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_d_register(self, seqid, iprot, oprot): + args = d_register_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_register_result() + try: + result.success = self._handler.d_register(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_register", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_d_subscribe(self, seqid, iprot, oprot): + args = d_subscribe_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_subscribe_result() + try: + result.success = self._handler.d_subscribe(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_subscribe", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_d_publish(self, seqid, iprot, oprot): + args = d_publish_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_publish_result() + try: + result.success = self._handler.d_publish(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_publish", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_d_push(self, seqid, iprot, oprot): + args = d_push_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_push_result() + try: + result.success = self._handler.d_push(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_push", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + def process_d_query(self, seqid, iprot, oprot): + args = d_query_args() + args.read(iprot) + iprot.readMessageEnd() + result = d_query_result() + try: + result.success = self._handler.d_query(args.msg) + msg_type = TMessageType.REPLY + except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): + raise + except Exception as ex: + msg_type = TMessageType.EXCEPTION + logging.exception(ex) + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("d_query", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + + +# HELPER FUNCTIONS AND STRUCTURES + +class d_heartbeat_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (HeartBeatMessage, HeartBeatMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = HeartBeatMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_heartbeat_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_heartbeat_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (HeartBeatMessage, HeartBeatMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = HeartBeatMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_heartbeat_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_register_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (RegisterMessage, RegisterMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = RegisterMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_register_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_register_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (RegisterResponseMessage, RegisterResponseMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = RegisterResponseMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_register_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_subscribe_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (SubscribeMessage, SubscribeMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = SubscribeMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_subscribe_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_subscribe_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (SubscribeResponseMessage, SubscribeResponseMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = SubscribeResponseMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_subscribe_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_publish_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (PublishMessage, PublishMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = PublishMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_publish_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_publish_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (PublishResponseMessage, PublishResponseMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = PublishResponseMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_publish_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_push_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (PushMessage, PushMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = PushMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_push_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_push_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (PushResponseMessage, PushResponseMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = PushResponseMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_push_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_query_args: + """ + Attributes: + - msg + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRUCT, 'msg', (QueryMessage, QueryMessage.thrift_spec), None, ), # 1 + ) + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRUCT: + self.msg = QueryMessage() + self.msg.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_query_args') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRUCT, 1) + self.msg.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.msg) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class d_query_result: + """ + Attributes: + - success + """ + + thrift_spec = ( + (0, TType.STRUCT, 'success', (QueryResponseMessage, QueryResponseMessage.thrift_spec), None, ), # 0 + ) + + def __init__(self, success=None,): + self.success = success + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.STRUCT: + self.success = QueryResponseMessage() + self.success.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('d_query_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.STRUCT, 0) + self.success.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.success) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/lib/dominoRPC/Communication.pyc b/lib/dominoRPC/Communication.pyc Binary files differnew file mode 100644 index 0000000..b697bc2 --- /dev/null +++ b/lib/dominoRPC/Communication.pyc diff --git a/lib/dominoRPC/__init__.py b/lib/dominoRPC/__init__.py new file mode 100644 index 0000000..e2212bc --- /dev/null +++ b/lib/dominoRPC/__init__.py @@ -0,0 +1 @@ +__all__ = ['ttypes', 'constants', 'Communication'] diff --git a/lib/dominoRPC/__init__.pyc b/lib/dominoRPC/__init__.pyc Binary files differnew file mode 100644 index 0000000..52cf85c --- /dev/null +++ b/lib/dominoRPC/__init__.pyc diff --git a/lib/dominoRPC/constants.py b/lib/dominoRPC/constants.py new file mode 100644 index 0000000..57b7c26 --- /dev/null +++ b/lib/dominoRPC/constants.py @@ -0,0 +1,29 @@ +# +# Autogenerated by Thrift Compiler (0.9.3) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TException, TApplicationException +from ttypes import * + +HEART_BEAT = 1 +REGISTER = 2 +REGISTER_RESPONSE = 3 +SUBSCRIBE = 4 +SUBSCRIBE_RESPONSE = 5 +PUBLISH = 6 +PUBLISH_RESPONSE = 7 +PUSH = 8 +PUSH_RESPONSE = 9 +QUERY = 10 +QUERY_RESPONSE = 11 +SUCCESS = 1 +FAILED = 2 +APPEND = 0 +OVERWRITE = 1 +DELETE = 2 + +THRIFT_RPC_TIMEOUT_MS = 1000 diff --git a/lib/dominoRPC/constants.pyc b/lib/dominoRPC/constants.pyc Binary files differnew file mode 100644 index 0000000..18c888f --- /dev/null +++ b/lib/dominoRPC/constants.pyc diff --git a/lib/dominoRPC/ttypes.py b/lib/dominoRPC/ttypes.py new file mode 100644 index 0000000..8a80d5f --- /dev/null +++ b/lib/dominoRPC/ttypes.py @@ -0,0 +1,1435 @@ +# +# Autogenerated by Thrift Compiler (0.9.3) +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# +# options string: py +# + +from thrift.Thrift import TType, TMessageType, TException, TApplicationException + +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol, TProtocol +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + + +class HeartBeatMessage: + """ + Domino sends periodic heartbeats from + Domino Clients and Domino Server echos + + Attributes: + - messageType + - domino_udid + - seq_no + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 1, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 1 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('HeartBeatMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class RegisterMessage: + """ + Domino Clients must first register with + Domino Server. Clients can ask for a specific + Unique Domino ID (UDID) + + Attributes: + - messageType + - domino_udid_desired + - seq_no + - ipaddr + - tcpport + - supported_templates + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 2, ), # 1 + (2, TType.I64, 'domino_udid_desired', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.STRING, 'ipaddr', None, None, ), # 4 + (5, TType.I16, 'tcpport', None, None, ), # 5 + (6, TType.LIST, 'supported_templates', (TType.STRING,None), None, ), # 6 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid_desired=None, seq_no=None, ipaddr=None, tcpport=None, supported_templates=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 2 + self.messageType = messageType + self.domino_udid_desired = domino_udid_desired + self.seq_no = seq_no + self.ipaddr = ipaddr + self.tcpport = tcpport + self.supported_templates = supported_templates + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid_desired = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.ipaddr = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I16: + self.tcpport = iprot.readI16() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.supported_templates = [] + (_etype3, _size0) = iprot.readListBegin() + for _i4 in xrange(_size0): + _elem5 = iprot.readString() + self.supported_templates.append(_elem5) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('RegisterMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid_desired is not None: + oprot.writeFieldBegin('domino_udid_desired', TType.I64, 2) + oprot.writeI64(self.domino_udid_desired) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.ipaddr is not None: + oprot.writeFieldBegin('ipaddr', TType.STRING, 4) + oprot.writeString(self.ipaddr) + oprot.writeFieldEnd() + if self.tcpport is not None: + oprot.writeFieldBegin('tcpport', TType.I16, 5) + oprot.writeI16(self.tcpport) + oprot.writeFieldEnd() + if self.supported_templates is not None: + oprot.writeFieldBegin('supported_templates', TType.LIST, 6) + oprot.writeListBegin(TType.STRING, len(self.supported_templates)) + for iter6 in self.supported_templates: + oprot.writeString(iter6) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid_desired) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.ipaddr) + value = (value * 31) ^ hash(self.tcpport) + value = (value * 31) ^ hash(self.supported_templates) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class RegisterResponseMessage: + """ + Attributes: + - messageType + - domino_udid + - domino_udid_assigned + - seq_no + - responseCode + - comments + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 3, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'domino_udid_assigned', None, None, ), # 3 + (4, TType.I64, 'seq_no', None, None, ), # 4 + (5, TType.BYTE, 'responseCode', None, None, ), # 5 + (6, TType.LIST, 'comments', (TType.STRING,None), None, ), # 6 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, domino_udid_assigned=None, seq_no=None, responseCode=None, comments=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 3 + self.messageType = messageType + self.domino_udid = domino_udid + self.domino_udid_assigned = domino_udid_assigned + self.seq_no = seq_no + self.responseCode = responseCode + self.comments = comments + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.domino_udid_assigned = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.BYTE: + self.responseCode = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.comments = [] + (_etype10, _size7) = iprot.readListBegin() + for _i11 in xrange(_size7): + _elem12 = iprot.readString() + self.comments.append(_elem12) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('RegisterResponseMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.domino_udid_assigned is not None: + oprot.writeFieldBegin('domino_udid_assigned', TType.I64, 3) + oprot.writeI64(self.domino_udid_assigned) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 4) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.responseCode is not None: + oprot.writeFieldBegin('responseCode', TType.BYTE, 5) + oprot.writeByte(self.responseCode) + oprot.writeFieldEnd() + if self.comments is not None: + oprot.writeFieldBegin('comments', TType.LIST, 6) + oprot.writeListBegin(TType.STRING, len(self.comments)) + for iter13 in self.comments: + oprot.writeString(iter13) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.domino_udid_assigned) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.responseCode) + value = (value * 31) ^ hash(self.comments) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class SubscribeMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - template_op + - supported_template_types + - label_op + - labels + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 4, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.BYTE, 'template_op', None, None, ), # 4 + (5, TType.LIST, 'supported_template_types', (TType.STRING,None), None, ), # 5 + (6, TType.BYTE, 'label_op', None, None, ), # 6 + (7, TType.LIST, 'labels', (TType.STRING,None), None, ), # 7 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, template_op=None, supported_template_types=None, label_op=None, labels=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 4 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.template_op = template_op + self.supported_template_types = supported_template_types + self.label_op = label_op + self.labels = labels + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BYTE: + self.template_op = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.supported_template_types = [] + (_etype17, _size14) = iprot.readListBegin() + for _i18 in xrange(_size14): + _elem19 = iprot.readString() + self.supported_template_types.append(_elem19) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.BYTE: + self.label_op = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.LIST: + self.labels = [] + (_etype23, _size20) = iprot.readListBegin() + for _i24 in xrange(_size20): + _elem25 = iprot.readString() + self.labels.append(_elem25) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SubscribeMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.template_op is not None: + oprot.writeFieldBegin('template_op', TType.BYTE, 4) + oprot.writeByte(self.template_op) + oprot.writeFieldEnd() + if self.supported_template_types is not None: + oprot.writeFieldBegin('supported_template_types', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.supported_template_types)) + for iter26 in self.supported_template_types: + oprot.writeString(iter26) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.label_op is not None: + oprot.writeFieldBegin('label_op', TType.BYTE, 6) + oprot.writeByte(self.label_op) + oprot.writeFieldEnd() + if self.labels is not None: + oprot.writeFieldBegin('labels', TType.LIST, 7) + oprot.writeListBegin(TType.STRING, len(self.labels)) + for iter27 in self.labels: + oprot.writeString(iter27) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.template_op) + value = (value * 31) ^ hash(self.supported_template_types) + value = (value * 31) ^ hash(self.label_op) + value = (value * 31) ^ hash(self.labels) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class SubscribeResponseMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - responseCode + - comments + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 5, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.BYTE, 'responseCode', None, None, ), # 4 + (5, TType.LIST, 'comments', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, comments=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 5 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.responseCode = responseCode + self.comments = comments + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BYTE: + self.responseCode = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.comments = [] + (_etype31, _size28) = iprot.readListBegin() + for _i32 in xrange(_size28): + _elem33 = iprot.readString() + self.comments.append(_elem33) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('SubscribeResponseMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.responseCode is not None: + oprot.writeFieldBegin('responseCode', TType.BYTE, 4) + oprot.writeByte(self.responseCode) + oprot.writeFieldEnd() + if self.comments is not None: + oprot.writeFieldBegin('comments', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.comments)) + for iter34 in self.comments: + oprot.writeString(iter34) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.responseCode) + value = (value * 31) ^ hash(self.comments) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class PublishMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - template_type + - template + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 6, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.STRING, 'template_type', None, None, ), # 4 + (5, TType.LIST, 'template', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, template_type=None, template=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 6 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.template_type = template_type + self.template = template + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.template_type = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.template = [] + (_etype38, _size35) = iprot.readListBegin() + for _i39 in xrange(_size35): + _elem40 = iprot.readString() + self.template.append(_elem40) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('PublishMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.template_type is not None: + oprot.writeFieldBegin('template_type', TType.STRING, 4) + oprot.writeString(self.template_type) + oprot.writeFieldEnd() + if self.template is not None: + oprot.writeFieldBegin('template', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.template)) + for iter41 in self.template: + oprot.writeString(iter41) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.template_type) + value = (value * 31) ^ hash(self.template) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class PublishResponseMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - responseCode + - comments + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 7, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.BYTE, 'responseCode', None, None, ), # 4 + (5, TType.LIST, 'comments', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, comments=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 7 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.responseCode = responseCode + self.comments = comments + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BYTE: + self.responseCode = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.comments = [] + (_etype45, _size42) = iprot.readListBegin() + for _i46 in xrange(_size42): + _elem47 = iprot.readString() + self.comments.append(_elem47) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('PublishResponseMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.responseCode is not None: + oprot.writeFieldBegin('responseCode', TType.BYTE, 4) + oprot.writeByte(self.responseCode) + oprot.writeFieldEnd() + if self.comments is not None: + oprot.writeFieldBegin('comments', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.comments)) + for iter48 in self.comments: + oprot.writeString(iter48) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.responseCode) + value = (value * 31) ^ hash(self.comments) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class PushMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - template_type + - template + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 8, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.STRING, 'template_type', None, None, ), # 4 + (5, TType.LIST, 'template', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, template_type=None, template=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 8 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.template_type = template_type + self.template = template + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.template_type = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.template = [] + (_etype52, _size49) = iprot.readListBegin() + for _i53 in xrange(_size49): + _elem54 = iprot.readString() + self.template.append(_elem54) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('PushMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.template_type is not None: + oprot.writeFieldBegin('template_type', TType.STRING, 4) + oprot.writeString(self.template_type) + oprot.writeFieldEnd() + if self.template is not None: + oprot.writeFieldBegin('template', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.template)) + for iter55 in self.template: + oprot.writeString(iter55) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.template_type) + value = (value * 31) ^ hash(self.template) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class PushResponseMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - responseCode + - comments + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 9, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.BYTE, 'responseCode', None, None, ), # 4 + (5, TType.LIST, 'comments', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, comments=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 9 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.responseCode = responseCode + self.comments = comments + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BYTE: + self.responseCode = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.comments = [] + (_etype59, _size56) = iprot.readListBegin() + for _i60 in xrange(_size56): + _elem61 = iprot.readString() + self.comments.append(_elem61) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('PushResponseMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.responseCode is not None: + oprot.writeFieldBegin('responseCode', TType.BYTE, 4) + oprot.writeByte(self.responseCode) + oprot.writeFieldEnd() + if self.comments is not None: + oprot.writeFieldBegin('comments', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.comments)) + for iter62 in self.comments: + oprot.writeString(iter62) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.responseCode) + value = (value * 31) ^ hash(self.comments) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class QueryMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - queryString + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 10, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.LIST, 'queryString', (TType.STRING,None), None, ), # 4 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, queryString=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 10 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.queryString = queryString + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.LIST: + self.queryString = [] + (_etype66, _size63) = iprot.readListBegin() + for _i67 in xrange(_size63): + _elem68 = iprot.readString() + self.queryString.append(_elem68) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('QueryMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.queryString is not None: + oprot.writeFieldBegin('queryString', TType.LIST, 4) + oprot.writeListBegin(TType.STRING, len(self.queryString)) + for iter69 in self.queryString: + oprot.writeString(iter69) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.queryString) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class QueryResponseMessage: + """ + Attributes: + - messageType + - domino_udid + - seq_no + - responseCode + - queryResponse + """ + + thrift_spec = ( + None, # 0 + (1, TType.BYTE, 'messageType', None, 11, ), # 1 + (2, TType.I64, 'domino_udid', None, None, ), # 2 + (3, TType.I64, 'seq_no', None, None, ), # 3 + (4, TType.BYTE, 'responseCode', None, None, ), # 4 + (5, TType.LIST, 'queryResponse', (TType.STRING,None), None, ), # 5 + ) + + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, queryResponse=None,): + if messageType is self.thrift_spec[1][4]: + messageType = 11 + self.messageType = messageType + self.domino_udid = domino_udid + self.seq_no = seq_no + self.responseCode = responseCode + self.queryResponse = queryResponse + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.BYTE: + self.messageType = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I64: + self.domino_udid = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.seq_no = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BYTE: + self.responseCode = iprot.readByte() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.LIST: + self.queryResponse = [] + (_etype73, _size70) = iprot.readListBegin() + for _i74 in xrange(_size70): + _elem75 = iprot.readString() + self.queryResponse.append(_elem75) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('QueryResponseMessage') + if self.messageType is not None: + oprot.writeFieldBegin('messageType', TType.BYTE, 1) + oprot.writeByte(self.messageType) + oprot.writeFieldEnd() + if self.domino_udid is not None: + oprot.writeFieldBegin('domino_udid', TType.I64, 2) + oprot.writeI64(self.domino_udid) + oprot.writeFieldEnd() + if self.seq_no is not None: + oprot.writeFieldBegin('seq_no', TType.I64, 3) + oprot.writeI64(self.seq_no) + oprot.writeFieldEnd() + if self.responseCode is not None: + oprot.writeFieldBegin('responseCode', TType.BYTE, 4) + oprot.writeByte(self.responseCode) + oprot.writeFieldEnd() + if self.queryResponse is not None: + oprot.writeFieldBegin('queryResponse', TType.LIST, 5) + oprot.writeListBegin(TType.STRING, len(self.queryResponse)) + for iter76 in self.queryResponse: + oprot.writeString(iter76) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.messageType) + value = (value * 31) ^ hash(self.domino_udid) + value = (value * 31) ^ hash(self.seq_no) + value = (value * 31) ^ hash(self.responseCode) + value = (value * 31) ^ hash(self.queryResponse) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) diff --git a/lib/dominoRPC/ttypes.pyc b/lib/dominoRPC/ttypes.pyc Binary files differnew file mode 100644 index 0000000..be548d1 --- /dev/null +++ b/lib/dominoRPC/ttypes.pyc diff --git a/lib/mapper/__init__.py b/lib/mapper/__init__.py new file mode 100644 index 0000000..75d6ed3 --- /dev/null +++ b/lib/mapper/__init__.py @@ -0,0 +1 @@ +__all__ = ['label'] diff --git a/lib/mapper/__init__.pyc b/lib/mapper/__init__.pyc Binary files differnew file mode 100644 index 0000000..9fda083 --- /dev/null +++ b/lib/mapper/__init__.pyc diff --git a/lib/mapper/label.py b/lib/mapper/label.py new file mode 100644 index 0000000..8591d7c --- /dev/null +++ b/lib/mapper/label.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python + +# +# Licence statement goes here +# + +#from toscaparser.tosca_template import ToscaTemplate + +#Current version: +#Parses policy rules, extracts targets, extracts policy properties +#Returns set of policy properties for each target in a dictionary object +#e.g., node_labels['VNF1'] = {label1, label2, ..., labeln} +def extract_labels(tosca): + node_labels = dict() #stores labels for each node + + if tosca.tpl.has_key('topology_template'): + if tosca.tpl['topology_template'].has_key('policies'): + policies = tosca.tpl['topology_template']['policies'] + else: + return node_labels + else: + return node_labels + + #extract label sets for each policy target + for p in policies: + for rule in p: + targetlist = p[rule]['targets'] + for props in p[rule]['properties']: + prop_list = p[rule]['properties'][props] + for values in prop_list: + labelkey = p[rule]['type']+ ':properties:' + props + ":" + values + for target in targetlist: + if node_labels.has_key(target): + node_labels[target].update(set([labelkey])) + else: + node_labels[target] = set([labelkey]) + return node_labels + +# Returns a map from nodes to regions based on label matching +def map_nodes(site_labels,node_labels): + sitemap = dict() #stores mapping + + #for each target find a map of sites + for node in node_labels: + sitemap[node] = set() + for site in site_labels: + if node_labels[node].issubset(site_labels[site]): + sitemap[node].add(site) + + return sitemap + +# Selects sites for nodes if multiple candidates exist +# First iterate for nodes with single candidate site +# Rank sites with most nodes higher +def select_site( site_map ): + node_site = dict() + counter = dict() + #SHALL I CHECK IF ANY KEY HAS AN EMPTY SET TO THROW EXCEPTION? + #For now, I assume input as safe + + for node in site_map: + node_site[node] = [] + if len(site_map[node]) == 1: + for site in site_map[node]: + node_site[node] = site + if counter.has_key(site): + counter[site] = counter[site] + 1 + else: + counter[site] = 1 + + for node in site_map: + if len(site_map[node]) > 1: + maxval = 0 + maxkey = '-1' + for site in site_map[node]: + if counter.has_key(site) and counter[site] >= maxval: + maxval = counter[site] + maxkey = site + elif counter.has_key(site) == False: + counter[site] = 1 + if maxval == 0: + maxval = 1 + maxkey = site + node_site[node] = maxkey + return node_site diff --git a/lib/mapper/label.pyc b/lib/mapper/label.pyc Binary files differnew file mode 100644 index 0000000..2a223fe --- /dev/null +++ b/lib/mapper/label.pyc diff --git a/lib/partitioner/__init__.py b/lib/partitioner/__init__.py new file mode 100644 index 0000000..15b0269 --- /dev/null +++ b/lib/partitioner/__init__.py @@ -0,0 +1 @@ +__all__ = ['partitioner'] diff --git a/lib/partitioner/__init__.pyc b/lib/partitioner/__init__.pyc Binary files differnew file mode 100644 index 0000000..f380195 --- /dev/null +++ b/lib/partitioner/__init__.pyc diff --git a/lib/partitioner/constants.py b/lib/partitioner/constants.py new file mode 100644 index 0000000..f758e96 --- /dev/null +++ b/lib/partitioner/constants.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python + +# +# Licence statement goes here +# + +tosca_ordered_fields = ['tosca_definitions_version', 'description', 'metadata', 'policy_types', 'topology_template'] diff --git a/lib/partitioner/constants.pyc b/lib/partitioner/constants.pyc Binary files differnew file mode 100644 index 0000000..7d28f20 --- /dev/null +++ b/lib/partitioner/constants.pyc diff --git a/lib/partitioner/partitioner.py b/lib/partitioner/partitioner.py new file mode 100644 index 0000000..a098aca --- /dev/null +++ b/lib/partitioner/partitioner.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python + +# +# Licence statement goes here +# + +import constants +import copy + +def partition_tosca(filepath, nodesite, tpl): + file_paths = {} #holds the list of partitioned files + flag = {} #True when key exists + sitenodes = {} #holds nodes in each site + + #identify the number of partitions + for node in nodesite: + if nodesite[node] != []: + flag[nodesite[node]] = True + if sitenodes.has_key(nodesite[node]): + sitenodes[nodesite[node]].append(node) + else: + sitenodes[nodesite[node]] = [node] + + n_parts = len(flag) + + #prepare the nodes + tpl_local = {} + for site in sitenodes: + tpl_local[site] = copy.deepcopy(tpl) + #remove the nodes not assigned to a site + for node in nodesite: + for site in sitenodes: + if node not in sitenodes[site]: + del tpl_local[site]['topology_template']['node_templates'][node] + #remove from policy targets + if tpl_local[site]['topology_template'].has_key('policies'): + for rule in tpl_local[site]['topology_template']['policies']: + for key in rule: #there should be only one key + if node in rule[key]['targets']: + rule[key]['targets'].remove(node) + # remove the rule is there is no target left! + if len(rule[key]['targets']) is 0: + tpl_local[site]['topology_template']['policies'].remove(rule) + + for site in sitenodes: + tpl_l = tpl_local[site] + print tpl_l , '\n' + file_paths[site] = filepath + '_part' + str(site) + fout = open(filepath + '_part' + str(site),'w') + + if tpl_l.has_key('tosca_definitions_version'): + fout.write('tosca_definitions_version: ') + fout.write(tpl_l['tosca_definitions_version'] + '\n') + write_obj(fout, tpl_l['tosca_definitions_version'], None, ' ') + + fout.write('\n') + + if tpl_l.has_key('description'): + fout.write('description: ') + fout.write(tpl_l['description'] + '\n') + write_obj(fout, tpl_l['description'], None, ' ') + + fout.write('\n') + + if tpl_l.has_key('metadata'): + fout.write('metadata: ' + '\n') + write_obj(fout, tpl_l['metadata'], None, ' ') + + fout.write('\n') + + if tpl_l.has_key('policy_types'): + fout.write('policy_types: ' + '\n') + write_obj(fout, tpl_l['policy_types'], None, ' ') + + fout.write('\n') + + if tpl_l.has_key('topology_template'): + fout.write('topology_template: ' + '\n') + write_obj(fout, tpl_l['topology_template'], None, ' ') + + fout.close() + + return file_paths + + +def write_obj(f, curr, prev, prepad): + if type(curr) in (str,int,float,bool): + #should be a string, numerical, boolean, etc. + if type(prev) is dict: + #write on the same line, key should be written + f.write(' ' + str(curr) + '\n') + + elif type(curr) is dict: + if prev is not None and type(prev) is not list: + f.write('\n') + for key in curr: + if type(prev) is list: + f.write(prepad + '- ' + str(key) + ':') + write_obj(f, curr[key], curr, prepad + ' ') + else: + f.write(prepad + str(key) + ':') + write_obj(f, curr[key], curr, prepad + ' ') + + elif type(curr) is list: + #check if this list is a leaf node + if len(curr) is 0 or type(curr[0]) in (str,int,float,bool): + f.write(' ') + f.write(str(curr).replace("'","")) + #iterate over list of dictionaries + f.write('\n') + for item in curr: + write_obj(f, item, curr, prepad ) diff --git a/lib/partitioner/partitioner.pyc b/lib/partitioner/partitioner.pyc Binary files differnew file mode 100644 index 0000000..7c04d38 --- /dev/null +++ b/lib/partitioner/partitioner.pyc diff --git a/lib/thrift/TMultiplexedProcessor.py b/lib/thrift/TMultiplexedProcessor.py new file mode 100644 index 0000000..a8d5565 --- /dev/null +++ b/lib/thrift/TMultiplexedProcessor.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import TProcessor, TMessageType, TException +from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol + +class TMultiplexedProcessor(TProcessor): + def __init__(self): + self.services = {} + + def registerProcessor(self, serviceName, processor): + self.services[serviceName] = processor + + def process(self, iprot, oprot): + (name, type, seqid) = iprot.readMessageBegin(); + if type != TMessageType.CALL & type != TMessageType.ONEWAY: + raise TException("TMultiplex protocol only supports CALL & ONEWAY") + + index = name.find(TMultiplexedProtocol.SEPARATOR) + if index < 0: + raise TException("Service name not found in message name: " + name + ". Did you forget to use TMultiplexProtocol in your client?") + + serviceName = name[0:index] + call = name[index+len(TMultiplexedProtocol.SEPARATOR):] + if not serviceName in self.services: + raise TException("Service name not found: " + serviceName + ". Did you forget to call registerProcessor()?") + + standardMessage = ( + call, + type, + seqid + ) + return self.services[serviceName].process(StoredMessageProtocol(iprot, standardMessage), oprot) + + +class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator): + def __init__(self, protocol, messageBegin): + TProtocolDecorator.TProtocolDecorator.__init__(self, protocol) + self.messageBegin = messageBegin + + def readMessageBegin(self): + return self.messageBegin diff --git a/lib/thrift/TSCons.py b/lib/thrift/TSCons.py new file mode 100644 index 0000000..da8d283 --- /dev/null +++ b/lib/thrift/TSCons.py @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from os import path +from SCons.Builder import Builder + + +def scons_env(env, add=''): + opath = path.dirname(path.abspath('$TARGET')) + lstr = 'thrift --gen cpp -o ' + opath + ' ' + add + ' $SOURCE' + cppbuild = Builder(action=lstr) + env.Append(BUILDERS={'ThriftCpp': cppbuild}) + + +def gen_cpp(env, dir, file): + scons_env(env) + suffixes = ['_types.h', '_types.cpp'] + targets = map(lambda s: 'gen-cpp/' + file + s, suffixes) + return env.ThriftCpp(targets, dir + file + '.thrift') diff --git a/lib/thrift/TSerialization.py b/lib/thrift/TSerialization.py new file mode 100644 index 0000000..8a58d89 --- /dev/null +++ b/lib/thrift/TSerialization.py @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from protocol import TBinaryProtocol +from transport import TTransport + + +def serialize(thrift_object, + protocol_factory=TBinaryProtocol.TBinaryProtocolFactory()): + transport = TTransport.TMemoryBuffer() + protocol = protocol_factory.getProtocol(transport) + thrift_object.write(protocol) + return transport.getvalue() + + +def deserialize(base, + buf, + protocol_factory=TBinaryProtocol.TBinaryProtocolFactory()): + transport = TTransport.TMemoryBuffer(buf) + protocol = protocol_factory.getProtocol(transport) + base.read(protocol) + return base diff --git a/lib/thrift/TTornado.py b/lib/thrift/TTornado.py new file mode 100644 index 0000000..ef3e0f2 --- /dev/null +++ b/lib/thrift/TTornado.py @@ -0,0 +1,182 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import +import socket +import struct + +import logging +logger = logging.getLogger(__name__) + +from thrift.transport.TTransport import TTransportException, TTransportBase, TMemoryBuffer + +from io import BytesIO +from collections import deque +from contextlib import contextmanager +from tornado import gen, iostream, ioloop, tcpserver, concurrent + +__all__ = ['TTornadoServer', 'TTornadoStreamTransport'] + + +class _Lock(object): + def __init__(self): + self._waiters = deque() + + def acquired(self): + return len(self._waiters) > 0 + + @gen.coroutine + def acquire(self): + blocker = self._waiters[-1] if self.acquired() else None + future = concurrent.Future() + self._waiters.append(future) + if blocker: + yield blocker + + raise gen.Return(self._lock_context()) + + def release(self): + assert self.acquired(), 'Lock not aquired' + future = self._waiters.popleft() + future.set_result(None) + + @contextmanager + def _lock_context(self): + try: + yield + finally: + self.release() + + +class TTornadoStreamTransport(TTransportBase): + """a framed, buffered transport over a Tornado stream""" + def __init__(self, host, port, stream=None, io_loop=None): + self.host = host + self.port = port + self.io_loop = io_loop or ioloop.IOLoop.current() + self.__wbuf = BytesIO() + self._read_lock = _Lock() + + # servers provide a ready-to-go stream + self.stream = stream + + def with_timeout(self, timeout, future): + return gen.with_timeout(timeout, future, self.io_loop) + + @gen.coroutine + def open(self, timeout=None): + logger.debug('socket connecting') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + self.stream = iostream.IOStream(sock) + + try: + connect = self.stream.connect((self.host, self.port)) + if timeout is not None: + yield self.with_timeout(timeout, connect) + else: + yield connect + except (socket.error, IOError, ioloop.TimeoutError) as e: + message = 'could not connect to {}:{} ({})'.format(self.host, self.port, e) + raise TTransportException( + type=TTransportException.NOT_OPEN, + message=message) + + raise gen.Return(self) + + def set_close_callback(self, callback): + """ + Should be called only after open() returns + """ + self.stream.set_close_callback(callback) + + def close(self): + # don't raise if we intend to close + self.stream.set_close_callback(None) + self.stream.close() + + def read(self, _): + # The generated code for Tornado shouldn't do individual reads -- only + # frames at a time + assert False, "you're doing it wrong" + + @contextmanager + def io_exception_context(self): + try: + yield + except (socket.error, IOError) as e: + raise TTransportException( + type=TTransportException.END_OF_FILE, + message=str(e)) + except iostream.StreamBufferFullError as e: + raise TTransportException( + type=TTransportException.UNKNOWN, + message=str(e)) + + @gen.coroutine + def readFrame(self): + # IOStream processes reads one at a time + with (yield self._read_lock.acquire()): + with self.io_exception_context(): + frame_header = yield self.stream.read_bytes(4) + if len(frame_header) == 0: + raise iostream.StreamClosedError('Read zero bytes from stream') + frame_length, = struct.unpack('!i', frame_header) + frame = yield self.stream.read_bytes(frame_length) + raise gen.Return(frame) + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + frame = self.__wbuf.getvalue() + # reset wbuf before write/flush to preserve state on underlying failure + frame_length = struct.pack('!i', len(frame)) + self.__wbuf = BytesIO() + with self.io_exception_context(): + return self.stream.write(frame_length + frame) + + +class TTornadoServer(tcpserver.TCPServer): + def __init__(self, processor, iprot_factory, oprot_factory=None, + *args, **kwargs): + super(TTornadoServer, self).__init__(*args, **kwargs) + + self._processor = processor + self._iprot_factory = iprot_factory + self._oprot_factory = (oprot_factory if oprot_factory is not None + else iprot_factory) + + @gen.coroutine + def handle_stream(self, stream, address): + host, port = address + trans = TTornadoStreamTransport(host=host, port=port, stream=stream, + io_loop=self.io_loop) + oprot = self._oprot_factory.getProtocol(trans) + + try: + while not trans.stream.closed(): + frame = yield trans.readFrame() + tr = TMemoryBuffer(frame) + iprot = self._iprot_factory.getProtocol(tr) + yield self._processor.process(iprot, oprot) + except Exception: + logger.exception('thrift exception in handle_stream') + trans.close() + + logger.info('client disconnected %s:%d', host, port) diff --git a/lib/thrift/Thrift.py b/lib/thrift/Thrift.py new file mode 100644 index 0000000..9890af7 --- /dev/null +++ b/lib/thrift/Thrift.py @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys + + +class TType: + STOP = 0 + VOID = 1 + BOOL = 2 + BYTE = 3 + I08 = 3 + DOUBLE = 4 + I16 = 6 + I32 = 8 + I64 = 10 + STRING = 11 + UTF7 = 11 + STRUCT = 12 + MAP = 13 + SET = 14 + LIST = 15 + UTF8 = 16 + UTF16 = 17 + + _VALUES_TO_NAMES = ('STOP', + 'VOID', + 'BOOL', + 'BYTE', + 'DOUBLE', + None, + 'I16', + None, + 'I32', + None, + 'I64', + 'STRING', + 'STRUCT', + 'MAP', + 'SET', + 'LIST', + 'UTF8', + 'UTF16') + + +class TMessageType: + CALL = 1 + REPLY = 2 + EXCEPTION = 3 + ONEWAY = 4 + + +class TProcessor: + """Base class for procsessor, which works on two streams.""" + + def process(iprot, oprot): + pass + + +class TException(Exception): + """Base class for all thrift exceptions.""" + + # BaseException.message is deprecated in Python v[2.6,3.0) + if (2, 6, 0) <= sys.version_info < (3, 0): + def _get_message(self): + return self._message + + def _set_message(self, message): + self._message = message + message = property(_get_message, _set_message) + + def __init__(self, message=None): + Exception.__init__(self, message) + self.message = message + + +class TApplicationException(TException): + """Application level thrift exceptions.""" + + UNKNOWN = 0 + UNKNOWN_METHOD = 1 + INVALID_MESSAGE_TYPE = 2 + WRONG_METHOD_NAME = 3 + BAD_SEQUENCE_ID = 4 + MISSING_RESULT = 5 + INTERNAL_ERROR = 6 + PROTOCOL_ERROR = 7 + INVALID_TRANSFORM = 8 + INVALID_PROTOCOL = 9 + UNSUPPORTED_CLIENT_TYPE = 10 + + def __init__(self, type=UNKNOWN, message=None): + TException.__init__(self, message) + self.type = type + + def __str__(self): + if self.message: + return self.message + elif self.type == self.UNKNOWN_METHOD: + return 'Unknown method' + elif self.type == self.INVALID_MESSAGE_TYPE: + return 'Invalid message type' + elif self.type == self.WRONG_METHOD_NAME: + return 'Wrong method name' + elif self.type == self.BAD_SEQUENCE_ID: + return 'Bad sequence ID' + elif self.type == self.MISSING_RESULT: + return 'Missing result' + elif self.type == self.INTERNAL_ERROR: + return 'Internal error' + elif self.type == self.PROTOCOL_ERROR: + return 'Protocol error' + elif self.type == self.INVALID_TRANSFORM: + return 'Invalid transform' + elif self.type == self.INVALID_PROTOCOL: + return 'Invalid protocol' + elif self.type == self.UNSUPPORTED_CLIENT_TYPE: + return 'Unsupported client type' + else: + return 'Default (unknown) TApplicationException' + + def read(self, iprot): + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.message = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I32: + self.type = iprot.readI32() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + oprot.writeStructBegin('TApplicationException') + if self.message is not None: + oprot.writeFieldBegin('message', TType.STRING, 1) + oprot.writeString(self.message) + oprot.writeFieldEnd() + if self.type is not None: + oprot.writeFieldBegin('type', TType.I32, 2) + oprot.writeI32(self.type) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() diff --git a/lib/thrift/Thrift.pyc b/lib/thrift/Thrift.pyc Binary files differnew file mode 100644 index 0000000..941bb77 --- /dev/null +++ b/lib/thrift/Thrift.pyc diff --git a/lib/thrift/__init__.py b/lib/thrift/__init__.py new file mode 100644 index 0000000..48d659c --- /dev/null +++ b/lib/thrift/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['Thrift', 'TSCons'] diff --git a/lib/thrift/__init__.pyc b/lib/thrift/__init__.pyc Binary files differnew file mode 100644 index 0000000..856c5ca --- /dev/null +++ b/lib/thrift/__init__.pyc diff --git a/lib/thrift/protocol/TBase.py b/lib/thrift/protocol/TBase.py new file mode 100644 index 0000000..6cbd5f3 --- /dev/null +++ b/lib/thrift/protocol/TBase.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import * +from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport + +try: + from thrift.protocol import fastbinary +except: + fastbinary = None + + +class TBase(object): + __slots__ = [] + + def __repr__(self): + L = ['%s=%r' % (key, getattr(self, key)) + for key in self.__slots__] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + for attr in self.__slots__: + my_val = getattr(self, attr) + other_val = getattr(other, attr) + if my_val != other_val: + return False + return True + + def __ne__(self, other): + return not (self == other) + + def read(self, iprot): + if (iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and + isinstance(iprot.trans, TTransport.CReadableTransport) and + self.thrift_spec is not None and + fastbinary is not None): + fastbinary.decode_binary(self, + iprot.trans, + (self.__class__, self.thrift_spec)) + return + iprot.readStruct(self, self.thrift_spec) + + def write(self, oprot): + if (oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and + self.thrift_spec is not None and + fastbinary is not None): + oprot.trans.write( + fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStruct(self, self.thrift_spec) + + +class TExceptionBase(Exception): + # old style class so python2.4 can raise exceptions derived from this + # This can't inherit from TBase because of that limitation. + __slots__ = [] + + __repr__ = TBase.__repr__.im_func + __eq__ = TBase.__eq__.im_func + __ne__ = TBase.__ne__.im_func + read = TBase.read.im_func + write = TBase.write.im_func diff --git a/lib/thrift/protocol/TBinaryProtocol.py b/lib/thrift/protocol/TBinaryProtocol.py new file mode 100644 index 0000000..6fdd08c --- /dev/null +++ b/lib/thrift/protocol/TBinaryProtocol.py @@ -0,0 +1,260 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from TProtocol import * +from struct import pack, unpack + + +class TBinaryProtocol(TProtocolBase): + """Binary implementation of the Thrift protocol driver.""" + + # NastyHaxx. Python 2.4+ on 32-bit machines forces hex constants to be + # positive, converting this into a long. If we hardcode the int value + # instead it'll stay in 32 bit-land. + + # VERSION_MASK = 0xffff0000 + VERSION_MASK = -65536 + + # VERSION_1 = 0x80010000 + VERSION_1 = -2147418112 + + TYPE_MASK = 0x000000ff + + def __init__(self, trans, strictRead=False, strictWrite=True): + TProtocolBase.__init__(self, trans) + self.strictRead = strictRead + self.strictWrite = strictWrite + + def writeMessageBegin(self, name, type, seqid): + if self.strictWrite: + self.writeI32(TBinaryProtocol.VERSION_1 | type) + self.writeString(name) + self.writeI32(seqid) + else: + self.writeString(name) + self.writeByte(type) + self.writeI32(seqid) + + def writeMessageEnd(self): + pass + + def writeStructBegin(self, name): + pass + + def writeStructEnd(self): + pass + + def writeFieldBegin(self, name, type, id): + self.writeByte(type) + self.writeI16(id) + + def writeFieldEnd(self): + pass + + def writeFieldStop(self): + self.writeByte(TType.STOP) + + def writeMapBegin(self, ktype, vtype, size): + self.writeByte(ktype) + self.writeByte(vtype) + self.writeI32(size) + + def writeMapEnd(self): + pass + + def writeListBegin(self, etype, size): + self.writeByte(etype) + self.writeI32(size) + + def writeListEnd(self): + pass + + def writeSetBegin(self, etype, size): + self.writeByte(etype) + self.writeI32(size) + + def writeSetEnd(self): + pass + + def writeBool(self, bool): + if bool: + self.writeByte(1) + else: + self.writeByte(0) + + def writeByte(self, byte): + buff = pack("!b", byte) + self.trans.write(buff) + + def writeI16(self, i16): + buff = pack("!h", i16) + self.trans.write(buff) + + def writeI32(self, i32): + buff = pack("!i", i32) + self.trans.write(buff) + + def writeI64(self, i64): + buff = pack("!q", i64) + self.trans.write(buff) + + def writeDouble(self, dub): + buff = pack("!d", dub) + self.trans.write(buff) + + def writeString(self, str): + self.writeI32(len(str)) + self.trans.write(str) + + def readMessageBegin(self): + sz = self.readI32() + if sz < 0: + version = sz & TBinaryProtocol.VERSION_MASK + if version != TBinaryProtocol.VERSION_1: + raise TProtocolException( + type=TProtocolException.BAD_VERSION, + message='Bad version in readMessageBegin: %d' % (sz)) + type = sz & TBinaryProtocol.TYPE_MASK + name = self.readString() + seqid = self.readI32() + else: + if self.strictRead: + raise TProtocolException(type=TProtocolException.BAD_VERSION, + message='No protocol version header') + name = self.trans.readAll(sz) + type = self.readByte() + seqid = self.readI32() + return (name, type, seqid) + + def readMessageEnd(self): + pass + + def readStructBegin(self): + pass + + def readStructEnd(self): + pass + + def readFieldBegin(self): + type = self.readByte() + if type == TType.STOP: + return (None, type, 0) + id = self.readI16() + return (None, type, id) + + def readFieldEnd(self): + pass + + def readMapBegin(self): + ktype = self.readByte() + vtype = self.readByte() + size = self.readI32() + return (ktype, vtype, size) + + def readMapEnd(self): + pass + + def readListBegin(self): + etype = self.readByte() + size = self.readI32() + return (etype, size) + + def readListEnd(self): + pass + + def readSetBegin(self): + etype = self.readByte() + size = self.readI32() + return (etype, size) + + def readSetEnd(self): + pass + + def readBool(self): + byte = self.readByte() + if byte == 0: + return False + return True + + def readByte(self): + buff = self.trans.readAll(1) + val, = unpack('!b', buff) + return val + + def readI16(self): + buff = self.trans.readAll(2) + val, = unpack('!h', buff) + return val + + def readI32(self): + buff = self.trans.readAll(4) + val, = unpack('!i', buff) + return val + + def readI64(self): + buff = self.trans.readAll(8) + val, = unpack('!q', buff) + return val + + def readDouble(self): + buff = self.trans.readAll(8) + val, = unpack('!d', buff) + return val + + def readString(self): + len = self.readI32() + str = self.trans.readAll(len) + return str + + +class TBinaryProtocolFactory: + def __init__(self, strictRead=False, strictWrite=True): + self.strictRead = strictRead + self.strictWrite = strictWrite + + def getProtocol(self, trans): + prot = TBinaryProtocol(trans, self.strictRead, self.strictWrite) + return prot + + +class TBinaryProtocolAccelerated(TBinaryProtocol): + """C-Accelerated version of TBinaryProtocol. + + This class does not override any of TBinaryProtocol's methods, + but the generated code recognizes it directly and will call into + our C module to do the encoding, bypassing this object entirely. + We inherit from TBinaryProtocol so that the normal TBinaryProtocol + encoding can happen if the fastbinary module doesn't work for some + reason. (TODO(dreiss): Make this happen sanely in more cases.) + + In order to take advantage of the C module, just use + TBinaryProtocolAccelerated instead of TBinaryProtocol. + + NOTE: This code was contributed by an external developer. + The internal Thrift team has reviewed and tested it, + but we cannot guarantee that it is production-ready. + Please feel free to report bugs and/or success stories + to the public mailing list. + """ + pass + + +class TBinaryProtocolAcceleratedFactory: + def getProtocol(self, trans): + return TBinaryProtocolAccelerated(trans) diff --git a/lib/thrift/protocol/TBinaryProtocol.pyc b/lib/thrift/protocol/TBinaryProtocol.pyc Binary files differnew file mode 100644 index 0000000..8c89556 --- /dev/null +++ b/lib/thrift/protocol/TBinaryProtocol.pyc diff --git a/lib/thrift/protocol/TCompactProtocol.py b/lib/thrift/protocol/TCompactProtocol.py new file mode 100644 index 0000000..7054ab0 --- /dev/null +++ b/lib/thrift/protocol/TCompactProtocol.py @@ -0,0 +1,405 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from TProtocol import * +from struct import pack, unpack + +__all__ = ['TCompactProtocol', 'TCompactProtocolFactory'] + +CLEAR = 0 +FIELD_WRITE = 1 +VALUE_WRITE = 2 +CONTAINER_WRITE = 3 +BOOL_WRITE = 4 +FIELD_READ = 5 +CONTAINER_READ = 6 +VALUE_READ = 7 +BOOL_READ = 8 + + +def make_helper(v_from, container): + def helper(func): + def nested(self, *args, **kwargs): + assert self.state in (v_from, container), (self.state, v_from, container) + return func(self, *args, **kwargs) + return nested + return helper +writer = make_helper(VALUE_WRITE, CONTAINER_WRITE) +reader = make_helper(VALUE_READ, CONTAINER_READ) + + +def makeZigZag(n, bits): + checkIntegerLimits(n, bits) + return (n << 1) ^ (n >> (bits - 1)) + + +def fromZigZag(n): + return (n >> 1) ^ -(n & 1) + + +def writeVarint(trans, n): + out = [] + while True: + if n & ~0x7f == 0: + out.append(n) + break + else: + out.append((n & 0xff) | 0x80) + n = n >> 7 + trans.write(''.join(map(chr, out))) + + +def readVarint(trans): + result = 0 + shift = 0 + while True: + x = trans.readAll(1) + byte = ord(x) + result |= (byte & 0x7f) << shift + if byte >> 7 == 0: + return result + shift += 7 + + +class CompactType: + STOP = 0x00 + TRUE = 0x01 + FALSE = 0x02 + BYTE = 0x03 + I16 = 0x04 + I32 = 0x05 + I64 = 0x06 + DOUBLE = 0x07 + BINARY = 0x08 + LIST = 0x09 + SET = 0x0A + MAP = 0x0B + STRUCT = 0x0C + +CTYPES = {TType.STOP: CompactType.STOP, + TType.BOOL: CompactType.TRUE, # used for collection + TType.BYTE: CompactType.BYTE, + TType.I16: CompactType.I16, + TType.I32: CompactType.I32, + TType.I64: CompactType.I64, + TType.DOUBLE: CompactType.DOUBLE, + TType.STRING: CompactType.BINARY, + TType.STRUCT: CompactType.STRUCT, + TType.LIST: CompactType.LIST, + TType.SET: CompactType.SET, + TType.MAP: CompactType.MAP + } + +TTYPES = {} +for k, v in CTYPES.items(): + TTYPES[v] = k +TTYPES[CompactType.FALSE] = TType.BOOL +del k +del v + + +class TCompactProtocol(TProtocolBase): + """Compact implementation of the Thrift protocol driver.""" + + PROTOCOL_ID = 0x82 + VERSION = 1 + VERSION_MASK = 0x1f + TYPE_MASK = 0xe0 + TYPE_BITS = 0x07 + TYPE_SHIFT_AMOUNT = 5 + + def __init__(self, trans): + TProtocolBase.__init__(self, trans) + self.state = CLEAR + self.__last_fid = 0 + self.__bool_fid = None + self.__bool_value = None + self.__structs = [] + self.__containers = [] + + def __writeVarint(self, n): + writeVarint(self.trans, n) + + def writeMessageBegin(self, name, type, seqid): + assert self.state == CLEAR + self.__writeUByte(self.PROTOCOL_ID) + self.__writeUByte(self.VERSION | (type << self.TYPE_SHIFT_AMOUNT)) + self.__writeVarint(seqid) + self.__writeString(name) + self.state = VALUE_WRITE + + def writeMessageEnd(self): + assert self.state == VALUE_WRITE + self.state = CLEAR + + def writeStructBegin(self, name): + assert self.state in (CLEAR, CONTAINER_WRITE, VALUE_WRITE), self.state + self.__structs.append((self.state, self.__last_fid)) + self.state = FIELD_WRITE + self.__last_fid = 0 + + def writeStructEnd(self): + assert self.state == FIELD_WRITE + self.state, self.__last_fid = self.__structs.pop() + + def writeFieldStop(self): + self.__writeByte(0) + + def __writeFieldHeader(self, type, fid): + delta = fid - self.__last_fid + if 0 < delta <= 15: + self.__writeUByte(delta << 4 | type) + else: + self.__writeByte(type) + self.__writeI16(fid) + self.__last_fid = fid + + def writeFieldBegin(self, name, type, fid): + assert self.state == FIELD_WRITE, self.state + if type == TType.BOOL: + self.state = BOOL_WRITE + self.__bool_fid = fid + else: + self.state = VALUE_WRITE + self.__writeFieldHeader(CTYPES[type], fid) + + def writeFieldEnd(self): + assert self.state in (VALUE_WRITE, BOOL_WRITE), self.state + self.state = FIELD_WRITE + + def __writeUByte(self, byte): + self.trans.write(pack('!B', byte)) + + def __writeByte(self, byte): + self.trans.write(pack('!b', byte)) + + def __writeI16(self, i16): + self.__writeVarint(makeZigZag(i16, 16)) + + def __writeSize(self, i32): + self.__writeVarint(i32) + + def writeCollectionBegin(self, etype, size): + assert self.state in (VALUE_WRITE, CONTAINER_WRITE), self.state + if size <= 14: + self.__writeUByte(size << 4 | CTYPES[etype]) + else: + self.__writeUByte(0xf0 | CTYPES[etype]) + self.__writeSize(size) + self.__containers.append(self.state) + self.state = CONTAINER_WRITE + writeSetBegin = writeCollectionBegin + writeListBegin = writeCollectionBegin + + def writeMapBegin(self, ktype, vtype, size): + assert self.state in (VALUE_WRITE, CONTAINER_WRITE), self.state + if size == 0: + self.__writeByte(0) + else: + self.__writeSize(size) + self.__writeUByte(CTYPES[ktype] << 4 | CTYPES[vtype]) + self.__containers.append(self.state) + self.state = CONTAINER_WRITE + + def writeCollectionEnd(self): + assert self.state == CONTAINER_WRITE, self.state + self.state = self.__containers.pop() + writeMapEnd = writeCollectionEnd + writeSetEnd = writeCollectionEnd + writeListEnd = writeCollectionEnd + + def writeBool(self, bool): + if self.state == BOOL_WRITE: + if bool: + ctype = CompactType.TRUE + else: + ctype = CompactType.FALSE + self.__writeFieldHeader(ctype, self.__bool_fid) + elif self.state == CONTAINER_WRITE: + if bool: + self.__writeByte(CompactType.TRUE) + else: + self.__writeByte(CompactType.FALSE) + else: + raise AssertionError("Invalid state in compact protocol") + + writeByte = writer(__writeByte) + writeI16 = writer(__writeI16) + + @writer + def writeI32(self, i32): + self.__writeVarint(makeZigZag(i32, 32)) + + @writer + def writeI64(self, i64): + self.__writeVarint(makeZigZag(i64, 64)) + + @writer + def writeDouble(self, dub): + self.trans.write(pack('<d', dub)) + + def __writeString(self, s): + self.__writeSize(len(s)) + self.trans.write(s) + writeString = writer(__writeString) + + def readFieldBegin(self): + assert self.state == FIELD_READ, self.state + type = self.__readUByte() + if type & 0x0f == TType.STOP: + return (None, 0, 0) + delta = type >> 4 + if delta == 0: + fid = self.__readI16() + else: + fid = self.__last_fid + delta + self.__last_fid = fid + type = type & 0x0f + if type == CompactType.TRUE: + self.state = BOOL_READ + self.__bool_value = True + elif type == CompactType.FALSE: + self.state = BOOL_READ + self.__bool_value = False + else: + self.state = VALUE_READ + return (None, self.__getTType(type), fid) + + def readFieldEnd(self): + assert self.state in (VALUE_READ, BOOL_READ), self.state + self.state = FIELD_READ + + def __readUByte(self): + result, = unpack('!B', self.trans.readAll(1)) + return result + + def __readByte(self): + result, = unpack('!b', self.trans.readAll(1)) + return result + + def __readVarint(self): + return readVarint(self.trans) + + def __readZigZag(self): + return fromZigZag(self.__readVarint()) + + def __readSize(self): + result = self.__readVarint() + if result < 0: + raise TException("Length < 0") + return result + + def readMessageBegin(self): + assert self.state == CLEAR + proto_id = self.__readUByte() + if proto_id != self.PROTOCOL_ID: + raise TProtocolException(TProtocolException.BAD_VERSION, + 'Bad protocol id in the message: %d' % proto_id) + ver_type = self.__readUByte() + type = (ver_type >> self.TYPE_SHIFT_AMOUNT) & self.TYPE_BITS + version = ver_type & self.VERSION_MASK + if version != self.VERSION: + raise TProtocolException(TProtocolException.BAD_VERSION, + 'Bad version: %d (expect %d)' % (version, self.VERSION)) + seqid = self.__readVarint() + name = self.__readString() + return (name, type, seqid) + + def readMessageEnd(self): + assert self.state == CLEAR + assert len(self.__structs) == 0 + + def readStructBegin(self): + assert self.state in (CLEAR, CONTAINER_READ, VALUE_READ), self.state + self.__structs.append((self.state, self.__last_fid)) + self.state = FIELD_READ + self.__last_fid = 0 + + def readStructEnd(self): + assert self.state == FIELD_READ + self.state, self.__last_fid = self.__structs.pop() + + def readCollectionBegin(self): + assert self.state in (VALUE_READ, CONTAINER_READ), self.state + size_type = self.__readUByte() + size = size_type >> 4 + type = self.__getTType(size_type) + if size == 15: + size = self.__readSize() + self.__containers.append(self.state) + self.state = CONTAINER_READ + return type, size + readSetBegin = readCollectionBegin + readListBegin = readCollectionBegin + + def readMapBegin(self): + assert self.state in (VALUE_READ, CONTAINER_READ), self.state + size = self.__readSize() + types = 0 + if size > 0: + types = self.__readUByte() + vtype = self.__getTType(types) + ktype = self.__getTType(types >> 4) + self.__containers.append(self.state) + self.state = CONTAINER_READ + return (ktype, vtype, size) + + def readCollectionEnd(self): + assert self.state == CONTAINER_READ, self.state + self.state = self.__containers.pop() + readSetEnd = readCollectionEnd + readListEnd = readCollectionEnd + readMapEnd = readCollectionEnd + + def readBool(self): + if self.state == BOOL_READ: + return self.__bool_value == CompactType.TRUE + elif self.state == CONTAINER_READ: + return self.__readByte() == CompactType.TRUE + else: + raise AssertionError("Invalid state in compact protocol: %d" % + self.state) + + readByte = reader(__readByte) + __readI16 = __readZigZag + readI16 = reader(__readZigZag) + readI32 = reader(__readZigZag) + readI64 = reader(__readZigZag) + + @reader + def readDouble(self): + buff = self.trans.readAll(8) + val, = unpack('<d', buff) + return val + + def __readString(self): + len = self.__readSize() + return self.trans.readAll(len) + readString = reader(__readString) + + def __getTType(self, byte): + return TTYPES[byte & 0x0f] + + +class TCompactProtocolFactory: + def __init__(self): + pass + + def getProtocol(self, trans): + return TCompactProtocol(trans) diff --git a/lib/thrift/protocol/TJSONProtocol.py b/lib/thrift/protocol/TJSONProtocol.py new file mode 100644 index 0000000..9c1877b --- /dev/null +++ b/lib/thrift/protocol/TJSONProtocol.py @@ -0,0 +1,569 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from TProtocol import TType, TProtocolBase, TProtocolException, \ + checkIntegerLimits +import base64 +import json +import math + +__all__ = ['TJSONProtocol', + 'TJSONProtocolFactory', + 'TSimpleJSONProtocol', + 'TSimpleJSONProtocolFactory'] + +VERSION = 1 + +COMMA = ',' +COLON = ':' +LBRACE = '{' +RBRACE = '}' +LBRACKET = '[' +RBRACKET = ']' +QUOTE = '"' +BACKSLASH = '\\' +ZERO = '0' + +ESCSEQ = '\\u00' +ESCAPE_CHAR = '"\\bfnrt' +ESCAPE_CHAR_VALS = ['"', '\\', '\b', '\f', '\n', '\r', '\t'] +NUMERIC_CHAR = '+-.0123456789Ee' + +CTYPES = {TType.BOOL: 'tf', + TType.BYTE: 'i8', + TType.I16: 'i16', + TType.I32: 'i32', + TType.I64: 'i64', + TType.DOUBLE: 'dbl', + TType.STRING: 'str', + TType.STRUCT: 'rec', + TType.LIST: 'lst', + TType.SET: 'set', + TType.MAP: 'map'} + +JTYPES = {} +for key in CTYPES.keys(): + JTYPES[CTYPES[key]] = key + + +class JSONBaseContext(object): + + def __init__(self, protocol): + self.protocol = protocol + self.first = True + + def doIO(self, function): + pass + + def write(self): + pass + + def read(self): + pass + + def escapeNum(self): + return False + + def __str__(self): + return self.__class__.__name__ + + +class JSONListContext(JSONBaseContext): + + def doIO(self, function): + if self.first is True: + self.first = False + else: + function(COMMA) + + def write(self): + self.doIO(self.protocol.trans.write) + + def read(self): + self.doIO(self.protocol.readJSONSyntaxChar) + + +class JSONPairContext(JSONBaseContext): + + def __init__(self, protocol): + super(JSONPairContext, self).__init__(protocol) + self.colon = True + + def doIO(self, function): + if self.first: + self.first = False + self.colon = True + else: + function(COLON if self.colon else COMMA) + self.colon = not self.colon + + def write(self): + self.doIO(self.protocol.trans.write) + + def read(self): + self.doIO(self.protocol.readJSONSyntaxChar) + + def escapeNum(self): + return self.colon + + def __str__(self): + return '%s, colon=%s' % (self.__class__.__name__, self.colon) + + +class LookaheadReader(): + hasData = False + data = '' + + def __init__(self, protocol): + self.protocol = protocol + + def read(self): + if self.hasData is True: + self.hasData = False + else: + self.data = self.protocol.trans.read(1) + return self.data + + def peek(self): + if self.hasData is False: + self.data = self.protocol.trans.read(1) + self.hasData = True + return self.data + +class TJSONProtocolBase(TProtocolBase): + + def __init__(self, trans): + TProtocolBase.__init__(self, trans) + self.resetWriteContext() + self.resetReadContext() + + def resetWriteContext(self): + self.context = JSONBaseContext(self) + self.contextStack = [self.context] + + def resetReadContext(self): + self.resetWriteContext() + self.reader = LookaheadReader(self) + + def pushContext(self, ctx): + self.contextStack.append(ctx) + self.context = ctx + + def popContext(self): + self.contextStack.pop() + if self.contextStack: + self.context = self.contextStack[-1] + else: + self.context = JSONBaseContext(self) + + def writeJSONString(self, string): + self.context.write() + self.trans.write(json.dumps(string)) + + def writeJSONNumber(self, number): + self.context.write() + jsNumber = str(number) + if self.context.escapeNum(): + jsNumber = "%s%s%s" % (QUOTE, jsNumber, QUOTE) + self.trans.write(jsNumber) + + def writeJSONBase64(self, binary): + self.context.write() + self.trans.write(QUOTE) + self.trans.write(base64.b64encode(binary)) + self.trans.write(QUOTE) + + def writeJSONObjectStart(self): + self.context.write() + self.trans.write(LBRACE) + self.pushContext(JSONPairContext(self)) + + def writeJSONObjectEnd(self): + self.popContext() + self.trans.write(RBRACE) + + def writeJSONArrayStart(self): + self.context.write() + self.trans.write(LBRACKET) + self.pushContext(JSONListContext(self)) + + def writeJSONArrayEnd(self): + self.popContext() + self.trans.write(RBRACKET) + + def readJSONSyntaxChar(self, character): + current = self.reader.read() + if character != current: + raise TProtocolException(TProtocolException.INVALID_DATA, + "Unexpected character: %s" % current) + + def readJSONString(self, skipContext): + string = [] + if skipContext is False: + self.context.read() + self.readJSONSyntaxChar(QUOTE) + while True: + character = self.reader.read() + if character == QUOTE: + break + if character == ESCSEQ[0]: + character = self.reader.read() + if character == ESCSEQ[1]: + self.readJSONSyntaxChar(ZERO) + self.readJSONSyntaxChar(ZERO) + character = json.JSONDecoder().decode('"\u00%s"' % self.trans.read(2)) + else: + off = ESCAPE_CHAR.find(character) + if off == -1: + raise TProtocolException(TProtocolException.INVALID_DATA, + "Expected control char") + character = ESCAPE_CHAR_VALS[off] + string.append(character) + return ''.join(string) + + def isJSONNumeric(self, character): + return (True if NUMERIC_CHAR.find(character) != - 1 else False) + + def readJSONQuotes(self): + if (self.context.escapeNum()): + self.readJSONSyntaxChar(QUOTE) + + def readJSONNumericChars(self): + numeric = [] + while True: + character = self.reader.peek() + if self.isJSONNumeric(character) is False: + break + numeric.append(self.reader.read()) + return ''.join(numeric) + + def readJSONInteger(self): + self.context.read() + self.readJSONQuotes() + numeric = self.readJSONNumericChars() + self.readJSONQuotes() + try: + return int(numeric) + except ValueError: + raise TProtocolException(TProtocolException.INVALID_DATA, + "Bad data encounted in numeric data") + + def readJSONDouble(self): + self.context.read() + if self.reader.peek() == QUOTE: + string = self.readJSONString(True) + try: + double = float(string) + if (self.context.escapeNum is False and + not math.isinf(double) and + not math.isnan(double)): + raise TProtocolException(TProtocolException.INVALID_DATA, + "Numeric data unexpectedly quoted") + return double + except ValueError: + raise TProtocolException(TProtocolException.INVALID_DATA, + "Bad data encounted in numeric data") + else: + if self.context.escapeNum() is True: + self.readJSONSyntaxChar(QUOTE) + try: + return float(self.readJSONNumericChars()) + except ValueError: + raise TProtocolException(TProtocolException.INVALID_DATA, + "Bad data encounted in numeric data") + + def readJSONBase64(self): + string = self.readJSONString(False) + return base64.b64decode(string) + + def readJSONObjectStart(self): + self.context.read() + self.readJSONSyntaxChar(LBRACE) + self.pushContext(JSONPairContext(self)) + + def readJSONObjectEnd(self): + self.readJSONSyntaxChar(RBRACE) + self.popContext() + + def readJSONArrayStart(self): + self.context.read() + self.readJSONSyntaxChar(LBRACKET) + self.pushContext(JSONListContext(self)) + + def readJSONArrayEnd(self): + self.readJSONSyntaxChar(RBRACKET) + self.popContext() + + +class TJSONProtocol(TJSONProtocolBase): + + def readMessageBegin(self): + self.resetReadContext() + self.readJSONArrayStart() + if self.readJSONInteger() != VERSION: + raise TProtocolException(TProtocolException.BAD_VERSION, + "Message contained bad version.") + name = self.readJSONString(False) + typen = self.readJSONInteger() + seqid = self.readJSONInteger() + return (name, typen, seqid) + + def readMessageEnd(self): + self.readJSONArrayEnd() + + def readStructBegin(self): + self.readJSONObjectStart() + + def readStructEnd(self): + self.readJSONObjectEnd() + + def readFieldBegin(self): + character = self.reader.peek() + ttype = 0 + id = 0 + if character == RBRACE: + ttype = TType.STOP + else: + id = self.readJSONInteger() + self.readJSONObjectStart() + ttype = JTYPES[self.readJSONString(False)] + return (None, ttype, id) + + def readFieldEnd(self): + self.readJSONObjectEnd() + + def readMapBegin(self): + self.readJSONArrayStart() + keyType = JTYPES[self.readJSONString(False)] + valueType = JTYPES[self.readJSONString(False)] + size = self.readJSONInteger() + self.readJSONObjectStart() + return (keyType, valueType, size) + + def readMapEnd(self): + self.readJSONObjectEnd() + self.readJSONArrayEnd() + + def readCollectionBegin(self): + self.readJSONArrayStart() + elemType = JTYPES[self.readJSONString(False)] + size = self.readJSONInteger() + return (elemType, size) + readListBegin = readCollectionBegin + readSetBegin = readCollectionBegin + + def readCollectionEnd(self): + self.readJSONArrayEnd() + readSetEnd = readCollectionEnd + readListEnd = readCollectionEnd + + def readBool(self): + return (False if self.readJSONInteger() == 0 else True) + + def readNumber(self): + return self.readJSONInteger() + readByte = readNumber + readI16 = readNumber + readI32 = readNumber + readI64 = readNumber + + def readDouble(self): + return self.readJSONDouble() + + def readString(self): + return self.readJSONString(False) + + def readBinary(self): + return self.readJSONBase64() + + def writeMessageBegin(self, name, request_type, seqid): + self.resetWriteContext() + self.writeJSONArrayStart() + self.writeJSONNumber(VERSION) + self.writeJSONString(name) + self.writeJSONNumber(request_type) + self.writeJSONNumber(seqid) + + def writeMessageEnd(self): + self.writeJSONArrayEnd() + + def writeStructBegin(self, name): + self.writeJSONObjectStart() + + def writeStructEnd(self): + self.writeJSONObjectEnd() + + def writeFieldBegin(self, name, ttype, id): + self.writeJSONNumber(id) + self.writeJSONObjectStart() + self.writeJSONString(CTYPES[ttype]) + + def writeFieldEnd(self): + self.writeJSONObjectEnd() + + def writeFieldStop(self): + pass + + def writeMapBegin(self, ktype, vtype, size): + self.writeJSONArrayStart() + self.writeJSONString(CTYPES[ktype]) + self.writeJSONString(CTYPES[vtype]) + self.writeJSONNumber(size) + self.writeJSONObjectStart() + + def writeMapEnd(self): + self.writeJSONObjectEnd() + self.writeJSONArrayEnd() + + def writeListBegin(self, etype, size): + self.writeJSONArrayStart() + self.writeJSONString(CTYPES[etype]) + self.writeJSONNumber(size) + + def writeListEnd(self): + self.writeJSONArrayEnd() + + def writeSetBegin(self, etype, size): + self.writeJSONArrayStart() + self.writeJSONString(CTYPES[etype]) + self.writeJSONNumber(size) + + def writeSetEnd(self): + self.writeJSONArrayEnd() + + def writeBool(self, boolean): + self.writeJSONNumber(1 if boolean is True else 0) + + def writeByte(self, byte): + checkIntegerLimits(byte, 8) + self.writeJSONNumber(byte) + + def writeI16(self, i16): + checkIntegerLimits(i16, 16) + self.writeJSONNumber(i16) + + def writeI32(self, i32): + checkIntegerLimits(i32, 32) + self.writeJSONNumber(i32) + + def writeI64(self, i64): + checkIntegerLimits(i64, 64) + self.writeJSONNumber(i64) + + def writeDouble(self, dbl): + self.writeJSONNumber(dbl) + + def writeString(self, string): + self.writeJSONString(string) + + def writeBinary(self, binary): + self.writeJSONBase64(binary) + + +class TJSONProtocolFactory: + + def getProtocol(self, trans): + return TJSONProtocol(trans) + + +class TSimpleJSONProtocol(TJSONProtocolBase): + """Simple, readable, write-only JSON protocol. + + Useful for interacting with scripting languages. + """ + + def readMessageBegin(self): + raise NotImplementedError() + + def readMessageEnd(self): + raise NotImplementedError() + + def readStructBegin(self): + raise NotImplementedError() + + def readStructEnd(self): + raise NotImplementedError() + + def writeMessageBegin(self, name, request_type, seqid): + self.resetWriteContext() + + def writeMessageEnd(self): + pass + + def writeStructBegin(self, name): + self.writeJSONObjectStart() + + def writeStructEnd(self): + self.writeJSONObjectEnd() + + def writeFieldBegin(self, name, ttype, fid): + self.writeJSONString(name) + + def writeFieldEnd(self): + pass + + def writeMapBegin(self, ktype, vtype, size): + self.writeJSONObjectStart() + + def writeMapEnd(self): + self.writeJSONObjectEnd() + + def _writeCollectionBegin(self, etype, size): + self.writeJSONArrayStart() + + def _writeCollectionEnd(self): + self.writeJSONArrayEnd() + writeListBegin = _writeCollectionBegin + writeListEnd = _writeCollectionEnd + writeSetBegin = _writeCollectionBegin + writeSetEnd = _writeCollectionEnd + + def writeByte(self, byte): + checkIntegerLimits(byte, 8) + self.writeJSONNumber(byte) + + def writeI16(self, i16): + checkIntegerLimits(i16, 16) + self.writeJSONNumber(i16) + + def writeI32(self, i32): + checkIntegerLimits(i32, 32) + self.writeJSONNumber(i32) + + def writeI64(self, i64): + checkIntegerLimits(i64, 64) + self.writeJSONNumber(i64) + + def writeBool(self, boolean): + self.writeJSONNumber(1 if boolean is True else 0) + + def writeDouble(self, dbl): + self.writeJSONNumber(dbl) + + def writeString(self, string): + self.writeJSONString(string) + + def writeBinary(self, binary): + self.writeJSONBase64(binary) + + +class TSimpleJSONProtocolFactory(object): + + def getProtocol(self, trans): + return TSimpleJSONProtocol(trans) diff --git a/lib/thrift/protocol/TMultiplexedProtocol.py b/lib/thrift/protocol/TMultiplexedProtocol.py new file mode 100644 index 0000000..d25f367 --- /dev/null +++ b/lib/thrift/protocol/TMultiplexedProtocol.py @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import TMessageType +from thrift.protocol import TProtocolDecorator + +SEPARATOR = ":" + +class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator): + def __init__(self, protocol, serviceName): + TProtocolDecorator.TProtocolDecorator.__init__(self, protocol) + self.serviceName = serviceName + + def writeMessageBegin(self, name, type, seqid): + if (type == TMessageType.CALL or + type == TMessageType.ONEWAY): + self.protocol.writeMessageBegin( + self.serviceName + SEPARATOR + name, + type, + seqid + ) + else: + self.protocol.writeMessageBegin(name, type, seqid) diff --git a/lib/thrift/protocol/TProtocol.py b/lib/thrift/protocol/TProtocol.py new file mode 100644 index 0000000..311a635 --- /dev/null +++ b/lib/thrift/protocol/TProtocol.py @@ -0,0 +1,421 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.Thrift import * + + +class TProtocolException(TException): + """Custom Protocol Exception class""" + + UNKNOWN = 0 + INVALID_DATA = 1 + NEGATIVE_SIZE = 2 + SIZE_LIMIT = 3 + BAD_VERSION = 4 + NOT_IMPLEMENTED = 5 + DEPTH_LIMIT = 6 + + def __init__(self, type=UNKNOWN, message=None): + TException.__init__(self, message) + self.type = type + + +class TProtocolBase: + """Base class for Thrift protocol driver.""" + + def __init__(self, trans): + self.trans = trans + + def writeMessageBegin(self, name, ttype, seqid): + pass + + def writeMessageEnd(self): + pass + + def writeStructBegin(self, name): + pass + + def writeStructEnd(self): + pass + + def writeFieldBegin(self, name, ttype, fid): + pass + + def writeFieldEnd(self): + pass + + def writeFieldStop(self): + pass + + def writeMapBegin(self, ktype, vtype, size): + pass + + def writeMapEnd(self): + pass + + def writeListBegin(self, etype, size): + pass + + def writeListEnd(self): + pass + + def writeSetBegin(self, etype, size): + pass + + def writeSetEnd(self): + pass + + def writeBool(self, bool_val): + pass + + def writeByte(self, byte): + pass + + def writeI16(self, i16): + pass + + def writeI32(self, i32): + pass + + def writeI64(self, i64): + pass + + def writeDouble(self, dub): + pass + + def writeString(self, str_val): + pass + + def readMessageBegin(self): + pass + + def readMessageEnd(self): + pass + + def readStructBegin(self): + pass + + def readStructEnd(self): + pass + + def readFieldBegin(self): + pass + + def readFieldEnd(self): + pass + + def readMapBegin(self): + pass + + def readMapEnd(self): + pass + + def readListBegin(self): + pass + + def readListEnd(self): + pass + + def readSetBegin(self): + pass + + def readSetEnd(self): + pass + + def readBool(self): + pass + + def readByte(self): + pass + + def readI16(self): + pass + + def readI32(self): + pass + + def readI64(self): + pass + + def readDouble(self): + pass + + def readString(self): + pass + + def skip(self, ttype): + if ttype == TType.STOP: + return + elif ttype == TType.BOOL: + self.readBool() + elif ttype == TType.BYTE: + self.readByte() + elif ttype == TType.I16: + self.readI16() + elif ttype == TType.I32: + self.readI32() + elif ttype == TType.I64: + self.readI64() + elif ttype == TType.DOUBLE: + self.readDouble() + elif ttype == TType.STRING: + self.readString() + elif ttype == TType.STRUCT: + name = self.readStructBegin() + while True: + (name, ttype, id) = self.readFieldBegin() + if ttype == TType.STOP: + break + self.skip(ttype) + self.readFieldEnd() + self.readStructEnd() + elif ttype == TType.MAP: + (ktype, vtype, size) = self.readMapBegin() + for i in xrange(size): + self.skip(ktype) + self.skip(vtype) + self.readMapEnd() + elif ttype == TType.SET: + (etype, size) = self.readSetBegin() + for i in xrange(size): + self.skip(etype) + self.readSetEnd() + elif ttype == TType.LIST: + (etype, size) = self.readListBegin() + for i in xrange(size): + self.skip(etype) + self.readListEnd() + + # tuple of: ( 'reader method' name, is_container bool, 'writer_method' name ) + _TTYPE_HANDLERS = ( + (None, None, False), # 0 TType.STOP + (None, None, False), # 1 TType.VOID # TODO: handle void? + ('readBool', 'writeBool', False), # 2 TType.BOOL + ('readByte', 'writeByte', False), # 3 TType.BYTE and I08 + ('readDouble', 'writeDouble', False), # 4 TType.DOUBLE + (None, None, False), # 5 undefined + ('readI16', 'writeI16', False), # 6 TType.I16 + (None, None, False), # 7 undefined + ('readI32', 'writeI32', False), # 8 TType.I32 + (None, None, False), # 9 undefined + ('readI64', 'writeI64', False), # 10 TType.I64 + ('readString', 'writeString', False), # 11 TType.STRING and UTF7 + ('readContainerStruct', 'writeContainerStruct', True), # 12 *.STRUCT + ('readContainerMap', 'writeContainerMap', True), # 13 TType.MAP + ('readContainerSet', 'writeContainerSet', True), # 14 TType.SET + ('readContainerList', 'writeContainerList', True), # 15 TType.LIST + (None, None, False), # 16 TType.UTF8 # TODO: handle utf8 types? + (None, None, False) # 17 TType.UTF16 # TODO: handle utf16 types? + ) + + def readFieldByTType(self, ttype, spec): + try: + (r_handler, w_handler, is_container) = self._TTYPE_HANDLERS[ttype] + except IndexError: + raise TProtocolException(type=TProtocolException.INVALID_DATA, + message='Invalid field type %d' % (ttype)) + if r_handler is None: + raise TProtocolException(type=TProtocolException.INVALID_DATA, + message='Invalid field type %d' % (ttype)) + reader = getattr(self, r_handler) + if not is_container: + return reader() + return reader(spec) + + def readContainerList(self, spec): + results = [] + ttype, tspec = spec[0], spec[1] + r_handler = self._TTYPE_HANDLERS[ttype][0] + reader = getattr(self, r_handler) + (list_type, list_len) = self.readListBegin() + if tspec is None: + # list values are simple types + for idx in xrange(list_len): + results.append(reader()) + else: + # this is like an inlined readFieldByTType + container_reader = self._TTYPE_HANDLERS[list_type][0] + val_reader = getattr(self, container_reader) + for idx in xrange(list_len): + val = val_reader(tspec) + results.append(val) + self.readListEnd() + return results + + def readContainerSet(self, spec): + results = set() + ttype, tspec = spec[0], spec[1] + r_handler = self._TTYPE_HANDLERS[ttype][0] + reader = getattr(self, r_handler) + (set_type, set_len) = self.readSetBegin() + if tspec is None: + # set members are simple types + for idx in xrange(set_len): + results.add(reader()) + else: + container_reader = self._TTYPE_HANDLERS[set_type][0] + val_reader = getattr(self, container_reader) + for idx in xrange(set_len): + results.add(val_reader(tspec)) + self.readSetEnd() + return results + + def readContainerStruct(self, spec): + (obj_class, obj_spec) = spec + obj = obj_class() + obj.read(self) + return obj + + def readContainerMap(self, spec): + results = dict() + key_ttype, key_spec = spec[0], spec[1] + val_ttype, val_spec = spec[2], spec[3] + (map_ktype, map_vtype, map_len) = self.readMapBegin() + # TODO: compare types we just decoded with thrift_spec and + # abort/skip if types disagree + key_reader = getattr(self, self._TTYPE_HANDLERS[key_ttype][0]) + val_reader = getattr(self, self._TTYPE_HANDLERS[val_ttype][0]) + # list values are simple types + for idx in xrange(map_len): + if key_spec is None: + k_val = key_reader() + else: + k_val = self.readFieldByTType(key_ttype, key_spec) + if val_spec is None: + v_val = val_reader() + else: + v_val = self.readFieldByTType(val_ttype, val_spec) + # this raises a TypeError with unhashable keys types + # i.e. this fails: d=dict(); d[[0,1]] = 2 + results[k_val] = v_val + self.readMapEnd() + return results + + def readStruct(self, obj, thrift_spec): + self.readStructBegin() + while True: + (fname, ftype, fid) = self.readFieldBegin() + if ftype == TType.STOP: + break + try: + field = thrift_spec[fid] + except IndexError: + self.skip(ftype) + else: + if field is not None and ftype == field[1]: + fname = field[2] + fspec = field[3] + val = self.readFieldByTType(ftype, fspec) + setattr(obj, fname, val) + else: + self.skip(ftype) + self.readFieldEnd() + self.readStructEnd() + + def writeContainerStruct(self, val, spec): + val.write(self) + + def writeContainerList(self, val, spec): + self.writeListBegin(spec[0], len(val)) + r_handler, w_handler, is_container = self._TTYPE_HANDLERS[spec[0]] + e_writer = getattr(self, w_handler) + if not is_container: + for elem in val: + e_writer(elem) + else: + for elem in val: + e_writer(elem, spec[1]) + self.writeListEnd() + + def writeContainerSet(self, val, spec): + self.writeSetBegin(spec[0], len(val)) + r_handler, w_handler, is_container = self._TTYPE_HANDLERS[spec[0]] + e_writer = getattr(self, w_handler) + if not is_container: + for elem in val: + e_writer(elem) + else: + for elem in val: + e_writer(elem, spec[1]) + self.writeSetEnd() + + def writeContainerMap(self, val, spec): + k_type = spec[0] + v_type = spec[2] + ignore, ktype_name, k_is_container = self._TTYPE_HANDLERS[k_type] + ignore, vtype_name, v_is_container = self._TTYPE_HANDLERS[v_type] + k_writer = getattr(self, ktype_name) + v_writer = getattr(self, vtype_name) + self.writeMapBegin(k_type, v_type, len(val)) + for m_key, m_val in val.iteritems(): + if not k_is_container: + k_writer(m_key) + else: + k_writer(m_key, spec[1]) + if not v_is_container: + v_writer(m_val) + else: + v_writer(m_val, spec[3]) + self.writeMapEnd() + + def writeStruct(self, obj, thrift_spec): + self.writeStructBegin(obj.__class__.__name__) + for field in thrift_spec: + if field is None: + continue + fname = field[2] + val = getattr(obj, fname) + if val is None: + # skip writing out unset fields + continue + fid = field[0] + ftype = field[1] + fspec = field[3] + # get the writer method for this value + self.writeFieldBegin(fname, ftype, fid) + self.writeFieldByTType(ftype, val, fspec) + self.writeFieldEnd() + self.writeFieldStop() + self.writeStructEnd() + + def writeFieldByTType(self, ttype, val, spec): + r_handler, w_handler, is_container = self._TTYPE_HANDLERS[ttype] + writer = getattr(self, w_handler) + if is_container: + writer(val, spec) + else: + writer(val) + +def checkIntegerLimits(i, bits): + if bits == 8 and (i < -128 or i > 127): + raise TProtocolException(TProtocolException.INVALID_DATA, + "i8 requires -128 <= number <= 127") + elif bits == 16 and (i < -32768 or i > 32767): + raise TProtocolException(TProtocolException.INVALID_DATA, + "i16 requires -32768 <= number <= 32767") + elif bits == 32 and (i < -2147483648 or i > 2147483647): + raise TProtocolException(TProtocolException.INVALID_DATA, + "i32 requires -2147483648 <= number <= 2147483647") + elif bits == 64 and (i < -9223372036854775808 or i > 9223372036854775807): + raise TProtocolException(TProtocolException.INVALID_DATA, + "i64 requires -9223372036854775808 <= number <= 9223372036854775807") + +class TProtocolFactory: + def getProtocol(self, trans): + pass diff --git a/lib/thrift/protocol/TProtocol.pyc b/lib/thrift/protocol/TProtocol.pyc Binary files differnew file mode 100644 index 0000000..8ec8411 --- /dev/null +++ b/lib/thrift/protocol/TProtocol.pyc diff --git a/lib/thrift/protocol/TProtocolDecorator.py b/lib/thrift/protocol/TProtocolDecorator.py new file mode 100644 index 0000000..3e9e500 --- /dev/null +++ b/lib/thrift/protocol/TProtocolDecorator.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from thrift.protocol.TProtocol import TProtocolBase +from types import * + +class TProtocolDecorator(): + def __init__(self, protocol): + TProtocolBase(protocol) + self.protocol = protocol + + def __getattr__(self, name): + if hasattr(self.protocol, name): + member = getattr(self.protocol, name) + if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]: + return lambda *args, **kwargs: self._wrap(member, args, kwargs) + else: + return member + raise AttributeError(name) + + def _wrap(self, func, args, kwargs): + if type(func) == MethodType: + result = func(*args, **kwargs) + else: + result = func(self.protocol, *args, **kwargs) + return result diff --git a/lib/thrift/protocol/__init__.py b/lib/thrift/protocol/__init__.py new file mode 100644 index 0000000..7eefb45 --- /dev/null +++ b/lib/thrift/protocol/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['fastbinary', 'TBase', 'TBinaryProtocol', 'TCompactProtocol', 'TJSONProtocol', 'TProtocol'] diff --git a/lib/thrift/protocol/__init__.pyc b/lib/thrift/protocol/__init__.pyc Binary files differnew file mode 100644 index 0000000..a259bbd --- /dev/null +++ b/lib/thrift/protocol/__init__.pyc diff --git a/lib/thrift/protocol/fastbinary.so b/lib/thrift/protocol/fastbinary.so Binary files differnew file mode 100755 index 0000000..e9e73e3 --- /dev/null +++ b/lib/thrift/protocol/fastbinary.so diff --git a/lib/thrift/server/THttpServer.py b/lib/thrift/server/THttpServer.py new file mode 100644 index 0000000..6f92173 --- /dev/null +++ b/lib/thrift/server/THttpServer.py @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import BaseHTTPServer + +from thrift.server import TServer +from thrift.transport import TTransport + + +class ResponseException(Exception): + """Allows handlers to override the HTTP response + + Normally, THttpServer always sends a 200 response. If a handler wants + to override this behavior (e.g., to simulate a misconfigured or + overloaded web server during testing), it can raise a ResponseException. + The function passed to the constructor will be called with the + RequestHandler as its only argument. + """ + def __init__(self, handler): + self.handler = handler + + +class THttpServer(TServer.TServer): + """A simple HTTP-based Thrift server + + This class is not very performant, but it is useful (for example) for + acting as a mock version of an Apache-based PHP Thrift endpoint. + """ + def __init__(self, + processor, + server_address, + inputProtocolFactory, + outputProtocolFactory=None, + server_class=BaseHTTPServer.HTTPServer): + """Set up protocol factories and HTTP server. + + See BaseHTTPServer for server_address. + See TServer for protocol factories. + """ + if outputProtocolFactory is None: + outputProtocolFactory = inputProtocolFactory + + TServer.TServer.__init__(self, processor, None, None, None, + inputProtocolFactory, outputProtocolFactory) + + thttpserver = self + + class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler): + def do_POST(self): + # Don't care about the request path. + itrans = TTransport.TFileObjectTransport(self.rfile) + otrans = TTransport.TFileObjectTransport(self.wfile) + itrans = TTransport.TBufferedTransport( + itrans, int(self.headers['Content-Length'])) + otrans = TTransport.TMemoryBuffer() + iprot = thttpserver.inputProtocolFactory.getProtocol(itrans) + oprot = thttpserver.outputProtocolFactory.getProtocol(otrans) + try: + thttpserver.processor.process(iprot, oprot) + except ResponseException as exn: + exn.handler(self) + else: + self.send_response(200) + self.send_header("content-type", "application/x-thrift") + self.end_headers() + self.wfile.write(otrans.getvalue()) + + self.httpd = server_class(server_address, RequestHander) + + def serve(self): + self.httpd.serve_forever() diff --git a/lib/thrift/server/TNonblockingServer.py b/lib/thrift/server/TNonblockingServer.py new file mode 100644 index 0000000..39486cd --- /dev/null +++ b/lib/thrift/server/TNonblockingServer.py @@ -0,0 +1,348 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""Implementation of non-blocking server. + +The main idea of the server is to receive and send requests +only from the main thread. + +The thread poool should be sized for concurrent tasks, not +maximum connections +""" +import threading +import socket +import Queue +import select +import struct + +import logging +logger = logging.getLogger(__name__) + +from thrift.transport import TTransport +from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory + +__all__ = ['TNonblockingServer'] + + +class Worker(threading.Thread): + """Worker is a small helper to process incoming connection.""" + + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + + def run(self): + """Process queries from task queue, stop if processor is None.""" + while True: + try: + processor, iprot, oprot, otrans, callback = self.queue.get() + if processor is None: + break + processor.process(iprot, oprot) + callback(True, otrans.getvalue()) + except Exception: + logger.exception("Exception while processing request") + callback(False, '') + +WAIT_LEN = 0 +WAIT_MESSAGE = 1 +WAIT_PROCESS = 2 +SEND_ANSWER = 3 +CLOSED = 4 + + +def locked(func): + """Decorator which locks self.lock.""" + def nested(self, *args, **kwargs): + self.lock.acquire() + try: + return func(self, *args, **kwargs) + finally: + self.lock.release() + return nested + + +def socket_exception(func): + """Decorator close object on socket.error.""" + def read(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except socket.error: + self.close() + return read + + +class Connection: + """Basic class is represented connection. + + It can be in state: + WAIT_LEN --- connection is reading request len. + WAIT_MESSAGE --- connection is reading request. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. + SEND_ANSWER --- connection is sending answer string (including length + of answer). + CLOSED --- socket was closed and connection should be deleted. + """ + def __init__(self, new_socket, wake_up): + self.socket = new_socket + self.socket.setblocking(False) + self.status = WAIT_LEN + self.len = 0 + self.message = '' + self.lock = threading.Lock() + self.wake_up = wake_up + + def _read_len(self): + """Reads length of request. + + It's a safer alternative to self.socket.recv(4) + """ + read = self.socket.recv(4 - len(self.message)) + if len(read) == 0: + # if we read 0 bytes and self.message is empty, then + # the client closed the connection + if len(self.message) != 0: + logger.error("can't read frame size from socket") + self.close() + return + self.message += read + if len(self.message) == 4: + self.len, = struct.unpack('!i', self.message) + if self.len < 0: + logger.error("negative frame size, it seems client " + "doesn't use FramedTransport") + self.close() + elif self.len == 0: + logger.error("empty frame, it's really strange") + self.close() + else: + self.message = '' + self.status = WAIT_MESSAGE + + @socket_exception + def read(self): + """Reads data from stream and switch state.""" + assert self.status in (WAIT_LEN, WAIT_MESSAGE) + if self.status == WAIT_LEN: + self._read_len() + # go back to the main loop here for simplicity instead of + # falling through, even though there is a good chance that + # the message is already available + elif self.status == WAIT_MESSAGE: + read = self.socket.recv(self.len - len(self.message)) + if len(read) == 0: + logger.error("can't read frame from socket (get %d of " + "%d bytes)" % (len(self.message), self.len)) + self.close() + return + self.message += read + if len(self.message) == self.len: + self.status = WAIT_PROCESS + + @socket_exception + def write(self): + """Writes data from socket and switch state.""" + assert self.status == SEND_ANSWER + sent = self.socket.send(self.message) + if sent == len(self.message): + self.status = WAIT_LEN + self.message = '' + self.len = 0 + else: + self.message = self.message[sent:] + + @locked + def ready(self, all_ok, message): + """Callback function for switching state and waking up main thread. + + This function is the only function witch can be called asynchronous. + + The ready can switch Connection to three states: + WAIT_LEN if request was oneway. + SEND_ANSWER if request was processed in normal way. + CLOSED if request throws unexpected exception. + + The one wakes up main thread. + """ + assert self.status == WAIT_PROCESS + if not all_ok: + self.close() + self.wake_up() + return + self.len = '' + if len(message) == 0: + # it was a oneway request, do not write answer + self.message = '' + self.status = WAIT_LEN + else: + self.message = struct.pack('!i', len(message)) + message + self.status = SEND_ANSWER + self.wake_up() + + @locked + def is_writeable(self): + """Return True if connection should be added to write list of select""" + return self.status == SEND_ANSWER + + # it's not necessary, but... + @locked + def is_readable(self): + """Return True if connection should be added to read list of select""" + return self.status in (WAIT_LEN, WAIT_MESSAGE) + + @locked + def is_closed(self): + """Returns True if connection is closed.""" + return self.status == CLOSED + + def fileno(self): + """Returns the file descriptor of the associated socket.""" + return self.socket.fileno() + + def close(self): + """Closes connection""" + self.status = CLOSED + self.socket.close() + + +class TNonblockingServer: + """Non-blocking server.""" + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): + self.processor = processor + self.socket = lsocket + self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() + self.out_protocol = outputProtocolFactory or self.in_protocol + self.threads = int(threads) + self.clients = {} + self.tasks = Queue.Queue() + self._read, self._write = socket.socketpair() + self.prepared = False + self._stop = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created.""" + # implement ThreadPool interface + assert not self.prepared, "Can't change number of threads after start" + self.threads = num + + def prepare(self): + """Prepares server for serve requests.""" + if self.prepared: + return + self.socket.listen() + for _ in xrange(self.threads): + thread = Worker(self.tasks) + thread.setDaemon(True) + thread.start() + self.prepared = True + + def wake_up(self): + """Wake up main thread. + + The server usually waits in select call in we should terminate one. + The simplest way is using socketpair. + + Select always wait to read from the first socket of socketpair. + + In this case, we can just write anything to the second socket from + socketpair. + """ + self._write.send('1') + + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + + def _select(self): + """Does select on open connections.""" + readable = [self.socket.handle.fileno(), self._read.fileno()] + writable = [] + for i, connection in self.clients.items(): + if connection.is_readable(): + readable.append(connection.fileno()) + if connection.is_writeable(): + writable.append(connection.fileno()) + if connection.is_closed(): + del self.clients[i] + return select.select(readable, writable, readable) + + def handle(self): + """Handle requests. + + WARNING! You must call prepare() BEFORE calling handle() + """ + assert self.prepared, "You have to call prepare before handle" + rset, wset, xset = self._select() + for readable in rset: + if readable == self._read.fileno(): + # don't care i just need to clean readable flag + self._read.recv(1024) + elif readable == self.socket.handle.fileno(): + client = self.socket.accept().handle + self.clients[client.fileno()] = Connection(client, + self.wake_up) + else: + connection = self.clients[readable] + connection.read() + if connection.status == WAIT_PROCESS: + itransport = TTransport.TMemoryBuffer(connection.message) + otransport = TTransport.TMemoryBuffer() + iprot = self.in_protocol.getProtocol(itransport) + oprot = self.out_protocol.getProtocol(otransport) + self.tasks.put([self.processor, iprot, oprot, + otransport, connection.ready]) + for writeable in wset: + self.clients[writeable].write() + for oob in xset: + self.clients[oob].close() + del self.clients[oob] + + def close(self): + """Closes the server.""" + for _ in xrange(self.threads): + self.tasks.put([None, None, None, None, None]) + self.socket.close() + self.prepared = False + + def serve(self): + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False + self.prepare() + while not self._stop: + self.handle() diff --git a/lib/thrift/server/TProcessPoolServer.py b/lib/thrift/server/TProcessPoolServer.py new file mode 100644 index 0000000..ae7fe1c --- /dev/null +++ b/lib/thrift/server/TProcessPoolServer.py @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +import logging +logger = logging.getLogger(__name__) + +from multiprocessing import Process, Value, Condition, reduction + +from TServer import TServer +from thrift.transport.TTransport import TTransportException + + +class TProcessPoolServer(TServer): + """Server with a fixed size pool of worker subprocesses to service requests + + Note that if you need shared state between the handlers - it's up to you! + Written by Dvir Volk, doat.com + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.numWorkers = 10 + self.workers = [] + self.isRunning = Value('b', False) + self.stopCondition = Condition() + self.postForkCallback = None + + def setPostForkCallback(self, callback): + if not callable(callback): + raise TypeError("This is not a callback!") + self.postForkCallback = callback + + def setNumWorkers(self, num): + """Set the number of worker threads that should be created""" + self.numWorkers = num + + def workerProcess(self): + """Loop getting clients from the shared queue and process them""" + if self.postForkCallback: + self.postForkCallback() + + while self.isRunning.value: + try: + client = self.serverTransport.accept() + if not client: + continue + self.serveClient(client) + except (KeyboardInterrupt, SystemExit): + return 0 + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransportException as tx: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start workers and put into queue""" + # this is a shared state that can tell the workers to exit when False + self.isRunning.value = True + + # first bind and listen to the port + self.serverTransport.listen() + + # fork the children + for i in range(self.numWorkers): + try: + w = Process(target=self.workerProcess) + w.daemon = True + w.start() + self.workers.append(w) + except Exception as x: + logger.exception(x) + + # wait until the condition is set by stop() + while True: + self.stopCondition.acquire() + try: + self.stopCondition.wait() + break + except (SystemExit, KeyboardInterrupt): + break + except Exception as x: + logger.exception(x) + + self.isRunning.value = False + + def stop(self): + self.isRunning.value = False + self.stopCondition.acquire() + self.stopCondition.notify() + self.stopCondition.release() diff --git a/lib/thrift/server/TServer.py b/lib/thrift/server/TServer.py new file mode 100644 index 0000000..8c58e39 --- /dev/null +++ b/lib/thrift/server/TServer.py @@ -0,0 +1,279 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import Queue +import os +import sys +import threading +import traceback + +import logging +logger = logging.getLogger(__name__) + +from thrift.Thrift import TProcessor +from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport + + +class TServer: + """Base interface for a server, which must have a serve() method. + + Three constructors for all servers: + 1) (processor, serverTransport) + 2) (processor, serverTransport, transportFactory, protocolFactory) + 3) (processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) + """ + def __init__(self, *args): + if (len(args) == 2): + self.__initArgs__(args[0], args[1], + TTransport.TTransportFactoryBase(), + TTransport.TTransportFactoryBase(), + TBinaryProtocol.TBinaryProtocolFactory(), + TBinaryProtocol.TBinaryProtocolFactory()) + elif (len(args) == 4): + self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) + elif (len(args) == 6): + self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) + + def __initArgs__(self, processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory): + self.processor = processor + self.serverTransport = serverTransport + self.inputTransportFactory = inputTransportFactory + self.outputTransportFactory = outputTransportFactory + self.inputProtocolFactory = inputProtocolFactory + self.outputProtocolFactory = outputProtocolFactory + + def serve(self): + pass + + +class TSimpleServer(TServer): + """Simple single-threaded server that just pumps around one transport.""" + + def __init__(self, *args): + TServer.__init__(self, *args) + + def serve(self): + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + if not client: + continue + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException as tx: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + +class TThreadedServer(TServer): + """Threaded server that spawns a new thread per each connection.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.daemon = kwargs.get("daemon", False) + + def serve(self): + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + if not client: + continue + t = threading.Thread(target=self.handle, args=(client,)) + t.setDaemon(self.daemon) + t.start() + except KeyboardInterrupt: + raise + except Exception as x: + logger.exception(x) + + def handle(self, client): + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException as tx: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + +class TThreadPoolServer(TServer): + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.clients = Queue.Queue() + self.threads = 10 + self.daemon = kwargs.get("daemon", False) + + def setNumThreads(self, num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the shared queue and process them.""" + while True: + try: + client = self.clients.get() + self.serveClient(client) + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException as tx: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start a fixed number of worker threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.setDaemon(self.daemon) + t.start() + except Exception as x: + logger.exception(x) + + # Pump the socket for clients + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + if not client: + continue + self.clients.put(client) + except Exception as x: + logger.exception(x) + + +class TForkingServer(TServer): + """A Thrift server that forks a new process for each request + + This is more scalable than the threaded server as it does not cause + GIL contention. + + Note that this has different semantics from the threading server. + Specifically, updates to shared variables will no longer be shared. + It will also not work on windows. + + This code is heavily inspired by SocketServer.ForkingMixIn in the + Python stdlib. + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.children = [] + + def serve(self): + def try_close(file): + try: + file.close() + except IOError as e: + logger.warning(e, exc_info=True) + + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + if not client: + continue + try: + pid = os.fork() + + if pid: # parent + # add before collect, otherwise you race w/ waitpid + self.children.append(pid) + self.collect_children() + + # Parent must close socket or the connection may not get + # closed promptly + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + try_close(itrans) + try_close(otrans) + else: + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + ecode = 0 + try: + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException as tx: + pass + except Exception as e: + logger.exception(e) + ecode = 1 + finally: + try_close(itrans) + try_close(otrans) + + os._exit(ecode) + + except TTransport.TTransportException as tx: + pass + except Exception as x: + logger.exception(x) + + def collect_children(self): + while self.children: + try: + pid, status = os.waitpid(0, os.WNOHANG) + except os.error: + pid = None + + if pid: + self.children.remove(pid) + else: + break diff --git a/lib/thrift/server/TServer.pyc b/lib/thrift/server/TServer.pyc Binary files differnew file mode 100644 index 0000000..5a9174d --- /dev/null +++ b/lib/thrift/server/TServer.pyc diff --git a/lib/thrift/server/__init__.py b/lib/thrift/server/__init__.py new file mode 100644 index 0000000..1bf6e25 --- /dev/null +++ b/lib/thrift/server/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['TServer', 'TNonblockingServer'] diff --git a/lib/thrift/server/__init__.pyc b/lib/thrift/server/__init__.pyc Binary files differnew file mode 100644 index 0000000..7a11045 --- /dev/null +++ b/lib/thrift/server/__init__.pyc diff --git a/lib/thrift/transport/THttpClient.py b/lib/thrift/transport/THttpClient.py new file mode 100644 index 0000000..5851fa2 --- /dev/null +++ b/lib/thrift/transport/THttpClient.py @@ -0,0 +1,151 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import httplib +import os +import socket +import sys +import urllib +import urlparse +import warnings + +from cStringIO import StringIO + +from TTransport import * + + +class THttpClient(TTransportBase): + """Http implementation of TTransport base.""" + + def __init__(self, uri_or_host, port=None, path=None): + """THttpClient supports two different types constructor parameters. + + THttpClient(host, port, path) - deprecated + THttpClient(uri) + + Only the second supports https. + """ + if port is not None: + warnings.warn( + "Please use the THttpClient('http://host:port/path') syntax", + DeprecationWarning, + stacklevel=2) + self.host = uri_or_host + self.port = port + assert path + self.path = path + self.scheme = 'http' + else: + parsed = urlparse.urlparse(uri_or_host) + self.scheme = parsed.scheme + assert self.scheme in ('http', 'https') + if self.scheme == 'http': + self.port = parsed.port or httplib.HTTP_PORT + elif self.scheme == 'https': + self.port = parsed.port or httplib.HTTPS_PORT + self.host = parsed.hostname + self.path = parsed.path + if parsed.query: + self.path += '?%s' % parsed.query + self.__wbuf = StringIO() + self.__http = None + self.__timeout = None + self.__custom_headers = None + + def open(self): + if self.scheme == 'http': + self.__http = httplib.HTTP(self.host, self.port) + else: + self.__http = httplib.HTTPS(self.host, self.port) + + def close(self): + self.__http.close() + self.__http = None + + def isOpen(self): + return self.__http is not None + + def setTimeout(self, ms): + if not hasattr(socket, 'getdefaulttimeout'): + raise NotImplementedError + + if ms is None: + self.__timeout = None + else: + self.__timeout = ms / 1000.0 + + def setCustomHeaders(self, headers): + self.__custom_headers = headers + + def read(self, sz): + return self.__http.file.read(sz) + + def write(self, buf): + self.__wbuf.write(buf) + + def __withTimeout(f): + def _f(*args, **kwargs): + orig_timeout = socket.getdefaulttimeout() + socket.setdefaulttimeout(args[0].__timeout) + try: + result = f(*args, **kwargs) + finally: + socket.setdefaulttimeout(orig_timeout) + return result + return _f + + def flush(self): + if self.isOpen(): + self.close() + self.open() + + # Pull data out of buffer + data = self.__wbuf.getvalue() + self.__wbuf = StringIO() + + # HTTP request + self.__http.putrequest('POST', self.path) + + # Write headers + self.__http.putheader('Host', self.host) + self.__http.putheader('Content-Type', 'application/x-thrift') + self.__http.putheader('Content-Length', str(len(data))) + + if not self.__custom_headers or 'User-Agent' not in self.__custom_headers: + user_agent = 'Python/THttpClient' + script = os.path.basename(sys.argv[0]) + if script: + user_agent = '%s (%s)' % (user_agent, urllib.quote(script)) + self.__http.putheader('User-Agent', user_agent) + + if self.__custom_headers: + for key, val in self.__custom_headers.iteritems(): + self.__http.putheader(key, val) + + self.__http.endheaders() + + # Write payload + self.__http.send(data) + + # Get reply to flush the request + self.code, self.message, self.headers = self.__http.getreply() + + # Decorate if we know how to timeout + if hasattr(socket, 'getdefaulttimeout'): + flush = __withTimeout(flush) diff --git a/lib/thrift/transport/TSSLSocket.py b/lib/thrift/transport/TSSLSocket.py new file mode 100644 index 0000000..6ad8d50 --- /dev/null +++ b/lib/thrift/transport/TSSLSocket.py @@ -0,0 +1,227 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import socket +import ssl + +from thrift.transport import TSocket +from thrift.transport.TTransport import TTransportException + + +class TSSLSocket(TSocket.TSocket): + """ + SSL implementation of client-side TSocket + + This class creates outbound sockets wrapped using the + python standard ssl module for encrypted connections. + + The protocol used is set using the class variable + SSL_VERSION, which must be one of ssl.PROTOCOL_* and + defaults to ssl.PROTOCOL_TLSv1 for greatest security. + """ + SSL_VERSION = ssl.PROTOCOL_TLSv1 + + def __init__(self, + host='localhost', + port=9090, + validate=True, + ca_certs=None, + keyfile=None, + certfile=None, + unix_socket=None, + ciphers=None): + """Create SSL TSocket + + @param validate: Set to False to disable SSL certificate validation + @type validate: bool + @param ca_certs: Filename to the Certificate Authority pem file, possibly a + file downloaded from: http://curl.haxx.se/ca/cacert.pem This is passed to + the ssl_wrap function as the 'ca_certs' parameter. + @type ca_certs: str + @param keyfile: The private key + @type keyfile: str + @param certfile: The cert file + @type certfile: str + @param ciphers: The cipher suites to allow. This is passed to + the ssl_wrap function as the 'ciphers' parameter. + @type ciphers: str + + Raises an IOError exception if validate is True and the ca_certs file is + None, not present or unreadable. + """ + self.validate = validate + self.is_valid = False + self.peercert = None + if not validate: + self.cert_reqs = ssl.CERT_NONE + else: + self.cert_reqs = ssl.CERT_REQUIRED + self.ca_certs = ca_certs + self.keyfile = keyfile + self.certfile = certfile + self.ciphers = ciphers + if validate: + if ca_certs is None or not os.access(ca_certs, os.R_OK): + raise IOError('Certificate Authority ca_certs file "%s" ' + 'is not readable, cannot validate SSL ' + 'certificates.' % (ca_certs)) + TSocket.TSocket.__init__(self, host, port, unix_socket) + + def open(self): + try: + res0 = self._resolveAddr() + for res in res0: + sock_family, sock_type = res[0:2] + ip_port = res[4] + plain_sock = socket.socket(sock_family, sock_type) + self.handle = ssl.wrap_socket(plain_sock, + ssl_version=self.SSL_VERSION, + do_handshake_on_connect=True, + ca_certs=self.ca_certs, + keyfile=self.keyfile, + certfile=self.certfile, + cert_reqs=self.cert_reqs, + ciphers=self.ciphers) + self.handle.settimeout(self._timeout) + try: + self.handle.connect(ip_port) + except socket.error as e: + if res is not res0[-1]: + continue + else: + raise e + break + except socket.error as e: + if self._unix_socket: + message = 'Could not connect to secure socket %s: %s' \ + % (self._unix_socket, e) + else: + message = 'Could not connect to %s:%d: %s' % (self.host, self.port, e) + raise TTransportException(type=TTransportException.NOT_OPEN, + message=message) + if self.validate: + self._validate_cert() + + def _validate_cert(self): + """internal method to validate the peer's SSL certificate, and to check the + commonName of the certificate to ensure it matches the hostname we + used to make this connection. Does not support subjectAltName records + in certificates. + + raises TTransportException if the certificate fails validation. + """ + cert = self.handle.getpeercert() + self.peercert = cert + if 'subject' not in cert: + raise TTransportException( + type=TTransportException.NOT_OPEN, + message='No SSL certificate found from %s:%s' % (self.host, self.port)) + fields = cert['subject'] + for field in fields: + # ensure structure we get back is what we expect + if not isinstance(field, tuple): + continue + cert_pair = field[0] + if len(cert_pair) < 2: + continue + cert_key, cert_value = cert_pair[0:2] + if cert_key != 'commonName': + continue + certhost = cert_value + # this check should be performed by some sort of Access Manager + if certhost == self.host: + # success, cert commonName matches desired hostname + self.is_valid = True + return + else: + raise TTransportException( + type=TTransportException.UNKNOWN, + message='Hostname we connected to "%s" doesn\'t match certificate ' + 'provided commonName "%s"' % (self.host, certhost)) + raise TTransportException( + type=TTransportException.UNKNOWN, + message='Could not validate SSL certificate from ' + 'host "%s". Cert=%s' % (self.host, cert)) + + +class TSSLServerSocket(TSocket.TServerSocket): + """SSL implementation of TServerSocket + + This uses the ssl module's wrap_socket() method to provide SSL + negotiated encryption. + """ + SSL_VERSION = ssl.PROTOCOL_TLSv1 + + def __init__(self, + host=None, + port=9090, + certfile='cert.pem', + unix_socket=None, + ciphers=None): + """Initialize a TSSLServerSocket + + @param certfile: filename of the server certificate, defaults to cert.pem + @type certfile: str + @param host: The hostname or IP to bind the listen socket to, + i.e. 'localhost' for only allowing local network connections. + Pass None to bind to all interfaces. + @type host: str + @param port: The port to listen on for inbound connections. + @type port: int + @param ciphers: The cipher suites to allow. This is passed to + the ssl_wrap function as the 'ciphers' parameter. + @type ciphers: str + + """ + self.setCertfile(certfile) + TSocket.TServerSocket.__init__(self, host, port) + self.ciphers = ciphers + + def setCertfile(self, certfile): + """Set or change the server certificate file used to wrap new connections. + + @param certfile: The filename of the server certificate, + i.e. '/etc/certs/server.pem' + @type certfile: str + + Raises an IOError exception if the certfile is not present or unreadable. + """ + if not os.access(certfile, os.R_OK): + raise IOError('No such certfile found: %s' % (certfile)) + self.certfile = certfile + + def accept(self): + plain_client, addr = self.handle.accept() + try: + client = ssl.wrap_socket(plain_client, certfile=self.certfile, + server_side=True, ssl_version=self.SSL_VERSION, + ciphers=self.ciphers) + except ssl.SSLError as ssl_exc: + # failed handshake/ssl wrap, close socket to client + plain_client.close() + # raise ssl_exc + # We can't raise the exception, because it kills most TServer derived + # serve() methods. + # Instead, return None, and let the TServer instance deal with it in + # other exception handling. (but TSimpleServer dies anyway) + return None + result = TSocket.TSocket() + result.setHandle(client) + return result diff --git a/lib/thrift/transport/TSocket.py b/lib/thrift/transport/TSocket.py new file mode 100644 index 0000000..7b564aa --- /dev/null +++ b/lib/thrift/transport/TSocket.py @@ -0,0 +1,180 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import errno +import os +import socket +import sys + +from TTransport import * + + +class TSocketBase(TTransportBase): + def _resolveAddr(self): + if self._unix_socket is not None: + return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, + self._unix_socket)] + else: + return socket.getaddrinfo(self.host, + self.port, + self._socket_family, + socket.SOCK_STREAM, + 0, + socket.AI_PASSIVE | socket.AI_ADDRCONFIG) + + def close(self): + if self.handle: + self.handle.close() + self.handle = None + + +class TSocket(TSocketBase): + """Socket implementation of TTransport base.""" + + def __init__(self, host='localhost', port=9090, unix_socket=None, socket_family=socket.AF_UNSPEC): + """Initialize a TSocket + + @param host(str) The host to connect to. + @param port(int) The (TCP) port to connect to. + @param unix_socket(str) The filename of a unix socket to connect to. + (host and port will be ignored.) + @param socket_family(int) The socket family to use with this socket. + """ + self.host = host + self.port = port + self.handle = None + self._unix_socket = unix_socket + self._timeout = None + self._socket_family = socket_family + + def setHandle(self, h): + self.handle = h + + def isOpen(self): + return self.handle is not None + + def setTimeout(self, ms): + if ms is None: + self._timeout = None + else: + self._timeout = ms / 1000.0 + + if self.handle is not None: + self.handle.settimeout(self._timeout) + + def open(self): + try: + res0 = self._resolveAddr() + for res in res0: + self.handle = socket.socket(res[0], res[1]) + self.handle.settimeout(self._timeout) + try: + self.handle.connect(res[4]) + except socket.error as e: + if res is not res0[-1]: + continue + else: + raise e + break + except socket.error as e: + if self._unix_socket: + message = 'Could not connect to socket %s' % self._unix_socket + else: + message = 'Could not connect to %s:%d' % (self.host, self.port) + raise TTransportException(type=TTransportException.NOT_OPEN, + message=message) + + def read(self, sz): + try: + buff = self.handle.recv(sz) + except socket.error as e: + if (e.args[0] == errno.ECONNRESET and + (sys.platform == 'darwin' or sys.platform.startswith('freebsd'))): + # freebsd and Mach don't follow POSIX semantic of recv + # and fail with ECONNRESET if peer performed shutdown. + # See corresponding comment and code in TSocket::read() + # in lib/cpp/src/transport/TSocket.cpp. + self.close() + # Trigger the check to raise the END_OF_FILE exception below. + buff = '' + else: + raise + if len(buff) == 0: + raise TTransportException(type=TTransportException.END_OF_FILE, + message='TSocket read 0 bytes') + return buff + + def write(self, buff): + if not self.handle: + raise TTransportException(type=TTransportException.NOT_OPEN, + message='Transport not open') + sent = 0 + have = len(buff) + while sent < have: + plus = self.handle.send(buff) + if plus == 0: + raise TTransportException(type=TTransportException.END_OF_FILE, + message='TSocket sent 0 bytes') + sent += plus + buff = buff[plus:] + + def flush(self): + pass + + +class TServerSocket(TSocketBase, TServerTransportBase): + """Socket implementation of TServerTransport base.""" + + def __init__(self, host=None, port=9090, unix_socket=None, socket_family=socket.AF_UNSPEC): + self.host = host + self.port = port + self._unix_socket = unix_socket + self._socket_family = socket_family + self.handle = None + + def listen(self): + res0 = self._resolveAddr() + socket_family = self._socket_family == socket.AF_UNSPEC and socket.AF_INET6 or self._socket_family + for res in res0: + if res[0] is socket_family or res is res0[-1]: + break + + # We need remove the old unix socket if the file exists and + # nobody is listening on it. + if self._unix_socket: + tmp = socket.socket(res[0], res[1]) + try: + tmp.connect(res[4]) + except socket.error as err: + eno, message = err.args + if eno == errno.ECONNREFUSED: + os.unlink(res[4]) + + self.handle = socket.socket(res[0], res[1]) + self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(self.handle, 'settimeout'): + self.handle.settimeout(None) + self.handle.bind(res[4]) + self.handle.listen(128) + + def accept(self): + client, addr = self.handle.accept() + result = TSocket() + result.setHandle(client) + return result diff --git a/lib/thrift/transport/TSocket.pyc b/lib/thrift/transport/TSocket.pyc Binary files differnew file mode 100644 index 0000000..ff141f3 --- /dev/null +++ b/lib/thrift/transport/TSocket.pyc diff --git a/lib/thrift/transport/TTransport.py b/lib/thrift/transport/TTransport.py new file mode 100644 index 0000000..5914aca --- /dev/null +++ b/lib/thrift/transport/TTransport.py @@ -0,0 +1,446 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from cStringIO import StringIO +from struct import pack, unpack +from thrift.Thrift import TException + + +class TTransportException(TException): + """Custom Transport Exception class""" + + UNKNOWN = 0 + NOT_OPEN = 1 + ALREADY_OPEN = 2 + TIMED_OUT = 3 + END_OF_FILE = 4 + + def __init__(self, type=UNKNOWN, message=None): + TException.__init__(self, message) + self.type = type + + +class TTransportBase: + """Base class for Thrift transport layer.""" + + def isOpen(self): + pass + + def open(self): + pass + + def close(self): + pass + + def read(self, sz): + pass + + def readAll(self, sz): + buff = '' + have = 0 + while (have < sz): + chunk = self.read(sz - have) + have += len(chunk) + buff += chunk + + if len(chunk) == 0: + raise EOFError() + + return buff + + def write(self, buf): + pass + + def flush(self): + pass + + +# This class should be thought of as an interface. +class CReadableTransport: + """base class for transports that are readable from C""" + + # TODO(dreiss): Think about changing this interface to allow us to use + # a (Python, not c) StringIO instead, because it allows + # you to write after reading. + + # NOTE: This is a classic class, so properties will NOT work + # correctly for setting. + @property + def cstringio_buf(self): + """A cStringIO buffer that contains the current chunk we are reading.""" + pass + + def cstringio_refill(self, partialread, reqlen): + """Refills cstringio_buf. + + Returns the currently used buffer (which can but need not be the same as + the old cstringio_buf). partialread is what the C code has read from the + buffer, and should be inserted into the buffer before any more reads. The + return value must be a new, not borrowed reference. Something along the + lines of self._buf should be fine. + + If reqlen bytes can't be read, throw EOFError. + """ + pass + + +class TServerTransportBase: + """Base class for Thrift server transports.""" + + def listen(self): + pass + + def accept(self): + pass + + def close(self): + pass + + +class TTransportFactoryBase: + """Base class for a Transport Factory""" + + def getTransport(self, trans): + return trans + + +class TBufferedTransportFactory: + """Factory transport that builds buffered transports""" + + def getTransport(self, trans): + buffered = TBufferedTransport(trans) + return buffered + + +class TBufferedTransport(TTransportBase, CReadableTransport): + """Class that wraps another transport and buffers its I/O. + + The implementation uses a (configurable) fixed-size read buffer + but buffers all writes until a flush is performed. + """ + DEFAULT_BUFFER = 4096 + + def __init__(self, trans, rbuf_size=DEFAULT_BUFFER): + self.__trans = trans + self.__wbuf = StringIO() + self.__rbuf = StringIO("") + self.__rbuf_size = rbuf_size + + def isOpen(self): + return self.__trans.isOpen() + + def open(self): + return self.__trans.open() + + def close(self): + return self.__trans.close() + + def read(self, sz): + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size))) + return self.__rbuf.read(sz) + + def write(self, buf): + try: + self.__wbuf.write(buf) + except Exception as e: + # on exception reset wbuf so it doesn't contain a partial function call + self.__wbuf = StringIO() + raise e + + def flush(self): + out = self.__wbuf.getvalue() + # reset wbuf before write/flush to preserve state on underlying failure + self.__wbuf = StringIO() + self.__trans.write(out) + self.__trans.flush() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + retstring = partialread + if reqlen < self.__rbuf_size: + # try to make a read of as much as we can. + retstring += self.__trans.read(self.__rbuf_size) + + # but make sure we do read reqlen bytes. + if len(retstring) < reqlen: + retstring += self.__trans.readAll(reqlen - len(retstring)) + + self.__rbuf = StringIO(retstring) + return self.__rbuf + + +class TMemoryBuffer(TTransportBase, CReadableTransport): + """Wraps a cStringIO object as a TTransport. + + NOTE: Unlike the C++ version of this class, you cannot write to it + then immediately read from it. If you want to read from a + TMemoryBuffer, you must either pass a string to the constructor. + TODO(dreiss): Make this work like the C++ version. + """ + + def __init__(self, value=None): + """value -- a value to read from for stringio + + If value is set, this will be a transport for reading, + otherwise, it is for writing""" + if value is not None: + self._buffer = StringIO(value) + else: + self._buffer = StringIO() + + def isOpen(self): + return not self._buffer.closed + + def open(self): + pass + + def close(self): + self._buffer.close() + + def read(self, sz): + return self._buffer.read(sz) + + def write(self, buf): + self._buffer.write(buf) + + def flush(self): + pass + + def getvalue(self): + return self._buffer.getvalue() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self._buffer + + def cstringio_refill(self, partialread, reqlen): + # only one shot at reading... + raise EOFError() + + +class TFramedTransportFactory: + """Factory transport that builds framed transports""" + + def getTransport(self, trans): + framed = TFramedTransport(trans) + return framed + + +class TFramedTransport(TTransportBase, CReadableTransport): + """Class that wraps another transport and frames its I/O when writing.""" + + def __init__(self, trans,): + self.__trans = trans + self.__rbuf = StringIO() + self.__wbuf = StringIO() + + def isOpen(self): + return self.__trans.isOpen() + + def open(self): + return self.__trans.open() + + def close(self): + return self.__trans.close() + + def read(self, sz): + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self.readFrame() + return self.__rbuf.read(sz) + + def readFrame(self): + buff = self.__trans.readAll(4) + sz, = unpack('!i', buff) + self.__rbuf = StringIO(self.__trans.readAll(sz)) + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + wout = self.__wbuf.getvalue() + wsz = len(wout) + # reset wbuf before write/flush to preserve state on underlying failure + self.__wbuf = StringIO() + # N.B.: Doing this string concatenation is WAY cheaper than making + # two separate calls to the underlying socket object. Socket writes in + # Python turn out to be REALLY expensive, but it seems to do a pretty + # good job of managing string buffer operations without excessive copies + buf = pack("!i", wsz) + wout + self.__trans.write(buf) + self.__trans.flush() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, prefix, reqlen): + # self.__rbuf will already be empty here because fastbinary doesn't + # ask for a refill until the previous buffer is empty. Therefore, + # we can start reading new frames immediately. + while len(prefix) < reqlen: + self.readFrame() + prefix += self.__rbuf.getvalue() + self.__rbuf = StringIO(prefix) + return self.__rbuf + + +class TFileObjectTransport(TTransportBase): + """Wraps a file-like object to make it work as a Thrift transport.""" + + def __init__(self, fileobj): + self.fileobj = fileobj + + def isOpen(self): + return True + + def close(self): + self.fileobj.close() + + def read(self, sz): + return self.fileobj.read(sz) + + def write(self, buf): + self.fileobj.write(buf) + + def flush(self): + self.fileobj.flush() + + +class TSaslClientTransport(TTransportBase, CReadableTransport): + """ + SASL transport + """ + + START = 1 + OK = 2 + BAD = 3 + ERROR = 4 + COMPLETE = 5 + + def __init__(self, transport, host, service, mechanism='GSSAPI', + **sasl_kwargs): + """ + transport: an underlying transport to use, typically just a TSocket + host: the name of the server, from a SASL perspective + service: the name of the server's service, from a SASL perspective + mechanism: the name of the preferred mechanism to use + + All other kwargs will be passed to the puresasl.client.SASLClient + constructor. + """ + + from puresasl.client import SASLClient + + self.transport = transport + self.sasl = SASLClient(host, service, mechanism, **sasl_kwargs) + + self.__wbuf = StringIO() + self.__rbuf = StringIO() + + def open(self): + if not self.transport.isOpen(): + self.transport.open() + + self.send_sasl_msg(self.START, self.sasl.mechanism) + self.send_sasl_msg(self.OK, self.sasl.process()) + + while True: + status, challenge = self.recv_sasl_msg() + if status == self.OK: + self.send_sasl_msg(self.OK, self.sasl.process(challenge)) + elif status == self.COMPLETE: + if not self.sasl.complete: + raise TTransportException("The server erroneously indicated " + "that SASL negotiation was complete") + else: + break + else: + raise TTransportException("Bad SASL negotiation status: %d (%s)" + % (status, challenge)) + + def send_sasl_msg(self, status, body): + header = pack(">BI", status, len(body)) + self.transport.write(header + body) + self.transport.flush() + + def recv_sasl_msg(self): + header = self.transport.readAll(5) + status, length = unpack(">BI", header) + if length > 0: + payload = self.transport.readAll(length) + else: + payload = "" + return status, payload + + def write(self, data): + self.__wbuf.write(data) + + def flush(self): + data = self.__wbuf.getvalue() + encoded = self.sasl.wrap(data) + self.transport.write(''.join((pack("!i", len(encoded)), encoded))) + self.transport.flush() + self.__wbuf = StringIO() + + def read(self, sz): + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self._read_frame() + return self.__rbuf.read(sz) + + def _read_frame(self): + header = self.transport.readAll(4) + length, = unpack('!i', header) + encoded = self.transport.readAll(length) + self.__rbuf = StringIO(self.sasl.unwrap(encoded)) + + def close(self): + self.sasl.dispose() + self.transport.close() + + # based on TFramedTransport + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, prefix, reqlen): + # self.__rbuf will already be empty here because fastbinary doesn't + # ask for a refill until the previous buffer is empty. Therefore, + # we can start reading new frames immediately. + while len(prefix) < reqlen: + self._read_frame() + prefix += self.__rbuf.getvalue() + self.__rbuf = StringIO(prefix) + return self.__rbuf + diff --git a/lib/thrift/transport/TTransport.pyc b/lib/thrift/transport/TTransport.pyc Binary files differnew file mode 100644 index 0000000..c7cf4cb --- /dev/null +++ b/lib/thrift/transport/TTransport.pyc diff --git a/lib/thrift/transport/TTwisted.py b/lib/thrift/transport/TTwisted.py new file mode 100644 index 0000000..29bbd4c --- /dev/null +++ b/lib/thrift/transport/TTwisted.py @@ -0,0 +1,324 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import struct +from cStringIO import StringIO + +from zope.interface import implements, Interface, Attribute +from twisted.internet.protocol import ServerFactory, ClientFactory, \ + connectionDone +from twisted.internet import defer +from twisted.internet.threads import deferToThread +from twisted.protocols import basic +from twisted.web import server, resource, http + +from thrift.transport import TTransport + + +class TMessageSenderTransport(TTransport.TTransportBase): + + def __init__(self): + self.__wbuf = StringIO() + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + msg = self.__wbuf.getvalue() + self.__wbuf = StringIO() + return self.sendMessage(msg) + + def sendMessage(self, message): + raise NotImplementedError + + +class TCallbackTransport(TMessageSenderTransport): + + def __init__(self, func): + TMessageSenderTransport.__init__(self) + self.func = func + + def sendMessage(self, message): + return self.func(message) + + +class ThriftClientProtocol(basic.Int32StringReceiver): + + MAX_LENGTH = 2 ** 31 - 1 + + def __init__(self, client_class, iprot_factory, oprot_factory=None): + self._client_class = client_class + self._iprot_factory = iprot_factory + if oprot_factory is None: + self._oprot_factory = iprot_factory + else: + self._oprot_factory = oprot_factory + + self.recv_map = {} + self.started = defer.Deferred() + + def dispatch(self, msg): + self.sendString(msg) + + def connectionMade(self): + tmo = TCallbackTransport(self.dispatch) + self.client = self._client_class(tmo, self._oprot_factory) + self.started.callback(self.client) + + def connectionLost(self, reason=connectionDone): + for k, v in self.client._reqs.iteritems(): + tex = TTransport.TTransportException( + type=TTransport.TTransportException.END_OF_FILE, + message='Connection closed') + v.errback(tex) + + def stringReceived(self, frame): + tr = TTransport.TMemoryBuffer(frame) + iprot = self._iprot_factory.getProtocol(tr) + (fname, mtype, rseqid) = iprot.readMessageBegin() + + try: + method = self.recv_map[fname] + except KeyError: + method = getattr(self.client, 'recv_' + fname) + self.recv_map[fname] = method + + method(iprot, mtype, rseqid) + + +class ThriftSASLClientProtocol(ThriftClientProtocol): + + START = 1 + OK = 2 + BAD = 3 + ERROR = 4 + COMPLETE = 5 + + MAX_LENGTH = 2 ** 31 - 1 + + def __init__(self, client_class, iprot_factory, oprot_factory=None, + host=None, service=None, mechanism='GSSAPI', **sasl_kwargs): + """ + host: the name of the server, from a SASL perspective + service: the name of the server's service, from a SASL perspective + mechanism: the name of the preferred mechanism to use + + All other kwargs will be passed to the puresasl.client.SASLClient + constructor. + """ + + from puresasl.client import SASLClient + self.SASLCLient = SASLClient + + ThriftClientProtocol.__init__(self, client_class, iprot_factory, oprot_factory) + + self._sasl_negotiation_deferred = None + self._sasl_negotiation_status = None + self.client = None + + if host is not None: + self.createSASLClient(host, service, mechanism, **sasl_kwargs) + + def createSASLClient(self, host, service, mechanism, **kwargs): + self.sasl = self.SASLClient(host, service, mechanism, **kwargs) + + def dispatch(self, msg): + encoded = self.sasl.wrap(msg) + len_and_encoded = ''.join((struct.pack('!i', len(encoded)), encoded)) + ThriftClientProtocol.dispatch(self, len_and_encoded) + + @defer.inlineCallbacks + def connectionMade(self): + self._sendSASLMessage(self.START, self.sasl.mechanism) + initial_message = yield deferToThread(self.sasl.process) + self._sendSASLMessage(self.OK, initial_message) + + while True: + status, challenge = yield self._receiveSASLMessage() + if status == self.OK: + response = yield deferToThread(self.sasl.process, challenge) + self._sendSASLMessage(self.OK, response) + elif status == self.COMPLETE: + if not self.sasl.complete: + msg = "The server erroneously indicated that SASL " \ + "negotiation was complete" + raise TTransport.TTransportException(msg, message=msg) + else: + break + else: + msg = "Bad SASL negotiation status: %d (%s)" % (status, challenge) + raise TTransport.TTransportException(msg, message=msg) + + self._sasl_negotiation_deferred = None + ThriftClientProtocol.connectionMade(self) + + def _sendSASLMessage(self, status, body): + if body is None: + body = "" + header = struct.pack(">BI", status, len(body)) + self.transport.write(header + body) + + def _receiveSASLMessage(self): + self._sasl_negotiation_deferred = defer.Deferred() + self._sasl_negotiation_status = None + return self._sasl_negotiation_deferred + + def connectionLost(self, reason=connectionDone): + if self.client: + ThriftClientProtocol.connectionLost(self, reason) + + def dataReceived(self, data): + if self._sasl_negotiation_deferred: + # we got a sasl challenge in the format (status, length, challenge) + # save the status, let IntNStringReceiver piece the challenge data together + self._sasl_negotiation_status, = struct.unpack("B", data[0]) + ThriftClientProtocol.dataReceived(self, data[1:]) + else: + # normal frame, let IntNStringReceiver piece it together + ThriftClientProtocol.dataReceived(self, data) + + def stringReceived(self, frame): + if self._sasl_negotiation_deferred: + # the frame is just a SASL challenge + response = (self._sasl_negotiation_status, frame) + self._sasl_negotiation_deferred.callback(response) + else: + # there's a second 4 byte length prefix inside the frame + decoded_frame = self.sasl.unwrap(frame[4:]) + ThriftClientProtocol.stringReceived(self, decoded_frame) + + +class ThriftServerProtocol(basic.Int32StringReceiver): + + MAX_LENGTH = 2 ** 31 - 1 + + def dispatch(self, msg): + self.sendString(msg) + + def processError(self, error): + self.transport.loseConnection() + + def processOk(self, _, tmo): + msg = tmo.getvalue() + + if len(msg) > 0: + self.dispatch(msg) + + def stringReceived(self, frame): + tmi = TTransport.TMemoryBuffer(frame) + tmo = TTransport.TMemoryBuffer() + + iprot = self.factory.iprot_factory.getProtocol(tmi) + oprot = self.factory.oprot_factory.getProtocol(tmo) + + d = self.factory.processor.process(iprot, oprot) + d.addCallbacks(self.processOk, self.processError, + callbackArgs=(tmo,)) + + +class IThriftServerFactory(Interface): + + processor = Attribute("Thrift processor") + + iprot_factory = Attribute("Input protocol factory") + + oprot_factory = Attribute("Output protocol factory") + + +class IThriftClientFactory(Interface): + + client_class = Attribute("Thrift client class") + + iprot_factory = Attribute("Input protocol factory") + + oprot_factory = Attribute("Output protocol factory") + + +class ThriftServerFactory(ServerFactory): + + implements(IThriftServerFactory) + + protocol = ThriftServerProtocol + + def __init__(self, processor, iprot_factory, oprot_factory=None): + self.processor = processor + self.iprot_factory = iprot_factory + if oprot_factory is None: + self.oprot_factory = iprot_factory + else: + self.oprot_factory = oprot_factory + + +class ThriftClientFactory(ClientFactory): + + implements(IThriftClientFactory) + + protocol = ThriftClientProtocol + + def __init__(self, client_class, iprot_factory, oprot_factory=None): + self.client_class = client_class + self.iprot_factory = iprot_factory + if oprot_factory is None: + self.oprot_factory = iprot_factory + else: + self.oprot_factory = oprot_factory + + def buildProtocol(self, addr): + p = self.protocol(self.client_class, self.iprot_factory, + self.oprot_factory) + p.factory = self + return p + + +class ThriftResource(resource.Resource): + + allowedMethods = ('POST',) + + def __init__(self, processor, inputProtocolFactory, + outputProtocolFactory=None): + resource.Resource.__init__(self) + self.inputProtocolFactory = inputProtocolFactory + if outputProtocolFactory is None: + self.outputProtocolFactory = inputProtocolFactory + else: + self.outputProtocolFactory = outputProtocolFactory + self.processor = processor + + def getChild(self, path, request): + return self + + def _cbProcess(self, _, request, tmo): + msg = tmo.getvalue() + request.setResponseCode(http.OK) + request.setHeader("content-type", "application/x-thrift") + request.write(msg) + request.finish() + + def render_POST(self, request): + request.content.seek(0, 0) + data = request.content.read() + tmi = TTransport.TMemoryBuffer(data) + tmo = TTransport.TMemoryBuffer() + + iprot = self.inputProtocolFactory.getProtocol(tmi) + oprot = self.outputProtocolFactory.getProtocol(tmo) + + d = self.processor.process(iprot, oprot) + d.addCallback(self._cbProcess, request, tmo) + return server.NOT_DONE_YET diff --git a/lib/thrift/transport/TZlibTransport.py b/lib/thrift/transport/TZlibTransport.py new file mode 100644 index 0000000..a2f42a5 --- /dev/null +++ b/lib/thrift/transport/TZlibTransport.py @@ -0,0 +1,248 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""TZlibTransport provides a compressed transport and transport factory +class, using the python standard library zlib module to implement +data compression. +""" + +from __future__ import division +import zlib +from cStringIO import StringIO +from TTransport import TTransportBase, CReadableTransport + + +class TZlibTransportFactory(object): + """Factory transport that builds zlib compressed transports. + + This factory caches the last single client/transport that it was passed + and returns the same TZlibTransport object that was created. + + This caching means the TServer class will get the _same_ transport + object for both input and output transports from this factory. + (For non-threaded scenarios only, since the cache only holds one object) + + The purpose of this caching is to allocate only one TZlibTransport where + only one is really needed (since it must have separate read/write buffers), + and makes the statistics from getCompSavings() and getCompRatio() + easier to understand. + """ + # class scoped cache of last transport given and zlibtransport returned + _last_trans = None + _last_z = None + + def getTransport(self, trans, compresslevel=9): + """Wrap a transport, trans, with the TZlibTransport + compressed transport class, returning a new + transport to the caller. + + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Defaults to 9. + @type compresslevel: int + + This method returns a TZlibTransport which wraps the + passed C{trans} TTransport derived instance. + """ + if trans == self._last_trans: + return self._last_z + ztrans = TZlibTransport(trans, compresslevel) + self._last_trans = trans + self._last_z = ztrans + return ztrans + + +class TZlibTransport(TTransportBase, CReadableTransport): + """Class that wraps a transport with zlib, compressing writes + and decompresses reads, using the python standard + library zlib module. + """ + # Read buffer size for the python fastbinary C extension, + # the TBinaryProtocolAccelerated class. + DEFAULT_BUFFSIZE = 4096 + + def __init__(self, trans, compresslevel=9): + """Create a new TZlibTransport, wrapping C{trans}, another + TTransport derived object. + + @param trans: A thrift transport object, i.e. a TSocket() object. + @type trans: TTransport + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Default is 9. + @type compresslevel: int + """ + self.__trans = trans + self.compresslevel = compresslevel + self.__rbuf = StringIO() + self.__wbuf = StringIO() + self._init_zlib() + self._init_stats() + + def _reinit_buffers(self): + """Internal method to initialize/reset the internal StringIO objects + for read and write buffers. + """ + self.__rbuf = StringIO() + self.__wbuf = StringIO() + + def _init_stats(self): + """Internal method to reset the internal statistics counters + for compression ratios and bandwidth savings. + """ + self.bytes_in = 0 + self.bytes_out = 0 + self.bytes_in_comp = 0 + self.bytes_out_comp = 0 + + def _init_zlib(self): + """Internal method for setting up the zlib compression and + decompression objects. + """ + self._zcomp_read = zlib.decompressobj() + self._zcomp_write = zlib.compressobj(self.compresslevel) + + def getCompRatio(self): + """Get the current measured compression ratios (in,out) from + this transport. + + Returns a tuple of: + (inbound_compression_ratio, outbound_compression_ratio) + + The compression ratios are computed as: + compressed / uncompressed + + E.g., data that compresses by 10x will have a ratio of: 0.10 + and data that compresses to half of ts original size will + have a ratio of 0.5 + + None is returned if no bytes have yet been processed in + a particular direction. + """ + r_percent, w_percent = (None, None) + if self.bytes_in > 0: + r_percent = self.bytes_in_comp / self.bytes_in + if self.bytes_out > 0: + w_percent = self.bytes_out_comp / self.bytes_out + return (r_percent, w_percent) + + def getCompSavings(self): + """Get the current count of saved bytes due to data + compression. + + Returns a tuple of: + (inbound_saved_bytes, outbound_saved_bytes) + + Note: if compression is actually expanding your + data (only likely with very tiny thrift objects), then + the values returned will be negative. + """ + r_saved = self.bytes_in - self.bytes_in_comp + w_saved = self.bytes_out - self.bytes_out_comp + return (r_saved, w_saved) + + def isOpen(self): + """Return the underlying transport's open status""" + return self.__trans.isOpen() + + def open(self): + """Open the underlying transport""" + self._init_stats() + return self.__trans.open() + + def listen(self): + """Invoke the underlying transport's listen() method""" + self.__trans.listen() + + def accept(self): + """Accept connections on the underlying transport""" + return self.__trans.accept() + + def close(self): + """Close the underlying transport,""" + self._reinit_buffers() + self._init_zlib() + return self.__trans.close() + + def read(self, sz): + """Read up to sz bytes from the decompressed bytes buffer, and + read from the underlying transport if the decompression + buffer is empty. + """ + ret = self.__rbuf.read(sz) + if len(ret) > 0: + return ret + # keep reading from transport until something comes back + while True: + if self.readComp(sz): + break + ret = self.__rbuf.read(sz) + return ret + + def readComp(self, sz): + """Read compressed data from the underlying transport, then + decompress it and append it to the internal StringIO read buffer + """ + zbuf = self.__trans.read(sz) + zbuf = self._zcomp_read.unconsumed_tail + zbuf + buf = self._zcomp_read.decompress(zbuf) + self.bytes_in += len(zbuf) + self.bytes_in_comp += len(buf) + old = self.__rbuf.read() + self.__rbuf = StringIO(old + buf) + if len(old) + len(buf) == 0: + return False + return True + + def write(self, buf): + """Write some bytes, putting them into the internal write + buffer for eventual compression. + """ + self.__wbuf.write(buf) + + def flush(self): + """Flush any queued up data in the write buffer and ensure the + compression buffer is flushed out to the underlying transport + """ + wout = self.__wbuf.getvalue() + if len(wout) > 0: + zbuf = self._zcomp_write.compress(wout) + self.bytes_out += len(wout) + self.bytes_out_comp += len(zbuf) + else: + zbuf = '' + ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) + self.bytes_out_comp += len(ztail) + if (len(zbuf) + len(ztail)) > 0: + self.__wbuf = StringIO() + self.__trans.write(zbuf + ztail) + self.__trans.flush() + + @property + def cstringio_buf(self): + """Implement the CReadableTransport interface""" + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + """Implement the CReadableTransport interface for refill""" + retstring = partialread + if reqlen < self.DEFAULT_BUFFSIZE: + retstring += self.read(self.DEFAULT_BUFFSIZE) + while len(retstring) < reqlen: + retstring += self.read(reqlen - len(retstring)) + self.__rbuf = StringIO(retstring) + return self.__rbuf diff --git a/lib/thrift/transport/__init__.py b/lib/thrift/transport/__init__.py new file mode 100644 index 0000000..c9596d9 --- /dev/null +++ b/lib/thrift/transport/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['TTransport', 'TSocket', 'THttpClient', 'TZlibTransport'] diff --git a/lib/thrift/transport/__init__.pyc b/lib/thrift/transport/__init__.pyc Binary files differnew file mode 100644 index 0000000..dcbb012 --- /dev/null +++ b/lib/thrift/transport/__init__.pyc diff --git a/lib/util/__init__.py b/lib/util/__init__.py new file mode 100644 index 0000000..850a751 --- /dev/null +++ b/lib/util/__init__.py @@ -0,0 +1 @@ +__all__ = ['netutil','miscutil'] diff --git a/lib/util/__init__.pyc b/lib/util/__init__.pyc Binary files differnew file mode 100644 index 0000000..9226731 --- /dev/null +++ b/lib/util/__init__.pyc diff --git a/lib/util/miscutil.py b/lib/util/miscutil.py new file mode 100644 index 0000000..4afa565 --- /dev/null +++ b/lib/util/miscutil.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python + +# +# Licence statement goes here +# + +def read_templatefile(temp_filename): + f = open(temp_filename, 'r') + lines = f.read().splitlines() + + return lines + diff --git a/lib/util/miscutil.pyc b/lib/util/miscutil.pyc Binary files differnew file mode 100644 index 0000000..ffc4524 --- /dev/null +++ b/lib/util/miscutil.pyc diff --git a/lib/util/netutil.py b/lib/util/netutil.py new file mode 100644 index 0000000..f8cc2d4 --- /dev/null +++ b/lib/util/netutil.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +# +# Licence statement goes here +# + +import socket +def get_ip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # doesn't even have to be reachable + s.connect(('192.255.255.255', 1)) + IP = s.getsockname()[0] + except: + IP = '127.0.0.1' + finally: + s.close() + return IP diff --git a/lib/util/netutil.pyc b/lib/util/netutil.pyc Binary files differnew file mode 100644 index 0000000..ad3953e --- /dev/null +++ b/lib/util/netutil.pyc |