From 3666bf8b35db7a4fb22e8f795185c599f510443f Mon Sep 17 00:00:00 2001 From: maxbr Date: Mon, 19 Sep 2016 10:11:55 +0200 Subject: Implement notification framework JIRA: PHAROS-265 The framework can be used by the dashboard and the labs to exchange notifications about booking events and pod status. Change-Id: Ibd7cd353c4933f7662d5368182faef8298b85efc Signed-off-by: maxbr --- .../notification_framework/__init__.py | 0 .../notification_framework/notification.py | 104 +++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 pharos-dashboard/notification_framework/__init__.py create mode 100644 pharos-dashboard/notification_framework/notification.py diff --git a/pharos-dashboard/notification_framework/__init__.py b/pharos-dashboard/notification_framework/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pharos-dashboard/notification_framework/notification.py b/pharos-dashboard/notification_framework/notification.py new file mode 100644 index 0000000..7d77283 --- /dev/null +++ b/pharos-dashboard/notification_framework/notification.py @@ -0,0 +1,104 @@ +import json +import re + +import pika + + +class Notification(object): + """ + This class can be used by the dashboard and the labs to exchange notifications about booking + events and pod status. It utilizes rabbitmq to communicate. + + Notifications are associated to an event and to a topic. + Events are: + [ 'booking_start', 'booking_stop', 'pod_status' ] + The topic is usually a POD name, ie: + 'Intel POD 2' + """ + + def __init__(self, dashboard_url, verbose=False): + self.rabbitmq_broker = dashboard_url + self.verbose = verbose + self._registry = {} + + self.connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.rabbitmq_broker)) + self.channel = self.connection.channel() + + self.channel.exchange_declare(exchange='notifications', type='topic') + + self.result = self.channel.queue_declare(exclusive=True) + self.queue_name = self.result.method.queue + + def register(self, function, event, regex): + """ + Registers a function to be called for the specified event. + :param function: the function to register + :param event: the event type + :param regex: a regex to specify for wich topics the function will be called. Some + possible Expressions can be: + 'Intel POD 2' : Intel POD 2 + 'Intel POD .*' : All Intel Pods + '.*' : All Topics + """ + + if event not in self._registry: + self._registry[event] = [(function, regex)] + else: + self._registry[event].append((function, regex)) + + def receive(self): + """ + Start receiving notifications. This is a blocking operation, if a notification is received, + the registered functions will be called. + """ + if self.verbose: + print('Start receiving Notifications. Keys: ', self._registry.keys()) + self._receive_message(self._registry.keys()) + + def send(self, event, topic, content): + """ + Send an event notification. + :param event: the event type + :param topic: the pod name + :param content: a JSON-serializable dictionary + """ + message = { + 'event': event, + 'topic': topic, + 'content': content + } + self._send_message(message) + + def _send_message(self, event): + routing_key = event['type'] + message = json.dumps(event) + self.channel.basic_publish(exchange='notifications', + routing_key=routing_key, + body=message, + properties=pika.BasicProperties( + content_type='application/json' + )) + if self.verbose: + print(" [x] Sent %r:%r" % (routing_key, message)) + + def _receive_message(self, binding_keys): + for key in binding_keys: + self.channel.queue_bind(exchange='notifications', + queue=self.queue_name, + routing_key=key) + self.channel.basic_consume(self._message_callback, + queue=self.queue_name, + no_ack=True) + self.channel.start_consuming() + + def _message_callback(self, ch, method, properties, body): + if self.verbose: + print(" [x] Got %r:%r" % (method.routing_key, body)) + if method.routing_key not in self._registry: + return + for func, regex in self._registry[method.routing_key]: + message = json.loads(body.decode()) + match = re.match(regex, message['topic']) + if match: + func(body) -- cgit 1.2.3-korg