/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onlab.util; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * Implementation of ThreadPoolExecutor that bounds the work queue. *
<p> * When a new job would exceed the queue bound, the job is run on the caller's * thread rather than on a thread from the pool. * </p>
*/ public final class BoundedThreadPool extends ThreadPoolExecutor { private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class); protected static int maxQueueSize = 80_000; //TODO tune this value //private static final RejectedExecutionHandler DEFAULT_HANDLER = new CallerFeedbackPolicy(); private static final long STATS_INTERVAL = 5_000; //ms private final BlockingBoolean underHighLoad; private BoundedThreadPool(int numberOfThreads, ThreadFactory threadFactory) { super(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxQueueSize), threadFactory, new CallerFeedbackPolicy()); underHighLoad = ((CallerFeedbackPolicy) getRejectedExecutionHandler()).load(); } /** * Returns a single-thread, bounded executor service. * * @param threadFactory thread factory for the worker thread. * @return the bounded thread pool */ public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) { return new BoundedThreadPool(1, threadFactory); } /** * Returns a fixed-size, bounded executor service. * * @param numberOfThreads number of threads in the pool * @param threadFactory thread factory for the worker threads. * @return the bounded thread pool */ public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) { return new BoundedThreadPool(numberOfThreads, threadFactory); } //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead. private final Counter submitted = new Counter(); private final Counter taken = new Counter(); @Override public Future> submit(Runnable task) { submitted.add(1); return super.submit(task); } @Override public