diff options
author | maxbr <maxbr@mi.fu-berlin.de> | 2017-01-05 12:36:54 +0100 |
---|---|---|
committer | maxbr <maxbr@mi.fu-berlin.de> | 2017-01-05 12:36:54 +0100 |
commit | 29a4b8697e4a7960931528142d7778383810b91e (patch) | |
tree | e331d02d7dee9e5bdef1fecdf994bd85e07e8bef | |
parent | 0f779323898999cc6269ee2f9183de75e1eedab0 (diff) |
Add library for dashboard notification System
JIRA: PHAROS-265
Change-Id: Ia33235c5160ef6b36b27a6fe1a2eb97a45e72367
Signed-off-by: maxbr <maxbr@mi.fu-berlin.de>
-rw-r--r-- | pharos-dashboard/config.env.sample | 6 | ||||
-rw-r--r-- | pharos-dashboard/dashboard_notification/__init__.py (renamed from pharos-dashboard/src/notification_framework/__init__.py) | 0 | ||||
-rw-r--r-- | pharos-dashboard/dashboard_notification/notification.py | 120 | ||||
-rw-r--r-- | pharos-dashboard/docker-compose.yml | 17 | ||||
-rw-r--r-- | pharos-dashboard/rabbitmq/Dockerfile | 4 | ||||
-rwxr-xr-x | pharos-dashboard/rabbitmq/init.sh | 10 | ||||
-rw-r--r-- | pharos-dashboard/src/__init__.py | 8 | ||||
-rw-r--r-- | pharos-dashboard/src/notification/migrations/0001_initial.py | 2 | ||||
-rw-r--r-- | pharos-dashboard/src/notification/models.py | 12 | ||||
-rw-r--r-- | pharos-dashboard/src/notification/tasks.py | 40 | ||||
-rw-r--r-- | pharos-dashboard/src/notification_framework/notification.py | 114 | ||||
-rw-r--r-- | pharos-dashboard/web/Dockerfile | 4 | ||||
-rw-r--r-- | pharos-dashboard/web/requirements.txt | 1 | ||||
-rw-r--r-- | pharos-dashboard/worker/Dockerfile | 2 | ||||
-rw-r--r-- | pharos-dashboard/worker/requirements.txt | 1 |
15 files changed, 196 insertions, 145 deletions
diff --git a/pharos-dashboard/config.env.sample b/pharos-dashboard/config.env.sample index bd93616..892faac 100644 --- a/pharos-dashboard/config.env.sample +++ b/pharos-dashboard/config.env.sample @@ -15,4 +15,8 @@ OAUTH_CONSUMER_SECRET=sample_secret JIRA_URL=sample_url JIRA_USER_NAME=sample_jira_user -JIRA_USER_PASSWORD=sample_jira_pass
\ No newline at end of file +JIRA_USER_PASSWORD=sample_jira_pass + +# Rabbitmq +RABBITMQ_USER=opnfv +RABBITMQ_PASSWORD=opnfvopnfv diff --git a/pharos-dashboard/src/notification_framework/__init__.py b/pharos-dashboard/dashboard_notification/__init__.py index b5914ce..b5914ce 100644 --- a/pharos-dashboard/src/notification_framework/__init__.py +++ b/pharos-dashboard/dashboard_notification/__init__.py diff --git a/pharos-dashboard/dashboard_notification/notification.py b/pharos-dashboard/dashboard_notification/notification.py new file mode 100644 index 0000000..6843c76 --- /dev/null +++ b/pharos-dashboard/dashboard_notification/notification.py @@ -0,0 +1,120 @@ +############################################################################## +# Copyright (c) 2016 Max Breitenfeldt and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import jsonpickle +import pika + + +class Message(object): + def __init__(self, type, topic, content): + self.type = type + self.topic = topic + self.content = content + + +class Notification(object): + """ + This class can be used by the dashboard and the labs to exchange notifications about booking + events and pod status. It utilizes rabbitmq to communicate. + + Notifications are associated to an event and to a topic. + Events are: + [ 'booking_start', 'booking_end'] + The topic is usually a POD name, ie: + 'Intel POD 2' + """ + + def __init__(self, dashboard_url, user=None, password=None, verbose=False): + self.rabbitmq_broker = dashboard_url + self.verbose = verbose + if user is None and password is None: + self._connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.rabbitmq_broker)) + else: + self.credentials = pika.PlainCredentials(user, password) + self._connection = pika.BlockingConnection(pika.ConnectionParameters( + credentials=self.credentials, + host=self.rabbitmq_broker)) + self._registry = {} + self._channel = self._connection.channel() + self._channel.exchange_declare(exchange='notifications', type='topic') + self._result = self._channel.queue_declare(exclusive=True, durable=True) + self._queue_name = self._result.method.queue + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._connection.close() + + def register(self, function, topic, type='all'): + """ + Registers a function to be called for the specified event. + :param function: the function to register + :param event: the event type + :param regex: a regex to specify for wich topics the function will be called. Some + possible Expressions can be: + 'Intel POD 2' : Intel POD 2 + """ + + if topic not in self._registry: + self._registry[topic] = [(function, type)] + else: + self._registry[topic].append((function, type)) + + def receive(self): + """ + Start receiving notifications. This is a blocking operation, if a notification is received, + the registered functions will be called. + """ + if self.verbose: + print('Start receiving Notifications. Keys: ', self._registry.keys()) + self._receive_message(self._registry.keys()) + + def send(self, message): + """ + Send an event notification. + :param event: the event type + :param topic: the pod name + :param content: a JSON-serializable dictionary + """ + self._send_message(message) + + def _send_message(self, message): + routing_key = message.topic + message_json = jsonpickle.encode(message) + self._channel.basic_publish(exchange='notifications', + routing_key=routing_key, + body=message_json, + properties=pika.BasicProperties( + content_type='application/json', + delivery_mode=2, # make message persistent + )) + if self.verbose: + print(" [x] Sent %r:%r" % (routing_key, message_json)) + + def _receive_message(self, binding_keys): + for key in binding_keys: + self._channel.queue_bind(exchange='notifications', + queue=self._queue_name, + routing_key=key) + self._channel.basic_consume(self._message_callback, + queue=self._queue_name) + self._channel.start_consuming() + + def _message_callback(self, ch, method, properties, body): + if self.verbose: + print(" [x] Got %r:%r" % (method.routing_key, body)) + if method.routing_key not in self._registry: + return + for func, type in self._registry[method.routing_key]: + message = jsonpickle.decode(body.decode()) + if message.type == type: + func(message) + ch.basic_ack(delivery_tag=method.delivery_tag) diff --git a/pharos-dashboard/docker-compose.yml b/pharos-dashboard/docker-compose.yml index b487620..d2d672a 100644 --- a/pharos-dashboard/docker-compose.yml +++ b/pharos-dashboard/docker-compose.yml @@ -34,7 +34,7 @@ services: - postgres env_file: config.env volumes: - - ./src:/src + - ./:/pharos_dashboard - /var/lib/pharos_dashboard/static:/static - /var/lib/pharos_dashboard/media:/media expose: @@ -51,19 +51,20 @@ services: rabbitmq: restart: always - image: rabbitmq:latest + build: ./rabbitmq/ container_name: rm01 - expose: - - "5672" - + env_file: config.env + ports: + - "5672:5672" + worker: restart: always build: ./worker/ - command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule"" + command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule" env_file: config.env links: - postgres - rabbitmq volumes: - - ./src:/src -
\ No newline at end of file + - ./:/pharos_dashboard + diff --git a/pharos-dashboard/rabbitmq/Dockerfile b/pharos-dashboard/rabbitmq/Dockerfile new file mode 100644 index 0000000..71162a4 --- /dev/null +++ b/pharos-dashboard/rabbitmq/Dockerfile @@ -0,0 +1,4 @@ +FROM rabbitmq + +ADD init.sh /init.sh +CMD ["/init.sh"]
\ No newline at end of file diff --git a/pharos-dashboard/rabbitmq/init.sh b/pharos-dashboard/rabbitmq/init.sh new file mode 100755 index 0000000..f8ac089 --- /dev/null +++ b/pharos-dashboard/rabbitmq/init.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +# Create Rabbitmq user +( sleep 20 ; \ +rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD 2>/dev/null ; \ +rabbitmqctl set_user_tags $RABBITMQ_USER administrator ; \ +rabbitmqctl set_permissions -p / $RABBITMQ_USER ".*" ".*" ".*" ; \ +echo "*** User '$RABBITMQ_USER' with password '$RABBITMQ_PASSWORD' completed. ***") & + +rabbitmq-server $@ diff --git a/pharos-dashboard/src/__init__.py b/pharos-dashboard/src/__init__.py new file mode 100644 index 0000000..ce1acf3 --- /dev/null +++ b/pharos-dashboard/src/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2016 Max Breitenfeldt and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################
\ No newline at end of file diff --git a/pharos-dashboard/src/notification/migrations/0001_initial.py b/pharos-dashboard/src/notification/migrations/0001_initial.py index d4af751..8b8414e 100644 --- a/pharos-dashboard/src/notification/migrations/0001_initial.py +++ b/pharos-dashboard/src/notification/migrations/0001_initial.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Generated by Django 1.10 on 2016-09-23 11:36 +# Generated by Django 1.10 on 2016-11-03 13:33 from __future__ import unicode_literals from django.db import migrations, models diff --git a/pharos-dashboard/src/notification/models.py b/pharos-dashboard/src/notification/models.py index 0ee275d..89b3023 100644 --- a/pharos-dashboard/src/notification/models.py +++ b/pharos-dashboard/src/notification/models.py @@ -19,15 +19,15 @@ class BookingNotification(models.Model): def get_content(self): return { - 'start': self.booking.start.isoformat(), - 'end': self.booking.end.isoformat(), + 'resource_id': self.booking.resource.id, + 'booking_id': self.booking.id, 'user': self.booking.user.username, - 'purpose': self.booking.purpose + 'user_id': self.booking.user.id, } def save(self, *args, **kwargs): notifications = self.booking.bookingnotification_set.filter(type=self.type).exclude( id=self.id) - if notifications.count() > 0: - raise ValueError('Doubled Notification') - return super(BookingNotification, self).save(*args, **kwargs)
\ No newline at end of file + #if notifications.count() > 0: + # raise ValueError('Doubled Notification') + return super(BookingNotification, self).save(*args, **kwargs) diff --git a/pharos-dashboard/src/notification/tasks.py b/pharos-dashboard/src/notification/tasks.py index 4173433..e2b34ca 100644 --- a/pharos-dashboard/src/notification/tasks.py +++ b/pharos-dashboard/src/notification/tasks.py @@ -8,6 +8,8 @@ ############################################################################## +import os +import sys from datetime import timedelta from celery import shared_task @@ -15,19 +17,33 @@ from django.conf import settings from django.utils import timezone from notification.models import BookingNotification -from notification_framework.notification import Notification + +# this adds the top level directory to the python path, this is needed so that we can access the +# notification library +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) + +from dashboard_notification.notification import Notification, Message @shared_task def send_booking_notifications(): - messaging = Notification(dashboard_url=settings.RABBITMQ_URL) - - now = timezone.now() - notifications = BookingNotification.objects.filter(submitted=False, - submit_time__gt=now, - submit_time__lt=now + timedelta(minutes=5)) - for notification in notifications: - messaging.send(notification.type, notification.booking.resource.name, - notification.get_content()) - notification.submitted = True - notification.save() + with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging: + now = timezone.now() + notifications = BookingNotification.objects.filter(submitted=False, + submit_time__gt=now - timedelta(minutes=1), + submit_time__lt=now + timedelta(minutes=5)) + for notification in notifications: + message = Message(type=notification.type, topic=notification.booking.resource.name, + content=notification.get_content()) + messaging.send(message) + notification.submitted = True + notification.save() + +@shared_task +def notification_debug(): + with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging: + notifications = BookingNotification.objects.all() + for notification in notifications: + message = Message(type=notification.type, topic=notification.booking.resource.name, + content=notification.get_content()) + messaging.send(message) diff --git a/pharos-dashboard/src/notification_framework/notification.py b/pharos-dashboard/src/notification_framework/notification.py deleted file mode 100644 index 84fbcff..0000000 --- a/pharos-dashboard/src/notification_framework/notification.py +++ /dev/null @@ -1,114 +0,0 @@ -############################################################################## -# Copyright (c) 2016 Max Breitenfeldt and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - - -import json -import re - -import pika - - -class Notification(object): - """ - This class can be used by the dashboard and the labs to exchange notifications about booking - events and pod status. It utilizes rabbitmq to communicate. - - Notifications are associated to an event and to a topic. - Events are: - [ 'booking_start', 'booking_stop', 'pod_status' ] - The topic is usually a POD name, ie: - 'Intel POD 2' - """ - - def __init__(self, dashboard_url, verbose=False): - self.rabbitmq_broker = dashboard_url - self.verbose = verbose - self._registry = {} - - self.connection = pika.BlockingConnection(pika.ConnectionParameters( - host=self.rabbitmq_broker)) - self.channel = self.connection.channel() - - self.channel.exchange_declare(exchange='notifications', type='topic') - - self.result = self.channel.queue_declare(exclusive=True) - self.queue_name = self.result.method.queue - - def register(self, function, event, regex): - """ - Registers a function to be called for the specified event. - :param function: the function to register - :param event: the event type - :param regex: a regex to specify for wich topics the function will be called. Some - possible Expressions can be: - 'Intel POD 2' : Intel POD 2 - 'Intel POD .*' : All Intel Pods - '.*' : All Topics - """ - - if event not in self._registry: - self._registry[event] = [(function, regex)] - else: - self._registry[event].append((function, regex)) - - def receive(self): - """ - Start receiving notifications. This is a blocking operation, if a notification is received, - the registered functions will be called. - """ - if self.verbose: - print('Start receiving Notifications. Keys: ', self._registry.keys()) - self._receive_message(self._registry.keys()) - - def send(self, event, topic, content): - """ - Send an event notification. - :param event: the event type - :param topic: the pod name - :param content: a JSON-serializable dictionary - """ - message = { - 'event': event, - 'topic': topic, - 'content': content - } - self._send_message(message) - - def _send_message(self, event): - routing_key = event['type'] - message = json.dumps(event) - self.channel.basic_publish(exchange='notifications', - routing_key=routing_key, - body=message, - properties=pika.BasicProperties( - content_type='application/json' - )) - if self.verbose: - print(" [x] Sent %r:%r" % (routing_key, message)) - - def _receive_message(self, binding_keys): - for key in binding_keys: - self.channel.queue_bind(exchange='notifications', - queue=self.queue_name, - routing_key=key) - self.channel.basic_consume(self._message_callback, - queue=self.queue_name, - no_ack=True) - self.channel.start_consuming() - - def _message_callback(self, ch, method, properties, body): - if self.verbose: - print(" [x] Got %r:%r" % (method.routing_key, body)) - if method.routing_key not in self._registry: - return - for func, regex in self._registry[method.routing_key]: - message = json.loads(body.decode()) - match = re.match(regex, message['topic']) - if match: - func(body) diff --git a/pharos-dashboard/web/Dockerfile b/pharos-dashboard/web/Dockerfile index d543235..228b0b0 100644 --- a/pharos-dashboard/web/Dockerfile +++ b/pharos-dashboard/web/Dockerfile @@ -3,5 +3,5 @@ ENV PYTHONUNBUFFERED 1 RUN mkdir /config ADD ./requirements.txt /config/ RUN pip install -r /config/requirements.txt -RUN mkdir /src; -WORKDIR /src +RUN mkdir -p /pharos_dashboard/src +WORKDIR /pharos_dashboard/src diff --git a/pharos-dashboard/web/requirements.txt b/pharos-dashboard/web/requirements.txt index edb20d0..f80f1c0 100644 --- a/pharos-dashboard/web/requirements.txt +++ b/pharos-dashboard/web/requirements.txt @@ -8,6 +8,7 @@ django-registration==2.1.2 djangorestframework==3.4.6 gunicorn==19.6.0 jira==1.0.7 +jsonpickle==0.9.3 oauth2==1.9.0.post1 oauthlib==1.1.2 pika==0.10.0 diff --git a/pharos-dashboard/worker/Dockerfile b/pharos-dashboard/worker/Dockerfile index 86395e0..c1e8aff 100644 --- a/pharos-dashboard/worker/Dockerfile +++ b/pharos-dashboard/worker/Dockerfile @@ -5,4 +5,4 @@ ADD ./requirements.txt /config/ RUN pip install -r /config/requirements.txt RUN useradd -ms /bin/bash celery USER celery -WORKDIR /src +WORKDIR /pharos_dashboard/src diff --git a/pharos-dashboard/worker/requirements.txt b/pharos-dashboard/worker/requirements.txt index edb20d0..f80f1c0 100644 --- a/pharos-dashboard/worker/requirements.txt +++ b/pharos-dashboard/worker/requirements.txt @@ -8,6 +8,7 @@ django-registration==2.1.2 djangorestframework==3.4.6 gunicorn==19.6.0 jira==1.0.7 +jsonpickle==0.9.3 oauth2==1.9.0.post1 oauthlib==1.1.2 pika==0.10.0 |