aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java')
-rw-r--r--framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java176
1 files changed, 0 insertions, 176 deletions
diff --git a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
deleted file mode 100644
index 9eef6609..00000000
--- a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.util;
-
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Implementation of ThreadPoolExecutor that bounds the work queue.
- * <p>
- * When a new job would exceed the queue bound, the job is run on the caller's
- * thread rather than on a thread from the pool.
- * </p>
- */
-public final class BoundedThreadPool extends ThreadPoolExecutor {
-
- private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);
-
- protected static int maxQueueSize = 80_000; //TODO tune this value
- //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy();
- private static final long STATS_INTERVAL = 5_000; //ms
-
- private final BlockingBoolean underHighLoad;
-
- private BoundedThreadPool(int numberOfThreads,
- ThreadFactory threadFactory) {
- super(numberOfThreads, numberOfThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(maxQueueSize),
- threadFactory,
- new CallerFeedbackPolicy());
- underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load();
- }
-
- /**
- * Returns a single-thread, bounded executor service.
- *
- * @param threadFactory thread factory for the worker thread.
- * @return the bounded thread pool
- */
- public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
- return new BoundedThreadPool(1, threadFactory);
- }
-
- /**
- * Returns a fixed-size, bounded executor service.
- *
- * @param numberOfThreads number of threads in the pool
- * @param threadFactory thread factory for the worker threads.
- * @return the bounded thread pool
- */
- public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
- return new BoundedThreadPool(numberOfThreads, threadFactory);
- }
-
- //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
- private final Counter submitted = new Counter();
- private final Counter taken = new Counter();
-
- @Override
- public Future<?> submit(Runnable task) {
- submitted.add(1);
- return super.submit(task);
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- submitted.add(1);
- return super.submit(task, result);
- }
-
- @Override
- public void execute(Runnable command) {
- submitted.add(1);
- super.execute(command);
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- submitted.add(1);
- return super.submit(task);
- }
-
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- super.beforeExecute(t, r);
- taken.add(1);
- periodicallyPrintStats();
- updateLoad();
- }
-
- // TODO schedule this with a fixed delay from a scheduled executor
- private final AtomicLong lastPrinted = new AtomicLong(0L);
-
- private void periodicallyPrintStats() {
- long now = System.currentTimeMillis();
- long prev = lastPrinted.get();
- if (now - prev > STATS_INTERVAL) {
- if (lastPrinted.compareAndSet(prev, now)) {
- log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
- getQueue().size(),
- submitted.throughput(), taken.throughput());
- submitted.reset();
- taken.reset();
- }
- }
- }
-
- // TODO consider updating load whenever queue changes
- private void updateLoad() {
- underHighLoad.set(getQueue().remainingCapacity() / (double) maxQueueSize < 0.2);
- }
-
- /**
- * Feedback policy that delays the caller's thread until the executor's work
- * queue falls below a threshold, then runs the job on the caller's thread.
- */
- private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {
-
- private final BlockingBoolean underLoad = new BlockingBoolean(false);
-
- public BlockingBoolean load() {
- return underLoad;
- }
-
- /**
- * Executes task r in the caller's thread, unless the executor
- * has been shut down, in which case the task is discarded.
- *
- * @param r the runnable task requested to be executed
- * @param e the executor attempting to execute this task
- */
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- // Wait for up to 1 second while the queue drains...
- boolean notified = false;
- try {
- notified = underLoad.await(false, 1, TimeUnit.SECONDS);
- } catch (InterruptedException exception) {
- log.debug("Got exception waiting for notification:", exception);
- } finally {
- if (!notified) {
- log.info("Waited for 1 second on {}. Proceeding with work...",
- Thread.currentThread().getName());
- } else {
- log.info("FIXME we got a notice");
- }
- }
- // Do the work on the submitter's thread
- r.run();
- }
- }
- }
-} \ No newline at end of file