aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/misc/src/main/java/org/onlab/util/BoundedThreadPool.java
blob: 9eef6609a0086e1bc00b1fed35ab0d0458b873e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.util;

import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Implementation of ThreadPoolExecutor that bounds the work queue.
 * <p>
 * When a new job would exceed the queue bound, the submitting thread is held
 * back for up to one second on the pool's load indicator, and the job is then
 * run on the caller's thread rather than on a thread from the pool.
 * </p>
 */
public final class BoundedThreadPool extends ThreadPoolExecutor {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(BoundedThreadPool.class);

    protected static int maxQueueSize = 80_000; //TODO tune this value
    private static final long STATS_INTERVAL = 5_000; //ms

    // Fraction of free queue capacity below which the pool is flagged as under high load.
    private static final double HIGH_LOAD_THRESHOLD = 0.2;

    // Load indicator shared with the rejection policy installed at construction.
    private final BlockingBoolean underHighLoad;

    // Capacity the work queue was created with; maxQueueSize is mutable, so
    // the value is captured per instance instead of being re-read later.
    private final int queueCapacity;

    private BoundedThreadPool(int numberOfThreads,
                              ThreadFactory threadFactory) {
        this(numberOfThreads, threadFactory, maxQueueSize, new CallerFeedbackPolicy());
    }

    /**
     * Creates the pool with an explicit queue capacity and rejection policy.
     *
     * @param numberOfThreads number of threads in the pool (core and maximum)
     * @param threadFactory   thread factory for the worker threads
     * @param queueCapacity   capacity of the bounded work queue
     * @param policy          rejection policy whose load indicator this pool updates
     */
    private BoundedThreadPool(int numberOfThreads,
                              ThreadFactory threadFactory,
                              int queueCapacity,
                              CallerFeedbackPolicy policy) {
        super(numberOfThreads, numberOfThreads,
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(queueCapacity),
              threadFactory,
              policy);
        this.queueCapacity = queueCapacity;
        this.underHighLoad = policy.load();
    }

    /**
     * Returns a single-thread, bounded executor service.
     *
     * @param threadFactory thread factory for the worker thread.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new BoundedThreadPool(1, threadFactory);
    }

    /**
     * Returns a fixed-size, bounded executor service.
     *
     * @param numberOfThreads number of threads in the pool
     * @param threadFactory   thread factory for the worker threads.
     * @return the bounded thread pool
     */
    public static BoundedThreadPool newFixedThreadPool(int numberOfThreads, ThreadFactory threadFactory) {
        return new BoundedThreadPool(numberOfThreads, threadFactory);
    }

    //TODO Might want to switch these to use Metrics class Meter and/or Gauge instead.
    private final Counter submitted = new Counter();
    private final Counter taken = new Counter();

    /**
     * Submits a runnable task.
     * <p>
     * The submission is not counted here: the inherited implementation hands
     * the wrapped task to {@link #execute(Runnable)}, which does the counting.
     * Counting in both places would record every submitted task twice.
     * </p>
     *
     * @param task the task to submit
     * @return a future representing pending completion of the task
     */
    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(task);
    }

    /**
     * Submits a runnable task with a fixed result; counted in
     * {@link #execute(Runnable)}.
     *
     * @param task   the task to submit
     * @param result the result to return on completion
     * @param <T>    the type of the result
     * @return a future representing pending completion of the task
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(task, result);
    }

    /**
     * Executes the given command and records it in the submission counter.
     * This is the single place where submissions are counted.
     *
     * @param command the task to execute
     */
    @Override
    public void execute(Runnable command) {
        submitted.add(1);
        super.execute(command);
    }

    /**
     * Submits a value-returning task; counted in {@link #execute(Runnable)}.
     *
     * @param task the task to submit
     * @param <T>  the type of the task's result
     * @return a future representing pending completion of the task
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(task);
    }


    /**
     * Records that a worker picked up a task, then refreshes the periodic
     * statistics log and the load indicator.
     *
     * @param t the thread that will run the task
     * @param r the task that will be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taken.add(1);
        periodicallyPrintStats();
        updateLoad();
    }

    // TODO schedule this with a fixed delay from a scheduled executor
    private final AtomicLong lastPrinted = new AtomicLong(0L);

    /**
     * Logs queue size and throughput at most once per {@code STATS_INTERVAL}
     * and resets both counters afterwards.
     */
    private void periodicallyPrintStats() {
        long now = System.currentTimeMillis();
        long prev = lastPrinted.get();
        if (now - prev > STATS_INTERVAL) {
            // Only the thread that wins the CAS prints and resets for this interval.
            if (lastPrinted.compareAndSet(prev, now)) {
                log.debug("queue size: {} jobs, submitted: {} jobs/s, taken: {} jobs/s",
                         getQueue().size(),
                         submitted.throughput(), taken.throughput());
                submitted.reset();
                taken.reset();
            }
        }
    }

    // TODO consider updating load whenever queue changes
    /**
     * Flags the pool as under high load when less than
     * {@code HIGH_LOAD_THRESHOLD} of this pool's queue capacity remains free.
     */
    private void updateLoad() {
        double freeRatio = getQueue().remainingCapacity() / (double) queueCapacity;
        underHighLoad.set(freeRatio < HIGH_LOAD_THRESHOLD);
    }

    /**
     * Feedback policy that delays the caller's thread for up to one second on
     * the load indicator, then runs the job on the caller's thread.
     */
    private static final class CallerFeedbackPolicy implements RejectedExecutionHandler {

        private final BlockingBoolean underLoad = new BlockingBoolean(false);

        /**
         * Returns the load indicator that rejected callers wait on.
         *
         * @return the load indicator
         */
        public BlockingBoolean load() {
            return underLoad;
        }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * <p>
         * If the caller is interrupted while waiting, its interrupt status is
         * restored before the task is run.
         * </p>
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // Wait for up to 1 second while the queue drains...
                boolean notified = false;
                try {
                    notified = underLoad.await(false, 1, TimeUnit.SECONDS);
                } catch (InterruptedException exception) {
                    log.debug("Got exception waiting for notification:", exception);
                    // Restore the interrupt status so the caller can still observe it.
                    Thread.currentThread().interrupt();
                } finally {
                    if (!notified) {
                        log.info("Waited for 1 second on {}. Proceeding with work...",
                                 Thread.currentThread().getName());
                    } else {
                        log.info("FIXME we got a notice");
                    }
                }
                // Do the work on the submitter's thread
                r.run();
            }
        }
    }
}