summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaxbr <maxbr@mi.fu-berlin.de>2017-01-05 12:36:54 +0100
committermaxbr <maxbr@mi.fu-berlin.de>2017-01-05 12:36:54 +0100
commit29a4b8697e4a7960931528142d7778383810b91e (patch)
treee331d02d7dee9e5bdef1fecdf994bd85e07e8bef
parent0f779323898999cc6269ee2f9183de75e1eedab0 (diff)
Add library for dashboard notification System
JIRA: PHAROS-265 Change-Id: Ia33235c5160ef6b36b27a6fe1a2eb97a45e72367 Signed-off-by: maxbr <maxbr@mi.fu-berlin.de>
-rw-r--r--pharos-dashboard/config.env.sample6
-rw-r--r--pharos-dashboard/dashboard_notification/__init__.py (renamed from pharos-dashboard/src/notification_framework/__init__.py)0
-rw-r--r--pharos-dashboard/dashboard_notification/notification.py120
-rw-r--r--pharos-dashboard/docker-compose.yml17
-rw-r--r--pharos-dashboard/rabbitmq/Dockerfile4
-rwxr-xr-xpharos-dashboard/rabbitmq/init.sh10
-rw-r--r--pharos-dashboard/src/__init__.py8
-rw-r--r--pharos-dashboard/src/notification/migrations/0001_initial.py2
-rw-r--r--pharos-dashboard/src/notification/models.py12
-rw-r--r--pharos-dashboard/src/notification/tasks.py40
-rw-r--r--pharos-dashboard/src/notification_framework/notification.py114
-rw-r--r--pharos-dashboard/web/Dockerfile4
-rw-r--r--pharos-dashboard/web/requirements.txt1
-rw-r--r--pharos-dashboard/worker/Dockerfile2
-rw-r--r--pharos-dashboard/worker/requirements.txt1
15 files changed, 196 insertions, 145 deletions
diff --git a/pharos-dashboard/config.env.sample b/pharos-dashboard/config.env.sample
index bd93616..892faac 100644
--- a/pharos-dashboard/config.env.sample
+++ b/pharos-dashboard/config.env.sample
@@ -15,4 +15,8 @@ OAUTH_CONSUMER_SECRET=sample_secret
JIRA_URL=sample_url
JIRA_USER_NAME=sample_jira_user
-JIRA_USER_PASSWORD=sample_jira_pass \ No newline at end of file
+JIRA_USER_PASSWORD=sample_jira_pass
+
+# Rabbitmq
+RABBITMQ_USER=opnfv
+RABBITMQ_PASSWORD=opnfvopnfv
diff --git a/pharos-dashboard/src/notification_framework/__init__.py b/pharos-dashboard/dashboard_notification/__init__.py
index b5914ce..b5914ce 100644
--- a/pharos-dashboard/src/notification_framework/__init__.py
+++ b/pharos-dashboard/dashboard_notification/__init__.py
diff --git a/pharos-dashboard/dashboard_notification/notification.py b/pharos-dashboard/dashboard_notification/notification.py
new file mode 100644
index 0000000..6843c76
--- /dev/null
+++ b/pharos-dashboard/dashboard_notification/notification.py
@@ -0,0 +1,120 @@
+##############################################################################
+# Copyright (c) 2016 Max Breitenfeldt and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import jsonpickle
+import pika
+
+
+class Message(object):
+ def __init__(self, type, topic, content):
+ self.type = type
+ self.topic = topic
+ self.content = content
+
+
+class Notification(object):
+ """
+ This class can be used by the dashboard and the labs to exchange notifications about booking
+ events and pod status. It utilizes rabbitmq to communicate.
+
+ Notifications are associated to an event and to a topic.
+ Events are:
+ [ 'booking_start', 'booking_end']
+ The topic is usually a POD name, ie:
+ 'Intel POD 2'
+ """
+
+ def __init__(self, dashboard_url, user=None, password=None, verbose=False):
+ self.rabbitmq_broker = dashboard_url
+ self.verbose = verbose
+ if user is None and password is None:
+ self._connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host=self.rabbitmq_broker))
+ else:
+ self.credentials = pika.PlainCredentials(user, password)
+ self._connection = pika.BlockingConnection(pika.ConnectionParameters(
+ credentials=self.credentials,
+ host=self.rabbitmq_broker))
+ self._registry = {}
+ self._channel = self._connection.channel()
+ self._channel.exchange_declare(exchange='notifications', type='topic')
+ self._result = self._channel.queue_declare(exclusive=True, durable=True)
+ self._queue_name = self._result.method.queue
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._connection.close()
+
+ def register(self, function, topic, type='all'):
+ """
+ Registers a function to be called for the specified event.
+ :param function: the function to register
+ :param event: the event type
+ :param regex: a regex to specify for wich topics the function will be called. Some
+ possible Expressions can be:
+ 'Intel POD 2' : Intel POD 2
+ """
+
+ if topic not in self._registry:
+ self._registry[topic] = [(function, type)]
+ else:
+ self._registry[topic].append((function, type))
+
+ def receive(self):
+ """
+ Start receiving notifications. This is a blocking operation, if a notification is received,
+ the registered functions will be called.
+ """
+ if self.verbose:
+ print('Start receiving Notifications. Keys: ', self._registry.keys())
+ self._receive_message(self._registry.keys())
+
+ def send(self, message):
+ """
+ Send an event notification.
+ :param event: the event type
+ :param topic: the pod name
+ :param content: a JSON-serializable dictionary
+ """
+ self._send_message(message)
+
+ def _send_message(self, message):
+ routing_key = message.topic
+ message_json = jsonpickle.encode(message)
+ self._channel.basic_publish(exchange='notifications',
+ routing_key=routing_key,
+ body=message_json,
+ properties=pika.BasicProperties(
+ content_type='application/json',
+ delivery_mode=2, # make message persistent
+ ))
+ if self.verbose:
+ print(" [x] Sent %r:%r" % (routing_key, message_json))
+
+ def _receive_message(self, binding_keys):
+ for key in binding_keys:
+ self._channel.queue_bind(exchange='notifications',
+ queue=self._queue_name,
+ routing_key=key)
+ self._channel.basic_consume(self._message_callback,
+ queue=self._queue_name)
+ self._channel.start_consuming()
+
+ def _message_callback(self, ch, method, properties, body):
+ if self.verbose:
+ print(" [x] Got %r:%r" % (method.routing_key, body))
+ if method.routing_key not in self._registry:
+ return
+ for func, type in self._registry[method.routing_key]:
+ message = jsonpickle.decode(body.decode())
+ if message.type == type:
+ func(message)
+ ch.basic_ack(delivery_tag=method.delivery_tag)
diff --git a/pharos-dashboard/docker-compose.yml b/pharos-dashboard/docker-compose.yml
index b487620..d2d672a 100644
--- a/pharos-dashboard/docker-compose.yml
+++ b/pharos-dashboard/docker-compose.yml
@@ -34,7 +34,7 @@ services:
- postgres
env_file: config.env
volumes:
- - ./src:/src
+ - ./:/pharos_dashboard
- /var/lib/pharos_dashboard/static:/static
- /var/lib/pharos_dashboard/media:/media
expose:
@@ -51,19 +51,20 @@ services:
rabbitmq:
restart: always
- image: rabbitmq:latest
+ build: ./rabbitmq/
container_name: rm01
- expose:
- - "5672"
-
+ env_file: config.env
+ ports:
+ - "5672:5672"
+
worker:
restart: always
build: ./worker/
- command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule""
+ command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule"
env_file: config.env
links:
- postgres
- rabbitmq
volumes:
- - ./src:/src
- \ No newline at end of file
+ - ./:/pharos_dashboard
+
diff --git a/pharos-dashboard/rabbitmq/Dockerfile b/pharos-dashboard/rabbitmq/Dockerfile
new file mode 100644
index 0000000..71162a4
--- /dev/null
+++ b/pharos-dashboard/rabbitmq/Dockerfile
@@ -0,0 +1,4 @@
+FROM rabbitmq
+
+ADD init.sh /init.sh
+CMD ["/init.sh"] \ No newline at end of file
diff --git a/pharos-dashboard/rabbitmq/init.sh b/pharos-dashboard/rabbitmq/init.sh
new file mode 100755
index 0000000..f8ac089
--- /dev/null
+++ b/pharos-dashboard/rabbitmq/init.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+# Create Rabbitmq user
+( sleep 20 ; \
+rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD 2>/dev/null ; \
+rabbitmqctl set_user_tags $RABBITMQ_USER administrator ; \
+rabbitmqctl set_permissions -p / $RABBITMQ_USER ".*" ".*" ".*" ; \
+echo "*** User '$RABBITMQ_USER' with password '$RABBITMQ_PASSWORD' completed. ***") &
+
+rabbitmq-server $@
diff --git a/pharos-dashboard/src/__init__.py b/pharos-dashboard/src/__init__.py
new file mode 100644
index 0000000..ce1acf3
--- /dev/null
+++ b/pharos-dashboard/src/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 Max Breitenfeldt and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+############################################################################## \ No newline at end of file
diff --git a/pharos-dashboard/src/notification/migrations/0001_initial.py b/pharos-dashboard/src/notification/migrations/0001_initial.py
index d4af751..8b8414e 100644
--- a/pharos-dashboard/src/notification/migrations/0001_initial.py
+++ b/pharos-dashboard/src/notification/migrations/0001_initial.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Generated by Django 1.10 on 2016-09-23 11:36
+# Generated by Django 1.10 on 2016-11-03 13:33
from __future__ import unicode_literals
from django.db import migrations, models
diff --git a/pharos-dashboard/src/notification/models.py b/pharos-dashboard/src/notification/models.py
index 0ee275d..89b3023 100644
--- a/pharos-dashboard/src/notification/models.py
+++ b/pharos-dashboard/src/notification/models.py
@@ -19,15 +19,15 @@ class BookingNotification(models.Model):
def get_content(self):
return {
- 'start': self.booking.start.isoformat(),
- 'end': self.booking.end.isoformat(),
+ 'resource_id': self.booking.resource.id,
+ 'booking_id': self.booking.id,
'user': self.booking.user.username,
- 'purpose': self.booking.purpose
+ 'user_id': self.booking.user.id,
}
def save(self, *args, **kwargs):
notifications = self.booking.bookingnotification_set.filter(type=self.type).exclude(
id=self.id)
- if notifications.count() > 0:
- raise ValueError('Doubled Notification')
- return super(BookingNotification, self).save(*args, **kwargs) \ No newline at end of file
+ #if notifications.count() > 0:
+ # raise ValueError('Doubled Notification')
+ return super(BookingNotification, self).save(*args, **kwargs)
diff --git a/pharos-dashboard/src/notification/tasks.py b/pharos-dashboard/src/notification/tasks.py
index 4173433..e2b34ca 100644
--- a/pharos-dashboard/src/notification/tasks.py
+++ b/pharos-dashboard/src/notification/tasks.py
@@ -8,6 +8,8 @@
##############################################################################
+import os
+import sys
from datetime import timedelta
from celery import shared_task
@@ -15,19 +17,33 @@ from django.conf import settings
from django.utils import timezone
from notification.models import BookingNotification
-from notification_framework.notification import Notification
+
+# this adds the top level directory to the python path, this is needed so that we can access the
+# notification library
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
+
+from dashboard_notification.notification import Notification, Message
@shared_task
def send_booking_notifications():
- messaging = Notification(dashboard_url=settings.RABBITMQ_URL)
-
- now = timezone.now()
- notifications = BookingNotification.objects.filter(submitted=False,
- submit_time__gt=now,
- submit_time__lt=now + timedelta(minutes=5))
- for notification in notifications:
- messaging.send(notification.type, notification.booking.resource.name,
- notification.get_content())
- notification.submitted = True
- notification.save()
+ with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging:
+ now = timezone.now()
+ notifications = BookingNotification.objects.filter(submitted=False,
+ submit_time__gt=now - timedelta(minutes=1),
+ submit_time__lt=now + timedelta(minutes=5))
+ for notification in notifications:
+ message = Message(type=notification.type, topic=notification.booking.resource.name,
+ content=notification.get_content())
+ messaging.send(message)
+ notification.submitted = True
+ notification.save()
+
+@shared_task
+def notification_debug():
+ with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging:
+ notifications = BookingNotification.objects.all()
+ for notification in notifications:
+ message = Message(type=notification.type, topic=notification.booking.resource.name,
+ content=notification.get_content())
+ messaging.send(message)
diff --git a/pharos-dashboard/src/notification_framework/notification.py b/pharos-dashboard/src/notification_framework/notification.py
deleted file mode 100644
index 84fbcff..0000000
--- a/pharos-dashboard/src/notification_framework/notification.py
+++ /dev/null
@@ -1,114 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 Max Breitenfeldt and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-import json
-import re
-
-import pika
-
-
-class Notification(object):
- """
- This class can be used by the dashboard and the labs to exchange notifications about booking
- events and pod status. It utilizes rabbitmq to communicate.
-
- Notifications are associated to an event and to a topic.
- Events are:
- [ 'booking_start', 'booking_stop', 'pod_status' ]
- The topic is usually a POD name, ie:
- 'Intel POD 2'
- """
-
- def __init__(self, dashboard_url, verbose=False):
- self.rabbitmq_broker = dashboard_url
- self.verbose = verbose
- self._registry = {}
-
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host=self.rabbitmq_broker))
- self.channel = self.connection.channel()
-
- self.channel.exchange_declare(exchange='notifications', type='topic')
-
- self.result = self.channel.queue_declare(exclusive=True)
- self.queue_name = self.result.method.queue
-
- def register(self, function, event, regex):
- """
- Registers a function to be called for the specified event.
- :param function: the function to register
- :param event: the event type
- :param regex: a regex to specify for wich topics the function will be called. Some
- possible Expressions can be:
- 'Intel POD 2' : Intel POD 2
- 'Intel POD .*' : All Intel Pods
- '.*' : All Topics
- """
-
- if event not in self._registry:
- self._registry[event] = [(function, regex)]
- else:
- self._registry[event].append((function, regex))
-
- def receive(self):
- """
- Start receiving notifications. This is a blocking operation, if a notification is received,
- the registered functions will be called.
- """
- if self.verbose:
- print('Start receiving Notifications. Keys: ', self._registry.keys())
- self._receive_message(self._registry.keys())
-
- def send(self, event, topic, content):
- """
- Send an event notification.
- :param event: the event type
- :param topic: the pod name
- :param content: a JSON-serializable dictionary
- """
- message = {
- 'event': event,
- 'topic': topic,
- 'content': content
- }
- self._send_message(message)
-
- def _send_message(self, event):
- routing_key = event['type']
- message = json.dumps(event)
- self.channel.basic_publish(exchange='notifications',
- routing_key=routing_key,
- body=message,
- properties=pika.BasicProperties(
- content_type='application/json'
- ))
- if self.verbose:
- print(" [x] Sent %r:%r" % (routing_key, message))
-
- def _receive_message(self, binding_keys):
- for key in binding_keys:
- self.channel.queue_bind(exchange='notifications',
- queue=self.queue_name,
- routing_key=key)
- self.channel.basic_consume(self._message_callback,
- queue=self.queue_name,
- no_ack=True)
- self.channel.start_consuming()
-
- def _message_callback(self, ch, method, properties, body):
- if self.verbose:
- print(" [x] Got %r:%r" % (method.routing_key, body))
- if method.routing_key not in self._registry:
- return
- for func, regex in self._registry[method.routing_key]:
- message = json.loads(body.decode())
- match = re.match(regex, message['topic'])
- if match:
- func(body)
diff --git a/pharos-dashboard/web/Dockerfile b/pharos-dashboard/web/Dockerfile
index d543235..228b0b0 100644
--- a/pharos-dashboard/web/Dockerfile
+++ b/pharos-dashboard/web/Dockerfile
@@ -3,5 +3,5 @@ ENV PYTHONUNBUFFERED 1
RUN mkdir /config
ADD ./requirements.txt /config/
RUN pip install -r /config/requirements.txt
-RUN mkdir /src;
-WORKDIR /src
+RUN mkdir -p /pharos_dashboard/src
+WORKDIR /pharos_dashboard/src
diff --git a/pharos-dashboard/web/requirements.txt b/pharos-dashboard/web/requirements.txt
index edb20d0..f80f1c0 100644
--- a/pharos-dashboard/web/requirements.txt
+++ b/pharos-dashboard/web/requirements.txt
@@ -8,6 +8,7 @@ django-registration==2.1.2
djangorestframework==3.4.6
gunicorn==19.6.0
jira==1.0.7
+jsonpickle==0.9.3
oauth2==1.9.0.post1
oauthlib==1.1.2
pika==0.10.0
diff --git a/pharos-dashboard/worker/Dockerfile b/pharos-dashboard/worker/Dockerfile
index 86395e0..c1e8aff 100644
--- a/pharos-dashboard/worker/Dockerfile
+++ b/pharos-dashboard/worker/Dockerfile
@@ -5,4 +5,4 @@ ADD ./requirements.txt /config/
RUN pip install -r /config/requirements.txt
RUN useradd -ms /bin/bash celery
USER celery
-WORKDIR /src
+WORKDIR /pharos_dashboard/src
diff --git a/pharos-dashboard/worker/requirements.txt b/pharos-dashboard/worker/requirements.txt
index edb20d0..f80f1c0 100644
--- a/pharos-dashboard/worker/requirements.txt
+++ b/pharos-dashboard/worker/requirements.txt
@@ -8,6 +8,7 @@ django-registration==2.1.2
djangorestframework==3.4.6
gunicorn==19.6.0
jira==1.0.7
+jsonpickle==0.9.3
oauth2==1.9.0.post1
oauthlib==1.1.2
pika==0.10.0