summaryrefslogtreecommitdiffstats
path: root/DominoServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'DominoServer.py')
-rwxr-xr-xDominoServer.py334
1 files changed, 334 insertions, 0 deletions
diff --git a/DominoServer.py b/DominoServer.py
new file mode 100755
index 0000000..d056542
--- /dev/null
+++ b/DominoServer.py
@@ -0,0 +1,334 @@
+#!/usr/bin/env python
+
+#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys, os, glob, random, errno
+import getopt, socket
+import logging, json
+#sys.path.append('gen-py')
+#sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
+sys.path.insert(0, glob.glob('./lib')[0])
+
+
+from dominoRPC import Communication
+from dominoRPC.ttypes import *
+from dominoRPC.constants import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from toscaparser.tosca_template import ToscaTemplate
+#from toscaparser.utils.gettextutils import _
+#import toscaparser.utils.urlutils
+
+from mapper import *
+from partitioner import *
+from util import miscutil
+
+#Load configuration parameters
+from domino_conf import *
+
+
+class CommunicationHandler:
+ def __init__(self):
+ self.log = {}
+
+ def __init__(self, dominoserver):
+ self.log = {}
+ self.dominoServer = dominoserver
+ self.seqno = 0;
+
+ def openconnection(self, ipaddr, tcpport):
+ try:
+ # Make socket
+ transport = TSocket.TSocket(ipaddr, tcpport)
+ transport.setTimeout(THRIFT_RPC_TIMEOUT_MS)
+ # Add buffering to compensate for slow raw sockets
+ self.transport = TTransport.TBufferedTransport(transport)
+ # Wrap in a protocol
+ self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+ # Create a client to use the protocol encoder
+ self.sender = Communication.Client(self.protocol)
+ self.transport.open()
+ except Thrift.TException, tx:
+ logging.error('%s' , tx.message)
+
+
+
+ def closeconnection(self):
+ self.transport.close()
+
+ def push_template(self,template,ipaddr,tcpport):
+ self.openconnection(ipaddr,tcpport)
+ pushm = PushMessage()
+ pushm.domino_udid = SERVER_UDID
+ pushm.seq_no = self.seqno
+ pushm.template_type = 'tosca-nfv-v1.0'
+ pushm.template = template
+ try:
+ push_r = self.sender.d_push(pushm)
+ logging.info('Push Response received from %d' , push_r.domino_udid)
+ except (Thrift.TException, TSocket.TTransportException) as tx:
+ logging.error('%s' , tx.message)
+ except (socket.timeout) as tx:
+ self.dominoServer.handle_RPC_timeout(pushm)
+ except:
+ logging.error('Unexpected error: %s', sys.exc_info()[0])
+
+ self.seqno = self.seqno + 1
+
+ self.closeconnection()
+
+ #Heartbeat from Domino Client is received
+ #Actions:
+ # - Respond Back with a heartbeat
+
+ def d_heartbeat(self, hb_msg):
+ global SERVER_UDID
+ logging.info('heartbeat received from %d' , hb_msg.domino_udid)
+
+ hb_r = HeartBeatMessage()
+ hb_r.domino_udid = SERVER_UDID
+ hb_r.seq_no = self.seqno
+
+ self.seqno = self.seqno + 1
+
+ return hb_r
+
+ #Registration from Domino Client is received
+ #Actions:
+ #
+ # - Respond Back with Registration Response
+ def d_register(self, reg_msg):
+ global SERVER_UDID
+
+ #Prepare and send Registration Response
+ reg_r = RegisterResponseMessage()
+ logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport)
+
+
+ reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired)
+ reg_r.seq_no = self.seqno
+ reg_r.domino_udid = SERVER_UDID
+ #return unconditional success
+ #To be implemented:
+ #Define conditions for unsuccessful registration (e.g., unsupported mapping)
+ reg_r.responseCode = SUCCESS
+ #no need to send comments
+ #To be implemented:
+ #Logic for a new UDID assignment
+
+ self.seqno = self.seqno + 1
+
+ # Store the Domino Client info
+ # TBD: check the sequence number to ensure the most recent record is saved
+ self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg
+ data = {}
+ data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no]
+ with open(SERVER_DBFILE, 'a') as f:
+ json.dump(data, f)
+ f.close()
+
+ return reg_r
+
+
+ #Subscription from Domino Client is received
+ #Actions:
+ # - Save the templates & labels
+ # - Respond Back with Subscription Response
+ def d_subscribe(self, sub_msg):
+ global SERVER_UDID, SERVER_SEQNO
+ logging.info('Subscribe Request received from %d' , sub_msg.domino_udid)
+
+ if sub_msg.template_op == APPEND:
+ if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid):
+ self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types))
+ else:
+ self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
+ elif sub_msg.template_op == OVERWRITE:
+ self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types)
+ elif sub_msg.template_op == DELETE:
+ self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types))
+
+ if sub_msg.labels != []:
+ if sub_msg.label_op == APPEND:
+ if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid):
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels))
+ else:
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+ elif sub_msg.label_op == OVERWRITE:
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels)
+ elif sub_msg.label_op == DELETE:
+ self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels))
+
+ logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid])
+
+ #Fill in the details
+ sub_r = SubscribeResponseMessage()
+ sub_r.domino_udid = SERVER_UDID
+ sub_r.seq_no = self.seqno
+ sub_r.responseCode = SUCCESS
+ self.seqno = self.seqno + 1
+
+ return sub_r
+
+ #Template Publication from Domino Client is received
+ #Actions:
+ # - Parse the template, perform mapping, partition the template
+ # - Launch Push service
+ # - Respond Back with Publication Response
+ def d_publish(self, pub_msg):
+ global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME
+ logging.info('Publish Request received from %d' , pub_msg.domino_udid)
+ logging.debug(pub_msg.template)
+
+ # Save as file
+ try:
+ os.makedirs(TOSCADIR)
+ except OSError as exception:
+ if exception.errno == errno.EEXIST:
+ logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME)
+ else:
+ logging.error('Error occurred in creating %s. Err no: %d', exception.errno)
+
+ #Risking a race condition if another process is attempting to write to same file
+ f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w')
+ for item in pub_msg.template:
+ print>>f, item
+ f.close()
+
+ # Load tosca object from file into memory
+ tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME )
+
+ # Extract Labels
+ node_labels = label.extract_labels( tosca )
+ logging.debug('Node Labels: %s', node_labels)
+
+ # Map nodes in the template to resource domains
+ site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels )
+ logging.debug('Site Maps: %s' , site_map)
+
+ # Select a site for each VNF
+ node_site = label.select_site( site_map )
+ logging.debug('Selected Sites: %s', node_site)
+
+ # Create per-domain Tosca files
+ file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl)
+
+ # Create list of translated template files
+
+ # Create work-flow
+
+ # Send domain templates to each domain agent/client
+ # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
+ # TBD: read from work-flow
+ for site in file_paths:
+ domino_client_ip = self.dominoServer.registration_record[site].ipaddr
+ domino_client_port = self.dominoServer.registration_record[site].tcpport
+ self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port)
+
+ #Fill in the details
+ pub_r = PublishResponseMessage()
+ pub_r.domino_udid = SERVER_UDID
+ pub_r.seq_no = self.seqno
+ pub_r.responseCode = SUCCESS
+ self.seqno = self.seqno + 1
+ return pub_r
+
+ #Query from Domino Client is received
+ #Actions:
+ #
+ # - Respond Back with Query Response
+ def d_query(self, qu_msg):
+ #Fill in the details
+ qu_r = QueryResponseMessage()
+
+ return qu_r
+
+
+class DominoServer:
+ def __init__(self):
+ self.assignedUUIDs = list()
+ self.subscribed_labels = dict()
+ self.subscribed_templateformats = dict()
+ self.registration_record = dict()
+ self.communicationHandler = CommunicationHandler(self)
+ self.processor = Communication.Processor(self.communicationHandler)
+ self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT)
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ #Use TThreadedServer or TThreadPoolServer for a multithreaded server
+ #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory)
+ self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory)
+
+
+ def start_communicationService(self):
+ self.communicationServer.serve()
+
+ #For now assign the desired UDID
+ #To be implemented:
+ #Check if ID is already assigned and in use
+ #If not assigned, assign it
+ #If assigned, offer a new random id
+ def assign_udid(self, udid_desired):
+ if udid_desired in self.assignedUUIDs:
+ new_udid = random.getrandbits(63)
+ while new_udid in self.assignedUUIDs:
+ new_udid = random.getrandbits(63)
+
+ self.assignedUUIDs.append(new_udid)
+ return new_udid
+ else:
+ self.assignedUUIDs.append(udid_desired)
+ return udid_desired
+
+ def handle_RPC_timeout(self, RPCmessage):
+ if RPCmessage.messageType == PUSH:
+ logging.debug('RPC Timeout for message type: PUSH')
+ # TBD: handle each RPC timeout separately
+
+def main(argv):
+ server = DominoServer()
+ loglevel = 'WARNING'
+ #process input arguments
+ try:
+ opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="])
+ except getopt.GetoptError:
+ print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>'
+ sys.exit(2)
+ for opt, arg in opts:
+ if opt == '-h':
+ print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>'
+ sys.exit()
+ elif opt in ("-c", "--conf"):
+ configfile = arg
+ elif opt in ("--log"):
+ loglevel= arg
+ #Set logging level
+ numeric_level = getattr(logging, loglevel.upper(), None)
+ try:
+ if not isinstance(numeric_level, int):
+ raise ValueError('Invalid log level: %s' % loglevel)
+ logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
+ except ValueError, ex:
+ print ex.message
+ sys.exit(2)
+
+ logging.debug('Domino Server Starting...')
+ server.start_communicationService()
+ print 'done.'
+
+if __name__ == "__main__":
+ main(sys.argv[1:])