diff options
Diffstat (limited to 'framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java')
-rw-r--r-- | framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java new file mode 100644 index 00000000..9eef6609 --- /dev/null +++ b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java @@ -0,0 +1,176 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.util; + +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of ThreadPoolExecutor that bounds the work queue. + * <p> + * When a new job would exceed the queue bound, the job is run on the caller's + * thread rather than on a thread from the pool. + * </p> + */ +public final class BoundedThreadPool extends ThreadPoolExecutor { + + private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class); + + protected static int maxQueueSize = 80_000; //TODO tune this value + //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy(); + private static final long STATS_INTERVAL = 5_000; //ms + + private final BlockingBoolean underHighLoad; + + private BoundedThreadPool(int numberOfThreads, + ThreadFactory threadFactory) { + super(numberOfThreads, numberOfThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(maxQueueSize), + threadFactory, + new CallerFeedbackPolicy()); + underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load(); + } + + /** + * Returns a single-thread, bounded executor service. + * + * @param threadFactory thread factory for the worker thread. + * @return the bounded thread pool + */ + public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) { + return new BoundedThreadPool(1, threadFactory); + } + + /** + * Returns a fixed-size, bounded executor service. + * + * @param numberOfThreads number of threads in the pool + * @param threadFactory thread factory for the worker threads. + * @return the bounded thread pool + */ + public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) { + return new BoundedThreadPool(numberOfThreads, threadFactory); + } + + //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead. + private final Counter submitted = new Counter(); + private final Counter taken = new Counter(); + + @Override + public Future<?> submit(Runnable task) { + submitted.add(1); + return super.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + submitted.add(1); + return super.submit(task, result); + } + + @Override + public void execute(Runnable command) { + submitted.add(1); + super.execute(command); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + submitted.add(1); + return super.submit(task); + } + + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + taken.add(1); + periodicallyPrintStats(); + updateLoad(); + } + + // TODO schedule this with a fixed delay from a scheduled executor + private final AtomicLong lastPrinted = new AtomicLong(0L); + + private void periodicallyPrintStats() { + long now = System.currentTimeMillis(); + long prev = lastPrinted.get(); + if (now - prev > STATS_INTERVAL) { + if (lastPrinted.compareAndSet(prev, now)) { + log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s", + getQueue().size(), + submitted.throughput(), taken.throughput()); + submitted.reset(); + taken.reset(); + } + } + } + + // TODO consider updating load whenever queue changes + private void updateLoad() { + underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2); + } + + /** + * Feedback policy that delays the caller's thread until the executor's work + * queue falls below a threshold, then runs the job on the caller's thread. + */ + private static final class CallerFeedbackPolicy implements RejectedExecutionHandler { + + private final BlockingBoolean underLoad = new BlockingBoolean(false); + + public BlockingBoolean load() { + return underLoad; + } + + /** + * Executes task r in the caller's thread, unless the executor + * has been shut down, in which case the task is discarded. + * + * @param r the runnable task requested to be executed + * @param e the executor attempting to execute this task + */ + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + if (!e.isShutdown()) { + // Wait for up to 1 second while the queue drains... + boolean notified = false; + try { + notified = underLoad.await(false, 1, TimeUnit.SECONDS); + } catch (InterruptedException exception) { + log.debug("Got exception waiting for notification:", exception); + } finally { + if (!notified) { + log.info("Waited for 1 second on {}. Proceeding with work...", + Thread.currentThread().getName()); + } else { + log.info("FIXME we got a notice"); + } + } + // Do the work on the submitter's thread + r.run(); + } + } + } +}
\ No newline at end of file |