aboutsummaryrefslogtreecommitdiffstats
path: root/DominoServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'DominoServer.py')
-rwxr-xr-xDominoServer.py185
1 files changed, 135 insertions, 50 deletions
diff --git a/DominoServer.py b/DominoServer.py
index e7ee04a..d92b72c 100755
--- a/DominoServer.py
+++ b/DominoServer.py
@@ -66,34 +66,33 @@ class CommunicationHandler:
# Create a client to use the protocol encoder
self.sender = Communication.Client(self.protocol)
self.transport.open()
- except Thrift.TException, tx:
- logging.error('%s' , tx.message)
-
+ except:
+ raise
def closeconnection(self):
self.transport.close()
- def push_template(self,template,ipaddr,tcpport):
- self.openconnection(ipaddr,tcpport)
- pushm = PushMessage()
- pushm.domino_udid = SERVER_UDID
- pushm.seq_no = self.seqno
- pushm.template_type = 'tosca-nfv-v1.0'
- pushm.template = template
+ def push_template(self,template,ipaddr,tcpport,TUID):
try:
+ self.openconnection(ipaddr,tcpport)
+ pushm = PushMessage()
+ pushm.domino_udid = SERVER_UDID
+ pushm.seq_no = self.seqno
+ pushm.template_type = 'tosca-nfv-v1.0'
+ pushm.template = template
+ pushm.template_UUID = TUID
+ self.seqno = self.seqno + 1
+
push_r = self.sender.d_push(pushm)
logging.info('Push Response received from %s' , push_r.domino_udid)
- except (Thrift.TException, TSocket.TTransportException) as tx:
- logging.error('%s' , tx.message)
+ self.closeconnection()
except (socket.timeout) as tx:
self.dominoServer.handle_RPC_timeout(pushm)
+ raise tx
except:
logging.error('Unexpected error: %s', sys.exc_info()[0])
-
- self.seqno = self.seqno + 1
-
- self.closeconnection()
+ raise
#Heartbeat from Domino Client is received
#Actions:
@@ -192,8 +191,8 @@ class CommunicationHandler:
c = dbconn.cursor()
newlabelset = self.dominoServer.subscribed_labels[sub_msg.domino_udid]
try:
- c.execute("REPLACE INTO labels (udid, label_list) VALUES ({udid}, '{newvalue}')".\
- format(udid=sub_msg.domino_udid, newvalue=','.join(list(newlabelset)) ))
+ newvalue=','.join(list(newlabelset))
+ c.execute( "REPLACE INTO labels VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
except sqlite3.OperationalError as ex1:
logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
except:
@@ -202,8 +201,8 @@ class CommunicationHandler:
newttypeset = self.dominoServer.subscribed_templateformats[sub_msg.domino_udid]
try:
- c.execute("REPLACE INTO ttypes (udid, ttype_list) VALUES ({udid}, '{newvalue}')".\
- format(udid=sub_msg.domino_udid, newvalue=','.join(list(newttypeset)) ))
+ newvalue=','.join(list(newttypeset))
+ c.execute( "REPLACE INTO ttypes VALUES (?,?)", (sub_msg.domino_udid,newvalue) )
except sqlite3.OperationalError as ex1:
logging.error('Could not add the new labels to %s for Domino Client %s : %s', SERVER_DBFILE, sub_msg.domino_udid, ex1.message)
except:
@@ -228,11 +227,25 @@ class CommunicationHandler:
#Actions:
# - Parse the template, perform mapping, partition the template
# - Launch Push service
- # - Respond Back with Publication Response
+ # - Respond Back with Publication Response
def d_publish(self, pub_msg):
logging.info('Publish Request received from %s' , pub_msg.domino_udid)
- logging.debug(pub_msg.template)
+ #logging.debug(pub_msg.template)
+
+ # Create response with response code as SUCCESS by default
+ # Response code will be overwritten if partial or full failure occurs
+ pub_r = PublishResponseMessage()
+ pub_r.domino_udid = SERVER_UDID
+ pub_r.seq_no = self.seqno
+ pub_r.responseCode = SUCCESS
+ pub_r.template_UDID = pub_msg.template_UUID
+ self.seqno = self.seqno + 1
+ if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
+ logging.debug('TEMPLATE UUID %s does not exist', pub_msg.template_UUID)
+ pub_r.responseCode = FAILED
+ return pub_r
+
# Save as file
try:
os.makedirs(TOSCADIR)
@@ -249,13 +262,9 @@ class CommunicationHandler:
#Some sort of race condition should have occured that prevented the write operation
#treat as failure
logging.error('FAILED to write the published file: %s', sys.exc_info()[0])
- pub_r = PublishResponseMessage()
- pub_r.domino_udid = SERVER_UDID
- pub_r.seq_no = self.seqno
pub_r.responseCode = FAILED
- self.seqno = self.seqno + 1
return pub_r
-
+
# Load tosca object from file into memory
try:
#tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
@@ -263,11 +272,7 @@ class CommunicationHandler:
except:
logging.error('Tosca Parser error: %s', sys.exc_info()[0])
#tosca file could not be read
- pub_r = PublishResponseMessage()
- pub_r.domino_udid = SERVER_UDID
- pub_r.seq_no = self.seqno
pub_r.responseCode = FAILED
- self.seqno = self.seqno + 1
return pub_r
# Extract Labels
@@ -288,13 +293,36 @@ class CommunicationHandler:
# Create work-flow
- # Create response with response code as SUCCESS by default
- # Response code will be overwrittent if partial or full failure occurs
- pub_r = PublishResponseMessage()
- pub_r.domino_udid = SERVER_UDID
- pub_r.seq_no = self.seqno
- pub_r.responseCode = SUCCESS
- self.seqno = self.seqno + 1
+ # Assign template UUID if no UUID specified
+ # Otherwise update the existing domains subscribed to TUID
+ unsuccessful_updates = []
+ if pub_msg.template_UUID is None:
+ pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
+ else:
+ logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
+ if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
+ logging.error('FAILED to verify publisher: %s against the publisher on record: %s', pub_msg.domino_udid, self.dominoServer.TUID2Publisher[pub_msg.template_UUID])
+ pub_r.responseCode = FAILED
+ return pub_r
+ else: #Template exists, we need to find clients that are no longer in the subscription list list
+ TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
+ if len(TUID_unsubscribed_list) > 0:
+ logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
+ # Send empty bodied templates to domains which no longer has any assigned resource
+ template_lines = []
+ for i in range(len(TUID_unsubscribed_list)):
+ domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
+ domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport
+ try:
+ self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
+ except:
+ logging.error('Error in pushing template: %s', sys.exc_info()[0])
+ unsuccessful_updates.append(TUID_unsubscribed_list[i])
+
+ # The following template distribution is not transactional, meaning that some domains
+ # might be successfull receiving their sub-templates while some other might not
+ # The function returns FAILED code to the publisher in such situations, meaning that
+ # publisher must republish to safely orchestrate/manage NS or VNF
# Send domain templates to each domain agent/client
# FOR NOW: send untranslated but partitioned tosca files to scheduled sites
@@ -311,7 +339,7 @@ class CommunicationHandler:
template_lines = [ output ]
else:
template_lines = miscutil.read_templatefile(file_paths[site])
- self.push_template(template_lines, domino_client_ip, domino_client_port)
+ self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
except IOError as e:
logging.error('I/O error(%d): %s' , e.errno, e.strerror)
pub_r.responseCode = FAILED
@@ -325,6 +353,39 @@ class CommunicationHandler:
if len(file_paths) == 0:
pub_r.responseCode = FAILED
+
+ dbconn = sqlite3.connect(SERVER_DBFILE)
+ c = dbconn.cursor()
+
+ if pub_r.responseCode == SUCCESS:
+ # update in memory database
+ self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+ try:
+ c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
+ dbconn.commit()
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+ else:
+ self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+
+ # update in memory database
+ self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
+ logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
+ try:
+ newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
+ c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
+ dbconn.commit()
+ except sqlite3.OperationalError as ex1:
+ logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+ except:
+ logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+ dbconn.close()
+
return pub_r
#Query from Domino Client is received
@@ -334,7 +395,16 @@ class CommunicationHandler:
def d_query(self, qu_msg):
#Fill in the details
qu_r = QueryResponseMessage()
+ qu_r.domino_udid = SERVER_UDID
+ qu_r.seq_no = self.seqno
+ qu_r.responseCode = SUCCESS
+ qu_r.queryResponse = []
+
+ for i in range(len(qu_msg.queryString)):
+ if qu_msg.queryString[i] == 'list-tuids': # limit the response to TUIDs that belong to this domino client
+ qu_r.queryResponse.extend([j for j in self.dominoServer.TUID2Publisher.keys() if self.dominoServer.TUID2Publisher[j] == qu_msg.domino_udid])
+ self.seqno = self.seqno + 1
return qu_r
@@ -344,6 +414,9 @@ class DominoServer:
self.subscribed_labels = dict()
self.subscribed_templateformats = dict()
self.registration_record = dict()
+ self.assignedTUIDs = list()
+ self.TUID2Publisher = dict()
+ self.TUID2Subscribers = dict()
self.communicationHandler = CommunicationHandler(self)
self.processor = Communication.Processor(self.communicationHandler)
self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
@@ -363,17 +436,19 @@ class DominoServer:
#If not assigned, assign it
#If assigned, offer a new random id
def assign_udid(self, udid_desired):
- if udid_desired in self.assignedUUIDs:
- new_udid = uuid.uuid4().hex #random.getrandbits(63)
- while new_udid in self.assignedUUIDs:
- new_udid = uuid.uuid4().hex #random.getrandbits(63)
-
- self.assignedUUIDs.append(new_udid)
- return new_udid
- else:
- self.assignedUUIDs.append(udid_desired)
- return udid_desired
-
+ new_udid = udid_desired
+ while new_udid in self.assignedUUIDs:
+ new_udid = uuid.uuid4().hex
+ self.assignedUUIDs.append(new_udid)
+ return new_udid
+
+ def assign_tuid(self):
+ new_TUID = uuid.uuid4().hex
+ while new_TUID in self.assignedTUIDs:
+ new_TUID = uuid.uuid4().hex
+ self.assignedTUIDs.append(new_TUID)
+ return new_TUID
+
def handle_RPC_timeout(self, RPCmessage):
if RPCmessage.messageType == PUSH:
logging.debug('RPC Timeout for message type: PUSH')
@@ -424,6 +499,16 @@ def main(argv):
except sqlite3.OperationalError as ex:
logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+ try:
+ c.execute('''CREATE TABLE templates (uuid_t TEXT PRIMARY KEY, udid TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
+ try:
+ c.execute('''CREATE TABLE subscribers (tuid TEXT PRIMARY KEY, subscriber_list TEXT)''')
+ except sqlite3.OperationalError as ex:
+ logging.debug('In database file %s, no table is created as %s', SERVER_DBFILE, ex.message)
+
dbconn.commit()
dbconn.close()