diff options
Diffstat (limited to 'dashboard_notification/notification.py')
-rw-r--r-- | dashboard_notification/notification.py | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/dashboard_notification/notification.py b/dashboard_notification/notification.py new file mode 100644 index 0000000..6843c76 --- /dev/null +++ b/dashboard_notification/notification.py @@ -0,0 +1,120 @@ +############################################################################## +# Copyright (c) 2016 Max Breitenfeldt and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import jsonpickle +import pika + + +class Message(object): + def __init__(self, type, topic, content): + self.type = type + self.topic = topic + self.content = content + + +class Notification(object): + """ + This class can be used by the dashboard and the labs to exchange notifications about booking + events and pod status. It utilizes rabbitmq to communicate. + + Notifications are associated to an event and to a topic. + Events are: + [ 'booking_start', 'booking_end'] + The topic is usually a POD name, ie: + 'Intel POD 2' + """ + + def __init__(self, dashboard_url, user=None, password=None, verbose=False): + self.rabbitmq_broker = dashboard_url + self.verbose = verbose + if user is None and password is None: + self._connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.rabbitmq_broker)) + else: + self.credentials = pika.PlainCredentials(user, password) + self._connection = pika.BlockingConnection(pika.ConnectionParameters( + credentials=self.credentials, + host=self.rabbitmq_broker)) + self._registry = {} + self._channel = self._connection.channel() + self._channel.exchange_declare(exchange='notifications', type='topic') + self._result = self._channel.queue_declare(exclusive=True, durable=True) + self._queue_name = self._result.method.queue + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._connection.close() + + def register(self, function, topic, type='all'): + """ + Registers a function to be called for the specified event. + :param function: the function to register + :param event: the event type + :param regex: a regex to specify for wich topics the function will be called. Some + possible Expressions can be: + 'Intel POD 2' : Intel POD 2 + """ + + if topic not in self._registry: + self._registry[topic] = [(function, type)] + else: + self._registry[topic].append((function, type)) + + def receive(self): + """ + Start receiving notifications. This is a blocking operation, if a notification is received, + the registered functions will be called. + """ + if self.verbose: + print('Start receiving Notifications. Keys: ', self._registry.keys()) + self._receive_message(self._registry.keys()) + + def send(self, message): + """ + Send an event notification. + :param event: the event type + :param topic: the pod name + :param content: a JSON-serializable dictionary + """ + self._send_message(message) + + def _send_message(self, message): + routing_key = message.topic + message_json = jsonpickle.encode(message) + self._channel.basic_publish(exchange='notifications', + routing_key=routing_key, + body=message_json, + properties=pika.BasicProperties( + content_type='application/json', + delivery_mode=2, # make message persistent + )) + if self.verbose: + print(" [x] Sent %r:%r" % (routing_key, message_json)) + + def _receive_message(self, binding_keys): + for key in binding_keys: + self._channel.queue_bind(exchange='notifications', + queue=self._queue_name, + routing_key=key) + self._channel.basic_consume(self._message_callback, + queue=self._queue_name) + self._channel.start_consuming() + + def _message_callback(self, ch, method, properties, body): + if self.verbose: + print(" [x] Got %r:%r" % (method.routing_key, body)) + if method.routing_key not in self._registry: + return + for func, type in self._registry[method.routing_key]: + message = jsonpickle.decode(body.decode()) + if message.type == type: + func(message) + ch.basic_ack(delivery_tag=method.delivery_tag) |