diff options
Diffstat (limited to 'framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java')
-rw-r--r-- | framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java | 176 |
1 files changed, 0 insertions, 176 deletions
diff --git a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java deleted file mode 100644 index 9eef6609..00000000 --- a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.util; - -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Implementation of ThreadPoolExecutor that bounds the work queue. - * <p> - * When a new job would exceed the queue bound, the job is run on the caller's - * thread rather than on a thread from the pool. - * </p> - */ -public final class BoundedThreadPool extends ThreadPoolExecutor { - - private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class); - - protected static int maxQueueSize = 80_000; //TODO tune this value - //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy(); - private static final long STATS_INTERVAL = 5_000; //ms - - private final BlockingBoolean underHighLoad; - - private BoundedThreadPool(int numberOfThreads, - ThreadFactory threadFactory) { - super(numberOfThreads, numberOfThreads, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(maxQueueSize), - threadFactory, - new CallerFeedbackPolicy()); - underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load(); - } - - /** - * Returns a single-thread, bounded executor service. - * - * @param threadFactory thread factory for the worker thread. - * @return the bounded thread pool - */ - public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) { - return new BoundedThreadPool(1, threadFactory); - } - - /** - * Returns a fixed-size, bounded executor service. - * - * @param numberOfThreads number of threads in the pool - * @param threadFactory thread factory for the worker threads. - * @return the bounded thread pool - */ - public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) { - return new BoundedThreadPool(numberOfThreads, threadFactory); - } - - //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead. - private final Counter submitted = new Counter(); - private final Counter taken = new Counter(); - - @Override - public Future<?> submit(Runnable task) { - submitted.add(1); - return super.submit(task); - } - - @Override - public <T> Future<T> submit(Runnable task, T result) { - submitted.add(1); - return super.submit(task, result); - } - - @Override - public void execute(Runnable command) { - submitted.add(1); - super.execute(command); - } - - @Override - public <T> Future<T> submit(Callable<T> task) { - submitted.add(1); - return super.submit(task); - } - - - @Override - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); - taken.add(1); - periodicallyPrintStats(); - updateLoad(); - } - - // TODO schedule this with a fixed delay from a scheduled executor - private final AtomicLong lastPrinted = new AtomicLong(0L); - - private void periodicallyPrintStats() { - long now = System.currentTimeMillis(); - long prev = lastPrinted.get(); - if (now - prev > STATS_INTERVAL) { - if (lastPrinted.compareAndSet(prev, now)) { - log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s", - getQueue().size(), - submitted.throughput(), taken.throughput()); - submitted.reset(); - taken.reset(); - } - } - } - - // TODO consider updating load whenever queue changes - private void updateLoad() { - underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2); - } - - /** - * Feedback policy that delays the caller's thread until the executor's work - * queue falls below a threshold, then runs the job on the caller's thread. - */ - private static final class CallerFeedbackPolicy implements RejectedExecutionHandler { - - private final BlockingBoolean underLoad = new BlockingBoolean(false); - - public BlockingBoolean load() { - return underLoad; - } - - /** - * Executes task r in the caller's thread, unless the executor - * has been shut down, in which case the task is discarded. - * - * @param r the runnable task requested to be executed - * @param e the executor attempting to execute this task - */ - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - if (!e.isShutdown()) { - // Wait for up to 1 second while the queue drains... - boolean notified = false; - try { - notified = underLoad.await(false, 1, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - log.debug("Got exception waiting for notification:", exception); - } finally { - if (!notified) { - log.info("Waited for 1 second on {}. Proceeding with work...", - Thread.currentThread().getName()); - } else { - log.info("FIXME we got a notice"); - } - } - // Do the work on the submitter's thread - r.run(); - } - } - } -}
\ No newline at end of file |