aboutsummaryrefslogtreecommitdiffstats
path: root/DominoClient.py
diff options
context:
space:
mode:
Diffstat (limited to 'DominoClient.py')
-rwxr-xr-xDominoClient.py352
1 files changed, 352 insertions, 0 deletions
diff --git a/DominoClient.py b/DominoClient.py
new file mode 100755
index 0000000..51a765b
--- /dev/null
+++ b/DominoClient.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python
+
+#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import sys, glob, threading
+import getopt, socket
+import logging
+
+#sys.path.append('gen-py')
+#sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
+sys.path.insert(0, glob.glob('./lib')[0])
+
+from dominoRPC import Communication
+from dominoRPC.ttypes import *
+from dominoRPC.constants import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from util import *
+
+#Load configuration parameters
+from domino_conf import *
+
+class CommunicationHandler:
+ def __init__(self):
+ self.log = {}
+
+ def __init__(self, dominoclient):
+ global DOMINO_SERVER_IP, DOMINO_SERVER_PORT
+ self.log = {}
+ self.dominoClient = dominoclient
+ try:
+ # Make socket
+ transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT)
+ transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
+ # Add buffering to compensate for slow raw sockets
+ self.transport = TTransport.TBufferedTransport(transport)
+ # Wrap in a protocol
+ self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+ # Create a client to use the protocol encoder
+ self.sender = Communication.Client(self.protocol)
+ except Thrift.TException, tx:
+ logging.error('%s' , tx.message)
+
+ # Template Push from Domino Server is received
+ # Actions:
+ # - Depending on Controller Domain, call API
+ # - Respond Back with Push Response
+ def d_push(self, push_msg):
+ logging.info('%d Received Template File', self.dominoClient.UDID)
+ # Retrieve the template file
+
+ ## End of retrieval
+
+ # Any inspection code goes here
+
+ ## End of inspection
+
+ # Call NB API
+ # If heat client, call heat command
+
+ # If ONOS client, run as shell script
+
+
+ ## End of NB API call
+
+ # Marshall the response message for the Domino Server Fill
+ push_r = PushResponseMessage()
+ # Fill response message fields
+ push_r.domino_udid = self.dominoClient.UDID
+ push_r.seq_no = self.dominoClient.seqno
+ push_r.responseCode = SUCCESS
+ ## End of filling
+
+ self.dominoClient.seqno = self.dominoClient.seqno + 1
+
+ return push_r
+
+
+ def openconnection(self):
+ self.transport.open()
+
+ def closeconnection():
+ self.transport.close()
+
+def read_templatefile(temp_filename):
+ f = open(temp_filename, 'r')
+ lines = f.read().splitlines()
+
+ return lines
+
+class DominoClientCLIService(threading.Thread):
+ def __init__(self, dominoclient, communicationhandler):
+ threading.Thread.__init__(self)
+ self.dominoclient = dominoclient
+ self.communicationhandler = communicationhandler
+
+ def run(self):
+ global DEFAULT_TOSCA_PUBFILE
+ while True:
+ sys.stdout.write('>>')
+ input_string = raw_input()
+ args = input_string.split()
+ if len(args) == 0:
+ continue
+
+ labels = []
+ templateTypes = []
+
+ #process input arguments
+ try:
+ sys.stdout.write('>>')
+ if args[0] == 'heartbeat':
+ logging.info('%d Sending heatbeat', self.dominoclient.UDID)
+ hbm = HeartBeatMessage()
+ hbm.domino_udid = self.dominoclient.UDID
+ hbm.seq_no = self.dominoclient.seqno
+ try:
+ hbm_r = self.communicationhandler.sender.d_heartbeat(hbm)
+ logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no)
+ except (Thrift.TException, TSocket.TTransportException) as tx:
+ logging.error('%s' , tx.message)
+ except (socket.timeout) as tx:
+ self.dominoclient.handle_RPC_timeout(hbm)
+ except:
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+ self.dominoclient.seqno = self.dominoclient.seqno + 1
+
+ elif args[0] == 'publish':
+ opts, args = getopt.getopt(args[1:],"t:",["tosca-file="])
+ if len(opts) == 0:
+ print '\nUsage: publish -t <toscafile>'
+ continue
+
+ #toscafile = DEFAULT_TOSCA_PUBFILE
+ for opt, arg in opts:
+ if opt in ('-t', '--tosca-file'):
+ toscafile = arg
+
+ pub_msg = PublishMessage()
+ pub_msg.domino_udid = self.dominoclient.UDID
+ pub_msg.seq_no = self.dominoclient.seqno
+ pub_msg.template_type = 'tosca-nfv-v1.0'
+ try:
+ pub_msg.template = read_templatefile(toscafile)
+ except IOError as e:
+ logging.error('I/O error(%d): %s' , e.errno, e.strerror)
+ continue
+ logging.info('Publishing the template file: ' + toscafile)
+ try:
+ pub_msg_r = self.communicationhandler.sender.d_publish(pub_msg)
+ logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no)
+ except (Thrift.TException, TSocket.TTransportException) as tx:
+ print '%s' % (tx.message)
+ except (socket.timeout) as tx:
+ self.dominoclient.handle_RPC_timeout(pub_msg)
+
+ self.dominoclient.seqno = self.dominoclient.seqno + 1
+
+ elif args[0] == 'subscribe':
+ opts, args = getopt.getopt(args[1:],"l:t:",["labels=","ttype="])
+ for opt, arg in opts:
+ if opt in ('-l', '--labels'):
+ labels = labels + arg.split(',')
+ elif opt in ('-t', '--ttype'):
+ templateTypes = templateTypes + arg.split(',')
+
+ elif args[0] == 'register':
+ self.dominoclient.start()
+
+ except getopt.GetoptError:
+ print 'Command is misentered or not supported!'
+
+
+ #check if labels or supported templates are nonempty
+ if labels != [] or templateTypes != []:
+ #send subscription message
+ sub_msg = SubscribeMessage()
+ sub_msg.domino_udid = self.dominoclient.UDID
+ sub_msg.seq_no = self.dominoclient.seqno
+ sub_msg.template_op = APPEND
+ sub_msg.supported_template_types = templateTypes
+ sub_msg.label_op = APPEND
+ sub_msg.labels = labels
+ logging.info('subscribing labels %s and templates %s', labels, templateTypes)
+ try:
+ sub_msg_r = self.communicationhandler.sender.d_subscribe(sub_msg)
+ logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no)
+ except (Thrift.TException, TSocket.TTransportException) as tx:
+ logging.error('%s' , tx.message)
+ except (socket.timeout) as tx:
+ self.dominoclient.handle_RPC_timeout(sub_msg)
+
+ self.dominoclient.seqno = self.dominoclient.seqno + 1
+
+class DominoClient:
+ def __init__(self):
+ self.communicationHandler = CommunicationHandler(self)
+ self.processor = None
+ self.transport = None
+ self.tfactory = None
+ self.pfactory = None
+ self.communicationServer = None
+
+ self.CLIservice = DominoClientCLIService(self, self.communicationHandler)
+
+ self.serviceport = 9091
+ self.dominoserver_IP = 'localhost'
+
+ #Start from UNREGISTERED STATE
+ #TO BE DONE: initialize from a saved state
+ self.state = 'UNREGISTERED'
+ self.seqno = 0
+ self.UDID = 1
+
+ def start_communicationService(self):
+ self.processor = Communication.Processor(self.communicationHandler)
+ self.transport = TSocket.TServerSocket(port=int(self.serviceport))
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ #Use TThreadedServer or TThreadPoolServer for a multithreaded server
+ #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
+ self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
+
+ self.communicationServer.serve()
+
+ def start(self):
+ try:
+ self.communicationHandler.openconnection()
+ self.register()
+ except Thrift.TException, tx:
+ print '%s' % (tx.message)
+
+ def register(self):
+ if self.state == 'UNREGISTERED':
+ #prepare registration message
+ reg_msg = RegisterMessage()
+ reg_msg.domino_udid_desired = UDID_DESIRED
+ reg_msg.seq_no = self.seqno
+ reg_msg.ipaddr = netutil.get_ip()
+ reg_msg.tcpport = self.serviceport
+ reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES
+
+ try:
+ reg_msg_r = self.sender().d_register(reg_msg)
+ logging.info('Registration Response: Response Code: %d' , reg_msg_r.responseCode)
+ if reg_msg_r.comments:
+ logging.debug('Response Comments: %s' , reg_msg_r.comments)
+
+ if reg_msg_r.responseCode == SUCCESS:
+ self.state = 'REGISTERED'
+ self.UDID = reg_msg_r.domino_udid_assigned
+ else:
+ #Handle registration failure here (possibly based on reponse comments)
+ pass
+ except (Thrift.TException, TSocket.TTransportException) as tx:
+ logging.error('%s' , tx.message)
+ except (socket.timeout) as tx:
+ self.dominoclient.handle_RPC_timeout(pub_msg)
+ except (socket.error) as tx:
+ logging.error('%s' , tx.message)
+ self.seqno = self.seqno + 1
+
+ def stop(self):
+ try:
+ self.communicationHandler.closeconnection()
+ except Thrift.TException, tx:
+ logging.error('%s' , tx.message)
+
+ def sender(self):
+ return self.communicationHandler.sender
+
+ def startCLI(self):
+ logging.info('CLI Service is starting')
+ self.CLIservice.start()
+ #to wait until CLI service is finished
+ #self.CLIservice.join()
+
+ def set_serviceport(self, port):
+ self.serviceport = port
+
+ def set_dominoserver_ipaddr(self, ipaddr):
+ self.dominoserver_IP = ipaddr
+
+ def handle_RPC_timeout(self, RPCmessage):
+ # TBD: handle each RPC timeout separately
+ if RPCmessage.messageType == HEART_BEAT:
+ logging.debug('RPC Timeout for message type: HEART_BEAT')
+ elif RPCmessage.messageType == PUBLISH:
+ logging.debug('RPC Timeout for message type: PUBLISH')
+ elif RPCmessage.messageType == SUBSCRIBE:
+ logging.debug('RPC Timeout for message type: SUBSCRIBE')
+ elif RPCmessage.messageType == REGISTER:
+ logging.debug('RPC Timeout for message type: REGISTER')
+ elif RPCmessage.messageType == QUERY:
+ logging.debug('RPC Timeout for message type: QUERY')
+
+def main(argv):
+ client = DominoClient()
+ loglevel = 'WARNING'
+ #process input arguments
+ try:
+ opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log="])
+ except getopt.GetoptError:
+ print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
+ sys.exit(2)
+ for opt, arg in opts:
+ if opt == '-h':
+ print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
+ sys.exit()
+ elif opt in ("-c", "--conf"):
+ configfile = arg
+ elif opt in ("-p", "--port"):
+ client.set_serviceport(int(arg))
+ elif opt in ("-i", "--ipaddr"):
+ client.set_dominoserver_ipaddr(arg)
+ elif opt in ("--log"):
+ loglevel = arg
+
+ #Set logging level
+ numeric_level = getattr(logging, loglevel.upper(), None)
+ try:
+ if not isinstance(numeric_level, int):
+ raise ValueError('Invalid log level: %s' % loglevel)
+ logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
+ except ValueError, ex:
+ print ex.message
+ exit()
+
+ #The client is starting
+ logging.debug('Domino Client Starting...')
+ client.start()
+ client.startCLI()
+ client.start_communicationService()
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
+