diff options
Diffstat (limited to 'DominoServer.py')
-rwxr-xr-x | DominoServer.py | 185 |
1 files changed, 135 insertions, 50 deletions
diff --git a/DominoServer.py b/DominoServer.py index e7ee04a..d92b72c 100755 --- a/DominoServer.py +++ b/DominoServer.py @@ -66,34 +66,33 @@ class CommunicationHandler: # Create a client to use the protocol encoder self.sender = Communication.Client(self.protocol) self.transport.open() - except Thrift.TException, tx: - logging.error('%s' , tx.message) - + except: + raise def closeconnection(self): self.transport.close() - def push_template(self,template,ipaddr,tcpport): - self.openconnection(ipaddr,tcpport) - pushm = PushMessage() - pushm.domino_udid = SERVER_UDID - pushm.seq_no = self.seqno - pushm.template_type = 'tosca-nfv-v1.0' - pushm.template = template + def push_template(self,template,ipaddr,tcpport,TUID): try: + self.openconnection(ipaddr,tcpport) + pushm = PushMessage() + pushm.domino_udid = SERVER_UDID + pushm.seq_no = self.seqno + pushm.template_type = 'tosca-nfv-v1.0' + pushm.template = template + pushm.template_UUID = TUID + self.seqno = self.seqno + 1 + push_r = self.sender.d_push(pushm) logging.info('Push Response received from %s' , push_r.domino_udid) - except (Thrift.TException, TSocket.TTransportException) as tx: - logging.error('%s' , tx.message) + self.closeconnection() except (socket.timeout) as tx: self.dominoServer.handle_RPC_timeout(pushm) + raise tx except: logging.error('Unexpected error: %s', sys.exc_info()[0]) - - self.seqno = self.seqno + 1 - - self.closeconnection() + raise #Heartbeat from Domino Client is received #Actions: @@ -192,8 +191,8 @@ class CommunicationHandler: c = dbconn.cursor() newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid] try: - c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\ - format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) )) + newvalue=','.join(list(newlabelset)) + c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) ) except sqlite3.OperationalError as ex1: logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) except: @@ -202,8 +201,8 @@ class CommunicationHandler: newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] try: - c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\ - format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) )) + newvalue=','.join(list(newttypeset)) + c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) ) except sqlite3.OperationalError as ex1: logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message) except: @@ -228,11 +227,25 @@ class CommunicationHandler: #Actions: # - Parse the template, perform mapping, partition the template # - Launch Push service - # - Respond Back with Publication Response + # - Respond Back with Publication Response def d_publish(self, pub_msg): logging.info('Publish Request received from %s' , pub_msg.domino_udid) - logging.debug(pub_msg.template) + #logging.debug(pub_msg.template) + + # Create response with response code as SUCCESS by default + # Response code will be overwritten if partial or full failure occurs + pub_r = PublishResponseMessage() + pub_r.domino_udid = SERVER_UDID + pub_r.seq_no = self.seqno + pub_r.responseCode = SUCCESS + pub_r.template_UDID = pub_msg.template_UUID + self.seqno = self.seqno + 1 + if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False): + logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID) + pub_r.responseCode = FAILED + return pub_r + # Save as file try: os.makedirs(TOSCADIR) @@ -249,13 +262,9 @@ class CommunicationHandler: #Some sort of race condition should have occured that prevented the write operation #treat as failure logging.error('FAILED to write the published file: %s', sys.exc_info()[0]) - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno pub_r.responseCode = FAILED - self.seqno = self.seqno + 1 return pub_r - + # Load tosca object from file into memory try: #tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME ) @@ -263,11 +272,7 @@ class CommunicationHandler: except: logging.error('Tosca Parser error: %s', sys.exc_info()[0]) #tosca file could not be read - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno pub_r.responseCode = FAILED - self.seqno = self.seqno + 1 return pub_r # Extract Labels @@ -288,13 +293,36 @@ class CommunicationHandler: # Create work-flow - # Create response with response code as SUCCESS by default - # Response code will be overwrittent if partial or full failure occurs - pub_r = PublishResponseMessage() - pub_r.domino_udid = SERVER_UDID - pub_r.seq_no = self.seqno - pub_r.responseCode = SUCCESS - self.seqno = self.seqno + 1 + # Assign template UUID if no UUID specified + # Otherwise update the existing domains subscribed to TUID + unsuccessful_updates = [] + if pub_msg.template_UUID is None: + pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID + else: + logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID) + if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject + logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID]) + pub_r.responseCode = FAILED + return pub_r + else: #Template exists, we need to find clients that are no longer in the subscription list list + TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys())) + if len(TUID_unsubscribed_list) > 0: + logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID) + # Send empty bodied templates to domains which no longer has any assigned resource + template_lines = [] + for i in range(len(TUID_unsubscribed_list)): + domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr + domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport + try: + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) + except: + logging.error('Error in pushing template: %s', sys.exc_info()[0]) + unsuccessful_updates.append(TUID_unsubscribed_list[i]) + + # The following template distribution is not transactional, meaning that some domains + # might be successfull receiving their sub-templates while some other might not + # The function returns FAILED code to the publisher in such situations, meaning that + # publisher must republish to safely orchestrate/manage NS or VNF # Send domain templates to each domain agent/client # FOR NOW: send untranslated but partitioned tosca files to scheduled sites @@ -311,7 +339,7 @@ class CommunicationHandler: template_lines = [ output ] else: template_lines = miscutil.read_templatefile(file_paths[site]) - self.push_template(template_lines, domino_client_ip, domino_client_port) + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) except IOError as e: logging.error('I/O error(%d): %s' , e.errno, e.strerror) pub_r.responseCode = FAILED @@ -325,6 +353,39 @@ class CommunicationHandler: if len(file_paths) == 0: pub_r.responseCode = FAILED + + dbconn = sqlite3.connect(SERVER_DBFILE) + c = dbconn.cursor() + + if pub_r.responseCode == SUCCESS: + # update in memory database + self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + try: + c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) ) + dbconn.commit() + except sqlite3.OperationalError as ex1: + logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + except: + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Unexpected error: %s', sys.exc_info()[0]) + else: + self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + + # update in memory database + self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys() + logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID) + try: + newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) + c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) ) + dbconn.commit() + except sqlite3.OperationalError as ex1: + logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + except: + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Unexpected error: %s', sys.exc_info()[0]) + + dbconn.close() + return pub_r #Query from Domino Client is received @@ -334,7 +395,16 @@ class CommunicationHandler: def d_query(self, qu_msg): #Fill in the details qu_r = QueryResponseMessage() + qu_r.domino_udid = SERVER_UDID + qu_r.seq_no = self.seqno + qu_r.responseCode = SUCCESS + qu_r.queryResponse = [] + + for i in range(len(qu_msg.queryString)): + if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client + qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid]) + self.seqno = self.seqno + 1 return qu_r @@ -344,6 +414,9 @@ class DominoServer: self.subscribed_labels = dict() self.subscribed_templateformats = dict() self.registration_record = dict() + self.assignedTUIDs = list() + self.TUID2Publisher = dict() + self.TUID2Subscribers = dict() self.communicationHandler = CommunicationHandler(self) self.processor = Communication.Processor(self.communicationHandler) self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT) @@ -363,17 +436,19 @@ class DominoServer: #If not assigned, assign it #If assigned, offer a new random id def assign_udid(self, udid_desired): - if udid_desired in self.assignedUUIDs: - new_udid = uuid.uuid4().hex #random.getrandbits(63) - while new_udid in self.assignedUUIDs: - new_udid = uuid.uuid4().hex #random.getrandbits(63) - - self.assignedUUIDs.append(new_udid) - return new_udid - else: - self.assignedUUIDs.append(udid_desired) - return udid_desired - + new_udid = udid_desired + while new_udid in self.assignedUUIDs: + new_udid = uuid.uuid4().hex + self.assignedUUIDs.append(new_udid) + return new_udid + + def assign_tuid(self): + new_TUID = uuid.uuid4().hex + while new_TUID in self.assignedTUIDs: + new_TUID = uuid.uuid4().hex + self.assignedTUIDs.append(new_TUID) + return new_TUID + def handle_RPC_timeout(self, RPCmessage): if RPCmessage.messageType == PUSH: logging.debug('RPC Timeout for message type: PUSH') @@ -424,6 +499,16 @@ def main(argv): except sqlite3.OperationalError as ex: logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + try: + c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''') + except sqlite3.OperationalError as ex: + logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + + try: + c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''') + except sqlite3.OperationalError as ex: + logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message) + dbconn.commit() dbconn.close() |