diff options
author | maxbr <maxbr@mi.fu-berlin.de> | 2016-09-19 10:11:55 +0200 |
---|---|---|
committer | maxbr <maxbr@mi.fu-berlin.de> | 2016-09-19 10:11:55 +0200 |
commit | 3666bf8b35db7a4fb22e8f795185c599f510443f (patch) | |
tree | fa342283fd1e0c43346ec97fcb4b01e3ca6ffe60 | |
parent | 176188d16eab0a8c56537c2c6d119a6807f47999 (diff) |
Implement notification framework
JIRA: PHAROS-265
The framework can be used by the dashboard and the labs to exchange
notifications about booking events and pod status.
Change-Id: Ibd7cd353c4933f7662d5368182faef8298b85efc
Signed-off-by: maxbr <maxbr@mi.fu-berlin.de>
-rw-r--r-- | pharos-dashboard/notification_framework/__init__.py | 0 | ||||
-rw-r--r-- | pharos-dashboard/notification_framework/notification.py | 104 |
2 files changed, 104 insertions, 0 deletions
diff --git a/pharos-dashboard/notification_framework/__init__.py b/pharos-dashboard/notification_framework/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pharos-dashboard/notification_framework/__init__.py diff --git a/pharos-dashboard/notification_framework/notification.py b/pharos-dashboard/notification_framework/notification.py new file mode 100644 index 0000000..7d77283 --- /dev/null +++ b/pharos-dashboard/notification_framework/notification.py @@ -0,0 +1,104 @@ +import json +import re + +import pika + + +class Notification(object): + """ + This class can be used by the dashboard and the labs to exchange notifications about booking + events and pod status. It utilizes rabbitmq to communicate. + + Notifications are associated to an event and to a topic. + Events are: + [ 'booking_start', 'booking_stop', 'pod_status' ] + The topic is usually a POD name, ie: + 'Intel POD 2' + """ + + def __init__(self, dashboard_url, verbose=False): + self.rabbitmq_broker = dashboard_url + self.verbose = verbose + self._registry = {} + + self.connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.rabbitmq_broker)) + self.channel = self.connection.channel() + + self.channel.exchange_declare(exchange='notifications', type='topic') + + self.result = self.channel.queue_declare(exclusive=True) + self.queue_name = self.result.method.queue + + def register(self, function, event, regex): + """ + Registers a function to be called for the specified event. + :param function: the function to register + :param event: the event type + :param regex: a regex to specify for wich topics the function will be called. Some + possible Expressions can be: + 'Intel POD 2' : Intel POD 2 + 'Intel POD .*' : All Intel Pods + '.*' : All Topics + """ + + if event not in self._registry: + self._registry[event] = [(function, regex)] + else: + self._registry[event].append((function, regex)) + + def receive(self): + """ + Start receiving notifications. This is a blocking operation, if a notification is received, + the registered functions will be called. + """ + if self.verbose: + print('Start receiving Notifications. Keys: ', self._registry.keys()) + self._receive_message(self._registry.keys()) + + def send(self, event, topic, content): + """ + Send an event notification. + :param event: the event type + :param topic: the pod name + :param content: a JSON-serializable dictionary + """ + message = { + 'event': event, + 'topic': topic, + 'content': content + } + self._send_message(message) + + def _send_message(self, event): + routing_key = event['type'] + message = json.dumps(event) + self.channel.basic_publish(exchange='notifications', + routing_key=routing_key, + body=message, + properties=pika.BasicProperties( + content_type='application/json' + )) + if self.verbose: + print(" [x] Sent %r:%r" % (routing_key, message)) + + def _receive_message(self, binding_keys): + for key in binding_keys: + self.channel.queue_bind(exchange='notifications', + queue=self.queue_name, + routing_key=key) + self.channel.basic_consume(self._message_callback, + queue=self.queue_name, + no_ack=True) + self.channel.start_consuming() + + def _message_callback(self, ch, method, properties, body): + if self.verbose: + print(" [x] Got %r:%r" % (method.routing_key, body)) + if method.routing_key not in self._registry: + return + for func, regex in self._registry[method.routing_key]: + message = json.loads(body.decode()) + match = re.match(regex, message['topic']) + if match: + func(body) |