summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/file_utils.c
blob: b3cf084634dc713162fae56e307588f23fc0acbe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python

#Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#       http://www.apache.org/licenses/LICENSE-2.0
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import sys, os, glob, random, errno
import getopt, socket
import logging, json
#sys.path.append('gen-py')
#sys.path.insert(0, glob.glob('./lib/py/build/lib.*')[0])
sys.path.insert(0, glob.glob('./lib')[0])


from dominoRPC import Communication
from dominoRPC.ttypes import *
from dominoRPC.constants import *

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from 
ass="bp">self): self.log = {} def __init__(self, dominoserver): self.log = {} self.dominoServer = dominoserver self.seqno = 0; def openconnection(self, ipaddr, tcpport): try: # Make socket transport = TSocket.TSocket(ipaddr, tcpport) transport.setTimeout(THRIFT_RPC_TIMEOUT_MS) # Add buffering to compensate for slow raw sockets self.transport = TTransport.TBufferedTransport(transport) # Wrap in a protocol self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport) # Create a client to use the protocol encoder self.sender = Communication.Client(self.protocol) self.transport.open() except Thrift.TException, tx: logging.error('%s' , tx.message) def closeconnection(self): self.transport.close() def push_template(self,template,ipaddr,tcpport): self.openconnection(ipaddr,tcpport) pushm = PushMessage() pushm.domino_udid = SERVER_UDID pushm.seq_no = self.seqno pushm.template_type = 'tosca-nfv-v1.0' pushm.template = template try: push_r = self.sender.d_push(pushm) logging.info('Push Response received from %d' , push_r.domino_udid) except (Thrift.TException, TSocket.TTransportException) as tx: logging.error('%s' , tx.message) except (socket.timeout) as tx: self.dominoServer.handle_RPC_timeout(pushm) except: logging.error('Unexpected error: %s', sys.exc_info()[0]) self.seqno = self.seqno + 1 self.closeconnection() #Heartbeat from Domino Client is received #Actions: # - Respond Back with a heartbeat def d_heartbeat(self, hb_msg): global SERVER_UDID logging.info('heartbeat received from %d' , hb_msg.domino_udid) hb_r = HeartBeatMessage() hb_r.domino_udid = SERVER_UDID hb_r.seq_no = self.seqno self.seqno = self.seqno + 1 return hb_r #Registration from Domino Client is received #Actions: # # - Respond Back with Registration Response def d_register(self, reg_msg): global SERVER_UDID #Prepare and send Registration Response reg_r = RegisterResponseMessage() logging.info('Registration Request received for UDID %d from IP: %s port: %d', reg_msg.domino_udid_desired, reg_msg.ipaddr, reg_msg.tcpport) reg_r.domino_udid_assigned = self.dominoServer.assign_udid(reg_msg.domino_udid_desired) reg_r.seq_no = self.seqno reg_r.domino_udid = SERVER_UDID #return unconditional success #To be implemented: #Define conditions for unsuccessful registration (e.g., unsupported mapping) reg_r.responseCode = SUCCESS #no need to send comments #To be implemented: #Logic for a new UDID assignment self.seqno = self.seqno + 1 # Store the Domino Client info # TBD: check the sequence number to ensure the most recent record is saved self.dominoServer.registration_record[reg_r.domino_udid_assigned] = reg_msg data = {} data[reg_r.domino_udid_assigned] = [reg_msg.ipaddr, reg_msg.tcpport, reg_msg.supported_templates, reg_msg.seq_no] with open(SERVER_DBFILE, 'a') as f: json.dump(data, f) f.close() return reg_r #Subscription from Domino Client is received #Actions: # - Save the templates & labels # - Respond Back with Subscription Response def d_subscribe(self, sub_msg): global SERVER_UDID, SERVER_SEQNO logging.info('Subscribe Request received from %d' , sub_msg.domino_udid) if sub_msg.template_op == APPEND: if self.dominoServer.subscribed_templateformats.has_key(sub_msg.domino_udid): self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].update(set(sub_msg.supported_template_types)) else: self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types) elif sub_msg.template_op == OVERWRITE: self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] = set(sub_msg.supported_template_types) elif sub_msg.template_op == DELETE: self.dominoServer.subscribed_templateformats[sub_msg.domino_udid].difference_update(set(sub_msg.supported_template_types)) if sub_msg.labels != []: if sub_msg.label_op == APPEND: if self.dominoServer.subscribed_labels.has_key(sub_msg.domino_udid): self.dominoServer.subscribed_labels[sub_msg.domino_udid].update(set(sub_msg.labels)) else: self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels) elif sub_msg.label_op == OVERWRITE: self.dominoServer.subscribed_labels[sub_msg.domino_udid] = set(sub_msg.labels) elif sub_msg.label_op == DELETE: self.dominoServer.subscribed_labels[sub_msg.domino_udid].difference_update(set(sub_msg.labels)) logging.debug('Supported Template: %s Supported Labels: %s' , self.dominoServer.subscribed_templateformats[sub_msg.domino_udid] , self.dominoServer.subscribed_labels[sub_msg.domino_udid]) #Fill in the details sub_r = SubscribeResponseMessage() sub_r.domino_udid = SERVER_UDID sub_r.seq_no = self.seqno sub_r.responseCode = SUCCESS self.seqno = self.seqno + 1 return sub_r #Template Publication from Domino Client is received #Actions: # - Parse the template, perform mapping, partition the template # - Launch Push service # - Respond Back with Publication Response def d_publish(self, pub_msg): global SERVER_UDID, SERVER_SEQNO, TOSCADIR, TOSCA_DEFAULT_FNAME logging.info('Publish Request received from %d' , pub_msg.domino_udid) logging.debug(pub_msg.template) # Save as file try: os.makedirs(TOSCADIR) except OSError as exception: if exception.errno == errno.EEXIST: logging.debug('ERRNO %d; %s exists. Creating: %s', exception.errno, TOSCADIR, TOSCADIR+TOSCA_DEFAULT_FNAME) else: logging.error('Error occurred in creating %s. Err no: %d', exception.errno) #Risking a race condition if another process is attempting to write to same file f = open(TOSCADIR+TOSCA_DEFAULT_FNAME, 'w') for item in pub_msg.template: print>>f, item f.close() # Load tosca object from file into memory tosca = ToscaTemplate( TOSCADIR+TOSCA_DEFAULT_FNAME ) # Extract Labels node_labels = label.extract_labels( tosca ) logging.debug('Node Labels: %s', node_labels) # Map nodes in the template to resource domains site_map = label.map_nodes( self.dominoServer.subscribed_labels , node_labels ) logging.debug('Site Maps: %s' , site_map) # Select a site for each VNF node_site = label.select_site( site_map ) logging.debug('Selected Sites: %s', node_site) # Create per-domain Tosca files file_paths = partitioner.partition_tosca('./toscafiles/template1.yaml',node_site,tosca.tpl) # Create list of translated template files # Create work-flow # Send domain templates to each domain agent/client # FOR NOW: send untranslated but partitioned tosca files to scheduled sites # TBD: read from work-flow for site in file_paths: domino_client_ip = self.dominoServer.registration_record[site].ipaddr domino_client_port = self.dominoServer.registration_record[site].tcpport self.push_template(miscutil.read_templatefile(file_paths[site]), domino_client_ip, domino_client_port) #Fill in the details pub_r = PublishResponseMessage() pub_r.domino_udid = SERVER_UDID pub_r.seq_no = self.seqno pub_r.responseCode = SUCCESS self.seqno = self.seqno + 1 return pub_r #Query from Domino Client is received #Actions: # # - Respond Back with Query Response def d_query(self, qu_msg): #Fill in the details qu_r = QueryResponseMessage() return qu_r class DominoServer: def __init__(self): self.assignedUUIDs = list() self.subscribed_labels = dict() self.subscribed_templateformats = dict() self.registration_record = dict() self.communicationHandler = CommunicationHandler(self) self.processor = Communication.Processor(self.communicationHandler) self.transport = TSocket.TServerSocket(port=DOMINO_SERVER_PORT) self.tfactory = TTransport.TBufferedTransportFactory() self.pfactory = TBinaryProtocol.TBinaryProtocolFactory() #Use TThreadedServer or TThreadPoolServer for a multithreaded server #self.communicationServer = TServer.TThreadedServer(self.processor, self.transport, self.tfactory, self.pfactory) self.communicationServer = TServer.TThreadPoolServer(self.processor, self.transport, self.tfactory, self.pfactory) def start_communicationService(self): self.communicationServer.serve() #For now assign the desired UDID #To be implemented: #Check if ID is already assigned and in use #If not assigned, assign it #If assigned, offer a new random id def assign_udid(self, udid_desired): if udid_desired in self.assignedUUIDs: new_udid = random.getrandbits(63) while new_udid in self.assignedUUIDs: new_udid = random.getrandbits(63) self.assignedUUIDs.append(new_udid) return new_udid else: self.assignedUUIDs.append(udid_desired) return udid_desired def handle_RPC_timeout(self, RPCmessage): if RPCmessage.messageType == PUSH: logging.debug('RPC Timeout for message type: PUSH') # TBD: handle each RPC timeout separately def main(argv): server = DominoServer() loglevel = 'WARNING' #process input arguments try: opts, args = getopt.getopt(argv,"hc:l:",["conf=","log="]) except getopt.GetoptError: print 'DominoServer.py -c/--conf <configfile> -l/--log <loglevel>' sys.exit(2) for opt, arg in opts: if opt == '-h': print 'DominoClient.py -c/--conf <configfile> -p/--port <socketport> -i/--ipaddr <IPaddr> -l/--log <loglevel>' sys.exit() elif opt in ("-c", "--conf"): configfile = arg elif opt in ("-l", "--log"): loglevel= arg #Set logging level numeric_level = getattr(logging, loglevel.upper(), None) try: if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % loglevel) logging.basicConfig(filename=logfile,level=numeric_level, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') except ValueError, ex: print ex.message sys.exit(2) logging.debug('Domino Server Starting...') server.start_communicationService() print 'done.' if __name__ == "__main__": main(sys.argv[1:])