summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java')
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java175
1 files changed, 0 insertions, 175 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
deleted file mode 100644
index 79ce74b7..00000000
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.event.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.SharedExecutors;
-import org.onosproject.event.AbstractEvent;
-import org.onosproject.event.DefaultEventSinkRegistry;
-import org.onosproject.event.Event;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.EventSink;
-import org.slf4j.Logger;
-
-import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Simple implementation of an event dispatching service.
- */
-@Component(immediate = true)
-@Service
-public class CoreEventDispatcher extends DefaultEventSinkRegistry
- implements EventDeliveryService {
-
- private final Logger log = getLogger(getClass());
-
- // Default number of millis a sink can take to process an event.
- private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
- private static final long WATCHDOG_MS = 250; // ms
-
- private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
-
- private final ExecutorService executor =
- newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
-
- @SuppressWarnings("unchecked")
- private static final Event KILL_PILL = new AbstractEvent(null, 0) {
- };
-
- private DispatchLoop dispatchLoop;
- private long maxProcessMillis = DEFAULT_EXECUTE_MS;
-
- // Means to detect long-running sinks
- private TimerTask watchdog;
- private EventSink lastSink;
- private long lastStart = 0;
- private Future<?> dispatchFuture;
-
- @Override
- public void post(Event event) {
- if (!events.add(event)) {
- log.error("Unable to post event {}", event);
- }
- }
-
- @Activate
- public void activate() {
- dispatchLoop = new DispatchLoop();
- dispatchFuture = executor.submit(dispatchLoop);
- watchdog = new Watchdog();
- SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- dispatchLoop.stop();
- watchdog.cancel();
- post(KILL_PILL);
- log.info("Stopped");
- }
-
- @Override
- public void setDispatchTimeLimit(long millis) {
- checkArgument(millis >= WATCHDOG_MS,
- "Time limit must be greater than %s", WATCHDOG_MS);
- maxProcessMillis = millis;
- }
-
- @Override
- public long getDispatchTimeLimit() {
- return maxProcessMillis;
- }
-
- // Auxiliary event dispatching loop that feeds off the events queue.
- private class DispatchLoop implements Runnable {
- private volatile boolean stopped;
-
- @Override
- public void run() {
- stopped = false;
- log.info("Dispatch loop initiated");
- while (!stopped) {
- try {
- // Fetch the next event and if it is the kill-pill, bail
- Event event = events.take();
- if (event == KILL_PILL) {
- break;
- }
- process(event);
- } catch (InterruptedException e) {
- log.warn("Dispatch loop interrupted");
- } catch (Exception e) {
- log.warn("Error encountered while dispatching event:", e);
- }
- }
- log.info("Dispatch loop terminated");
- }
-
- // Locate the sink for the event class and use it to process the event
- @SuppressWarnings("unchecked")
- private void process(Event event) {
- EventSink sink = getSink(event.getClass());
- if (sink != null) {
- lastSink = sink;
- lastStart = System.currentTimeMillis();
- sink.process(event);
- lastStart = 0;
- } else {
- log.warn("No sink registered for event class {}",
- event.getClass().getName());
- }
- }
-
- void stop() {
- stopped = true;
- }
- }
-
- // Monitors event sinks to make sure none take too long to execute.
- private class Watchdog extends TimerTask {
- @Override
- public void run() {
- long delta = System.currentTimeMillis() - lastStart;
- if (lastStart > 0 && delta > maxProcessMillis) {
- lastStart = 0;
- log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
- lastSink.getClass().getName(), delta);
-
- // Notify the sink that it has exceeded its time limit.
- lastSink.onProcessLimit();
-
- // Cancel the old dispatch loop and submit a new one.
- dispatchLoop.stop();
- dispatchLoop = new DispatchLoop();
- dispatchFuture.cancel(true);
- dispatchFuture = executor.submit(dispatchLoop);
- }
- }
- }
-}