diff options
author | Ulas Kozat <ulas.kozat@gmail.com> | 2016-05-20 14:42:39 -0700 |
---|---|---|
committer | Ulas Kozat <ulas.kozat@gmail.com> | 2016-05-20 14:42:39 -0700 |
commit | 57a23f93ae803640517206de67f33f2fc442ca4c (patch) | |
tree | c486d8385cfa1fba2144c82ce43623e36acb770f | |
parent | 7893dd02c15f2a727eb50887c9ddb829fc71f556 (diff) |
added features: logger, exception handling, CLI changes, move constants to domino_conf.py; fixed bug: packing UDID generated as UINT64 by the random number generator to INT64
Change-Id: Id79e79050d1a1011d3548bb4445792354e7b63d0
Signed-off-by: Ulas Kozat <ulas.kozat@gmail.com>
-rwxr-xr-x | DominoClient.py | 158 | ||||
-rwxr-xr-x | DominoServer.py | 111 | ||||
-rw-r--r-- | README.md | 19 | ||||
-rw-r--r-- | domino_conf.py | 18 | ||||
-rw-r--r-- | lib/dominoRPC/constants.py | 2 | ||||
-rw-r--r-- | lib/dominoRPC/constants.pyc | bin | 719 -> 761 bytes |
6 files changed, 221 insertions, 87 deletions
diff --git a/DominoClient.py b/DominoClient.py index a705288..51a765b 100755 --- a/DominoClient.py +++ b/DominoClient.py @@ -1,11 +1,20 @@ #!/usr/bin/env python -# -# Licence statement goes here -# +#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import sys, glob, threading -import getopt +import getopt, socket +import logging #sys.path.append('gen-py') #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0]) @@ -23,16 +32,8 @@ from thrift.server import TServer from util import * -CLIENT_UDID = 1 -CLIENT_SEQNO = 0 - -DOMINO_SERVER_IP = 'localhost' -DOMINO_SERVER_PORT = 9090 - -UDID_DESIRED = 12467 -LIST_SUPPORTED_TEMPLATES = ['tosca-nfv-v1.0'] -#DEFAULT_TOSCA_PUBFILE = './tosca-templates/tosca_simpleVNF.yaml' -DEFAULT_TOSCA_PUBFILE = './tosca-templates/tosca_helloworld_nfv.yaml' +#Load configuration parameters +from domino_conf import * class CommunicationHandler: def __init__(self): @@ -45,6 +46,7 @@ class CommunicationHandler: try: # Make socket transport = TSocket.TSocket(DOMINO_SERVER_IP, DOMINO_SERVER_PORT) + transport.setTimeout(THRIFT_RPC_TIMEOUT_MS) # Add buffering to compensate for slow raw sockets self.transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol @@ -52,14 +54,14 @@ class CommunicationHandler: # Create a client to use the protocol encoder self.sender = Communication.Client(self.protocol) except Thrift.TException, tx: - print '%s' % (tx.message) + logging.error('%s' , tx.message) # Template Push from Domino Server is received # Actions: # - Depending on Controller Domain, call API # - Respond Back with Push Response def d_push(self, push_msg): - print 'Received Template File' + logging.info('%d Received Template File', self.dominoClient.UDID) # Retrieve the template file ## End of retrieval @@ -123,12 +125,19 @@ class DominoClientCLIService(threading.Thread): try: sys.stdout.write('>>') if args[0] == 'heartbeat': - print '\nSending heatbeat' + logging.info('%d Sending heatbeat', self.dominoclient.UDID) hbm = HeartBeatMessage() hbm.domino_udid = self.dominoclient.UDID hbm.seq_no = self.dominoclient.seqno - hbm_r = self.communicationhandler.sender.d_heartbeat(hbm) - print 'heart beat received from: %d ,sequence number: %d' % (hbm_r.domino_udid, hbm_r.seq_no) + try: + hbm_r = self.communicationhandler.sender.d_heartbeat(hbm) + logging.info('heart beat received from: %d ,sequence number: %d' , hbm_r.domino_udid, hbm_r.seq_no) + except (Thrift.TException, TSocket.TTransportException) as tx: + logging.error('%s' , tx.message) + except (socket.timeout) as tx: + self.dominoclient.handle_RPC_timeout(hbm) + except: + logging.error('Unexpected error: %s', sys.exc_info()[0]) self.dominoclient.seqno = self.dominoclient.seqno + 1 elif args[0] == 'publish': @@ -149,11 +158,17 @@ class DominoClientCLIService(threading.Thread): try: pub_msg.template = read_templatefile(toscafile) except IOError as e: - print "I/O error({0}): {1}".format(e.errno, e.strerror) + logging.error('I/O error(%d): %s' , e.errno, e.strerror) continue - print '\nPublishing the template file: ' + toscafile - pub_msg_r = self.communicationhandler.sender.d_publish(pub_msg) - print 'Publish Response is received from: %d ,sequence number: %d' % (pub_msg_r.domino_udid, pub_msg_r.seq_no) + logging.info('Publishing the template file: ' + toscafile) + try: + pub_msg_r = self.communicationhandler.sender.d_publish(pub_msg) + logging.info('Publish Response is received from: %d ,sequence number: %d', pub_msg_r.domino_udid, pub_msg_r.seq_no) + except (Thrift.TException, TSocket.TTransportException) as tx: + print '%s' % (tx.message) + except (socket.timeout) as tx: + self.dominoclient.handle_RPC_timeout(pub_msg) + self.dominoclient.seqno = self.dominoclient.seqno + 1 elif args[0] == 'subscribe': @@ -163,7 +178,10 @@ class DominoClientCLIService(threading.Thread): labels = labels + arg.split(',') elif opt in ('-t', '--ttype'): templateTypes = templateTypes + arg.split(',') - + + elif args[0] == 'register': + self.dominoclient.start() + except getopt.GetoptError: print 'Command is misentered or not supported!' @@ -178,14 +196,19 @@ class DominoClientCLIService(threading.Thread): sub_msg.supported_template_types = templateTypes sub_msg.label_op = APPEND sub_msg.labels = labels - print 'subscribing labels %s and templates %s' % (labels,templateTypes) - sub_msg_r = self.communicationhandler.sender.d_subscribe(sub_msg) - print 'Subscribe Response is received from: %d ,sequence number: %d' % (sub_msg_r.domino_udid,sub_msg_r.seq_no) + logging.info('subscribing labels %s and templates %s', labels, templateTypes) + try: + sub_msg_r = self.communicationhandler.sender.d_subscribe(sub_msg) + logging.info('Subscribe Response is received from: %d ,sequence number: %d', sub_msg_r.domino_udid,sub_msg_r.seq_no) + except (Thrift.TException, TSocket.TTransportException) as tx: + logging.error('%s' , tx.message) + except (socket.timeout) as tx: + self.dominoclient.handle_RPC_timeout(sub_msg) + self.dominoclient.seqno = self.dominoclient.seqno + 1 class DominoClient: def __init__(self): - self.log = {} self.communicationHandler = CommunicationHandler(self) self.processor = None self.transport = None @@ -218,9 +241,11 @@ class DominoClient: def start(self): try: self.communicationHandler.openconnection() + self.register() except Thrift.TException, tx: print '%s' % (tx.message) - + + def register(self): if self.state == 'UNREGISTERED': #prepare registration message reg_msg = RegisterMessage() @@ -230,59 +255,72 @@ class DominoClient: reg_msg.tcpport = self.serviceport reg_msg.supported_templates = LIST_SUPPORTED_TEMPLATES - reg_msg_r = self.sender().d_register(reg_msg) - print 'Registration Response:\n' - print 'Response Code: %d' % (reg_msg_r.responseCode) - print 'Response Comments:' - if reg_msg_r.comments: - print reg_msg_r.comments - - if reg_msg_r.responseCode == SUCCESS: - self.state = 'REGISTERED' - self.UDID = reg_msg_r.domino_udid_assigned - else: - #Handle registration failure here (possibly based on reponse comments) - pass - + try: + reg_msg_r = self.sender().d_register(reg_msg) + logging.info('Registration Response: Response Code: %d' , reg_msg_r.responseCode) + if reg_msg_r.comments: + logging.debug('Response Comments: %s' , reg_msg_r.comments) + + if reg_msg_r.responseCode == SUCCESS: + self.state = 'REGISTERED' + self.UDID = reg_msg_r.domino_udid_assigned + else: + #Handle registration failure here (possibly based on reponse comments) + pass + except (Thrift.TException, TSocket.TTransportException) as tx: + logging.error('%s' , tx.message) + except (socket.timeout) as tx: + self.dominoclient.handle_RPC_timeout(pub_msg) + except (socket.error) as tx: + logging.error('%s' , tx.message) self.seqno = self.seqno + 1 def stop(self): try: self.communicationHandler.closeconnection() except Thrift.TException, tx: - print '%s' % (tx.message) + logging.error('%s' , tx.message) def sender(self): return self.communicationHandler.sender def startCLI(self): - print 'CLI Service is starting' + logging.info('CLI Service is starting') self.CLIservice.start() #to wait until CLI service is finished #self.CLIservice.join() def set_serviceport(self, port): self.serviceport = port - print 'port: ' - print self.serviceport def set_dominoserver_ipaddr(self, ipaddr): self.dominoserver_IP = ipaddr - print 'ip addr: ' - print self.dominoserver_IP + + def handle_RPC_timeout(self, RPCmessage): + # TBD: handle each RPC timeout separately + if RPCmessage.messageType == HEART_BEAT: + logging.debug('RPC Timeout for message type: HEART_BEAT') + elif RPCmessage.messageType == PUBLISH: + logging.debug('RPC Timeout for message type: PUBLISH') + elif RPCmessage.messageType == SUBSCRIBE: + logging.debug('RPC Timeout for message type: SUBSCRIBE') + elif RPCmessage.messageType == REGISTER: + logging.debug('RPC Timeout for message type: REGISTER') + elif RPCmessage.messageType == QUERY: + logging.debug('RPC Timeout for message type: QUERY') def main(argv): client = DominoClient() - + loglevel = 'WARNING' #process input arguments try: - opts, args = getopt.getopt(argv,"hc:p:i:",["conf=","port=","ipaddr="]) + opts, args = getopt.getopt(argv,"hc:p:i:l:",["conf=","port=","ipaddr=","log="]) except getopt.GetoptError: - print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr>' + print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>' sys.exit(2) for opt, arg in opts: if opt == '-h': - print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr>' + print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>' sys.exit() elif opt in ("-c", "--conf"): configfile = arg @@ -290,9 +328,21 @@ def main(argv): client.set_serviceport(int(arg)) elif opt in ("-i", "--ipaddr"): client.set_dominoserver_ipaddr(arg) + elif opt in ("--log"): + loglevel = arg + #Set logging level + numeric_level = getattr(logging, loglevel.upper(), None) + try: + if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % loglevel) + logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + except ValueError, ex: + print ex.message + exit() + #The client is starting - print 'Starting the client...' + logging.debug('Domino Client Starting...') client.start() client.startCLI() client.start_communicationService() diff --git a/DominoServer.py b/DominoServer.py index 7d7a977..4e8836c 100755 --- a/DominoServer.py +++ b/DominoServer.py @@ -1,11 +1,19 @@ #!/usr/bin/env python -# -# Licence statement goes here -# - +#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import sys, os, glob, random, errno +import getopt, socket +import logging, json #sys.path.append('gen-py') #sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0]) sys.path.insert(0, glob.glob('./lib')[0]) @@ -29,11 +37,9 @@ from mapper import * from partitioner import * from util import miscutil -SERVER_UDID = 0 -DOMINO_CLIENT_IP = 'localhost' -DOMINO_CLIENT_PORT = 9091 -TOSCADIR = './toscafiles/' -TOSCA_DEFAULT_FNAME = 'template1.yaml' +#Load configuration parameters +from domino_conf import * + class CommunicationHandler: def __init__(self): @@ -48,6 +54,7 @@ class CommunicationHandler: try: # Make socket transport = TSocket.TSocket(ipaddr, tcpport) + transport.setTimeout(THRIFT_RPC_TIMEOUT_MS) # Add buffering to compensate for slow raw sockets self.transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol @@ -56,24 +63,30 @@ class CommunicationHandler: self.sender = Communication.Client(self.protocol) self.transport.open() except Thrift.TException, tx: - print '%s' % (tx.message) + logging.error('%s' , tx.message) + def closeconnection(self): self.transport.close() def push_template(self,template,ipaddr,tcpport): - global SERVER_UDID self.openconnection(ipaddr,tcpport) pushm = PushMessage() pushm.domino_udid = SERVER_UDID pushm.seq_no = self.seqno pushm.template_type = 'tosca-nfv-v1.0' pushm.template = template + try: + push_r = self.sender.d_push(pushm) + logging.info('Push Response received from %d' , push_r.domino_udid) + except (Thrift.TException, TSocket.TTransportException) as tx: + logging.error('%s' , tx.message) + except (socket.timeout) as tx: + self.dominoServer.handle_RPC_timeout(pushm) + except: + logging.error('Unexpected error: %s', sys.exc_info()[0]) - push_r = self.sender.d_push(pushm) - - print 'Push Response received from %d' % push_r.domino_udid self.seqno = self.seqno + 1 self.closeconnection() @@ -84,7 +97,7 @@ class CommunicationHandler: def d_heartbeat(self, hb_msg): global SERVER_UDID - print 'heart beat received from %d' % hb_msg.domino_udid + logging.info('heartbeat received from %d' , hb_msg.domino_udid) hb_r = HeartBeatMessage() hb_r.domino_udid = SERVER_UDID @@ -103,7 +116,7 @@ class CommunicationHandler: #Prepare and send Registration Response reg_r = RegisterResponseMessage() - print 'Registration Request received for UDID %d from IP: %s port: %d ' % (reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport) + logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport) reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired) @@ -122,6 +135,12 @@ class CommunicationHandler: # Store the Domino Client info # TBD: check the sequence number to ensure the most recent record is saved self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg + data = {} + data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no] + with open(SERVER_DBFILE, 'a') as f: + json.dump(data, f) + f.close() + return reg_r @@ -131,7 +150,7 @@ class CommunicationHandler: # - Respond Back with Subscription Response def d_subscribe(self, sub_msg): global SERVER_UDID, SERVER_SEQNO - print 'Subscribe Request received from %d' % sub_msg.domino_udid + logging.info('Subscribe Request received from %d' , sub_msg.domino_udid) if sub_msg.template_op == APPEND: if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid): @@ -154,8 +173,8 @@ class CommunicationHandler: elif sub_msg.label_op == DELETE: self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels)) - print 'Supported Template: %s' % self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] - print 'Supported Labels: %s' % self.dominoServer.subscribed_labels[sub_msg.domino_udid] + logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid]) + #Fill in the details sub_r = SubscribeResponseMessage() sub_r.domino_udid = SERVER_UDID @@ -172,17 +191,17 @@ class CommunicationHandler: # - Respond Back with Publication Response def d_publish(self, pub_msg): global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME - print 'Publish Request received from %d' % pub_msg.domino_udid - print pub_msg.template + logging.info('Publish Request received from %d' , pub_msg.domino_udid) + logging.debug(pub_msg.template) # Save as file try: os.makedirs(TOSCADIR) except OSError as exception: if exception.errno == errno.EEXIST: - print TOSCADIR, ' exists. Creating: ' , TOSCADIR+TOSCA_DEFAULT_FNAME + logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME) else: - print 'Error occurred in creating the directory. Err no: ', exception.errno + logging.error('Error occurred in creating %s. Err no: %d', exception.errno) #Risking a race condition if another process is attempting to write to same file f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w') @@ -195,15 +214,15 @@ class CommunicationHandler: # Extract Labels node_labels = label.extract_labels( tosca ) - print '\nNode Labels: \n', node_labels + logging.debug('Node Labels: %s', node_labels) # Map nodes in the template to resource domains site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels ) - print '\nSite Maps: \n' , site_map + logging.debug('Site Maps: %s' , site_map) # Select a site for each VNF node_site = label.select_site( site_map ) - print '\nSelected Sites:\n' , node_site , '\n' + logging.debug('Selected Sites: %s', node_site) # Create per-domain Tosca files file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl) @@ -219,7 +238,6 @@ class CommunicationHandler: domino_client_ip = self.dominoServer.registration_record[site].ipaddr domino_client_port = self.dominoServer.registration_record[site].tcpport self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port) - # self.push_template(pub_msg.template, DOMINO_CLIENT_IP, DOMINO_CLIENT_PORT) #Fill in the details pub_r = PublishResponseMessage() @@ -242,7 +260,6 @@ class CommunicationHandler: class DominoServer: def __init__(self): - self.log = {} self.assignedUUIDs = list() self.subscribed_labels = dict() self.subscribed_templateformats = dict() @@ -256,6 +273,7 @@ class DominoServer: #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory) self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory) + def start_communicationService(self): self.communicationServer.serve() @@ -266,20 +284,49 @@ class DominoServer: #If assigned, offer a new random id def assign_udid(self, udid_desired): if udid_desired in self.assignedUUIDs: - new_udid = random.getrandbits(64) + new_udid = random.getrandbits(63) while new_udid in self.assignedUUIDs: - new_udid = random.getrandbits(64) + new_udid = random.getrandbits(63) self.assignedUUIDs.append(new_udid) return new_udid else: self.assignedUUIDs.append(udid_desired) return udid_desired - + + def handle_RPC_timeout(self, RPCmessage): + if RPCmessage.messageType == PUSH: + logging.debug('RPC Timeout for message type: PUSH') + # TBD: handle each RPC timeout separately def main(argv): server = DominoServer() - print 'Starting the server...' + + #process input arguments + try: + opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="]) + except getopt.GetoptError: + print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>' + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>' + sys.exit() + elif opt in ("-c", "--conf"): + configfile = arg + elif opt in ("--log"): + loglevel= arg + #Set logging level + numeric_level = getattr(logging, loglevel.upper(), None) + try: + if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % loglevel) + logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + except ValueError, ex: + print ex.message + sys.exit(2) + + logging.debug('Domino Server Starting...') server.start_communicationService() print 'done.' @@ -22,7 +22,24 @@ Tested on Ubuntu 14.04 and OS X El Capitan heartbeat ###subscribe for policy labels - subscribe -l/--labels <policytype>:properties:key:value + subscribe -l/--label <policytype>:properties:key:value + +Example: +First checkout the tosca file "./tosca-templates/tosca_helloworld_nfv.yaml" and see how policy types and rules are defined. Then, from any Domino Client, use subscribe command as: + subscribe --label tosca.policies.Placement.Geolocation:properties:region:us-west-1 + ###publish default template file under tosca-templates publish --tosca-file <path_to_toscafile> + +Example: +Run the following command from any Domino Client: + + publish --tosca-file ./tosca-templates/tosca_helloworld_nfv.yaml + +Now, inspect the files generated under ./toscafiles, where the original file as well as parts sent to each Domino Client are shown (each part identified by UDID assigned to that client) + +##NOTES + If accidentally you start DominoClient before DominoServer, don't panic. First start the DominoServer and then input the command on the DominoClient side: + + register diff --git a/domino_conf.py b/domino_conf.py new file mode 100644 index 0000000..3ce442b --- /dev/null +++ b/domino_conf.py @@ -0,0 +1,18 @@ +logfile = None + +#Client Parameters +CLIENT_UDID = 1 +CLIENT_SEQNO = 0 + +DOMINO_SERVER_IP = 'localhost' +DOMINO_SERVER_PORT = 9090 + +UDID_DESIRED = 12467 +LIST_SUPPORTED_TEMPLATES = ['tosca-nfv-v1.0'] +DEFAULT_TOSCA_PUBFILE = './tosca-templates/tosca_helloworld_nfv.yaml' + +#Server Parameters +SERVER_UDID = 0 +TOSCADIR = './toscafiles/' +TOSCA_DEFAULT_FNAME = 'template1.yaml' +SERVER_DBFILE = 'dominoserver.json' diff --git a/lib/dominoRPC/constants.py b/lib/dominoRPC/constants.py index 5687d91..57b7c26 100644 --- a/lib/dominoRPC/constants.py +++ b/lib/dominoRPC/constants.py @@ -25,3 +25,5 @@ FAILED = 2 APPEND = 0 OVERWRITE = 1 DELETE = 2 + +THRIFT_RPC_TIMEOUT_MS = 1000 diff --git a/lib/dominoRPC/constants.pyc b/lib/dominoRPC/constants.pyc Binary files differindex 2a494a7..18c888f 100644 --- a/lib/dominoRPC/constants.pyc +++ b/lib/dominoRPC/constants.pyc |