aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
blob: a19e8aace991515076ddb889679427c7767e25d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.nio;

import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.System.currentTimeMillis;
import static java.nio.ByteBuffer.allocateDirect;

/**
 * Bi-directional message stream for transferring messages to & from the
 * network via two byte buffers.
 *
 * @param <M> message type
 */
public abstract class MessageStream<M extends Message> {

    protected Logger log = LoggerFactory.getLogger(getClass());

    private final IOLoop<M, ?> loop;
    private final ByteChannel channel;
    private final int maxIdleMillis;

    private final ByteBuffer inbound;
    private ByteBuffer outbound;
    private SelectionKey key;

    private volatile boolean closed = false;
    private volatile boolean writePending;
    private volatile boolean writeOccurred;

    private Exception ioError;
    private long lastActiveTime;

    private final Counter bytesIn = new Counter();
    private final Counter messagesIn = new Counter();
    private final Counter bytesOut = new Counter();
    private final Counter messagesOut = new Counter();

    /**
     * Creates a message stream associated with the specified IO loop and
     * backed by the given byte channel.
     *
     * @param loop          IO loop
     * @param byteChannel   backing byte channel
     * @param bufferSize    size of the backing byte buffers
     * @param maxIdleMillis maximum number of millis the stream can be idle
     *                      before it will be closed
     */
    protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
                            int bufferSize, int maxIdleMillis) {
        this.loop = checkNotNull(loop, "Loop cannot be null");
        this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");

        checkArgument(maxIdleMillis > 0, "Idle time must be positive");
        this.maxIdleMillis = maxIdleMillis;

        inbound = allocateDirect(bufferSize);
        outbound = allocateDirect(bufferSize);
    }

    /**
     * Gets a single message from the specified byte buffer; this is
     * to be done without manipulating the buffer via flip, reset or clear.
     *
     * @param buffer byte buffer
     * @return read message or null if there are not enough bytes to read
     * a complete message
     */
    protected abstract M read(ByteBuffer buffer);

    /**
     * Puts the specified message into the specified byte buffer; this is
     * to be done without manipulating the buffer via flip, reset or clear.
     *
     * @param message message to be write into the buffer
     * @param buffer  byte buffer
     */
    protected abstract void write(M message, ByteBuffer buffer);

    /**
     * Closes the message buffer.
     */
    public void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
        }

        bytesIn.freeze();
        bytesOut.freeze();
        messagesIn.freeze();
        messagesOut.freeze();

        loop.removeStream(this);
        if (key != null) {
            try {
                key.cancel();
                key.channel().close();
            } catch (IOException e) {
                log.warn("Unable to close stream", e);
            }
        }
    }

    /**
     * Indicates whether this buffer has been closed.
     *
     * @return true if this stream has been closed
     */
    public synchronized boolean isClosed() {
        return closed;
    }

    /**
     * Returns the stream IO selection key.
     *
     * @return socket channel registration selection key
     */
    public SelectionKey key() {
        return key;
    }

    /**
     * Binds the selection key to be used for driving IO operations on the stream.
     *
     * @param key IO selection key
     */
    public void setKey(SelectionKey key) {
        this.key = key;
        this.lastActiveTime = currentTimeMillis();
    }

    /**
     * Returns the IO loop to which this stream is bound.
     *
     * @return I/O loop used to drive this stream
     */
    public IOLoop<M, ?> loop() {
        return loop;
    }

    /**
     * Indicates whether the any prior IO encountered an error.
     *
     * @return true if a write failed
     */
    public boolean hadError() {
        return ioError != null;
    }

    /**
     * Gets the prior IO error, if one occurred.
     *
     * @return IO error; null if none occurred
     */
    public Exception getError() {
        return ioError;
    }

    /**
     * Reads, without blocking, a list of messages from the stream.
     * The list will be empty if there were not messages pending.
     *
     * @return list of messages or null if backing channel has been closed
     * @throws IOException if messages could not be read
     */
    public List<M> read() throws IOException {
        try {
            int read = channel.read(inbound);
            if (read != -1) {
                // Read the messages one-by-one and add them to the list.
                List<M> messages = new ArrayList<>();
                M message;
                inbound.flip();
                while ((message = read(inbound)) != null) {
                    messages.add(message);
                    messagesIn.add(1);
                    bytesIn.add(message.length());
                }
                inbound.compact();

                // Mark the stream with current time to indicate liveness.
                lastActiveTime = currentTimeMillis();
                return messages;
            }
            return null;

        } catch (Exception e) {
            throw new IOException("Unable to read messages", e);
        }
    }

    /**
     * Writes the specified list of messages to the stream.
     *
     * @param messages list of messages to write
     * @throws IOException if error occurred while writing the data
     */
    public void write(List<M> messages) throws IOException {
        synchronized (this) {
            // First write all messages.
            for (M m : messages) {
                append(m);
            }
            flushUnlessAlreadyPlanningTo();
        }
    }

    /**
     * Writes the given message to the stream.
     *
     * @param message message to write
     * @throws IOException if error occurred while writing the data
     */
    public void write(M message) throws IOException {
        synchronized (this) {
            append(message);
            flushUnlessAlreadyPlanningTo();
        }
    }

    // Appends the specified message into the internal buffer, growing the
    // buffer if required.
    private void append(M message) {
        // If the buffer does not have sufficient length double it.
        while (outbound.remaining() < message.length()) {
            doubleSize();
        }
        write(message, outbound);
        messagesOut.add(1);
        bytesOut.add(message.length());
    }

    // Forces a flush, unless one is planned already.
    private void flushUnlessAlreadyPlanningTo() throws IOException {
        if (!writeOccurred && !writePending) {
            flush();
        }
    }

    /**
     * Flushes any pending writes.
     *
     * @throws IOException if flush failed
     */
    public void flush() throws IOException {
        synchronized (this) {
            if (!writeOccurred && !writePending) {
                outbound.flip();
                try {
                    channel.write(outbound);
                } catch (IOException e) {
                    if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
                        log.warn("Unable to write data", e);
                        ioError = e;
                    }
                }
                lastActiveTime = currentTimeMillis();
                writeOccurred = true;
                writePending = outbound.hasRemaining();
                outbound.compact();
            }
        }
    }

    /**
     * Indicates whether the stream has bytes to be written to the channel.
     *
     * @return true if there are bytes to be written
     */
    boolean isWritePending() {
        synchronized (this) {
            return writePending;
        }
    }


    /**
     * Indicates whether data has been written but not flushed yet.
     *
     * @return true if flush is required
     */
    boolean isFlushRequired() {
        synchronized (this) {
            return outbound.position() > 0;
        }
    }

    /**
     * Attempts to flush data, internal stream state and channel availability
     * permitting. Invoked by the driver I/O loop during handling of writable
     * selection key.
     * <p>
     * Resets the internal state flags {@code writeOccurred} and
     * {@code writePending}.
     * </p>
     * @throws IOException if implicit flush failed
     */
    void flushIfPossible() throws IOException {
        synchronized (this) {
            writePending = false;
            writeOccurred = false;
            if (outbound.position() > 0) {
                flush();
            }
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Attempts to flush data, internal stream state and channel availability
     * permitting and if other writes are not pending. Invoked by the driver
     * I/O loop prior to entering select wait. Resets the internal
     * {@code writeOccurred} state flag.
     *
     * @throws IOException if implicit flush failed
     */
    void flushIfWriteNotPending() throws IOException {
        synchronized (this) {
            writeOccurred = false;
            if (!writePending && outbound.position() > 0) {
                flush();
            }
        }
        if (isWritePending()) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Doubles the size of the outbound buffer.
     */
    private void doubleSize() {
        ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
        outbound.flip();
        newBuffer.put(outbound);
        outbound = newBuffer;
    }

    /**
     * Returns the maximum number of milliseconds the stream is allowed
     * without any read/write operations.
     *
     * @return number if millis of permissible idle time
     */
    protected int maxIdleMillis() {
        return maxIdleMillis;
    }


    /**
     * Returns true if the given stream has gone stale.
     *
     * @return true if the stream is stale
     */
    boolean isStale() {
        return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
    }

    /**
     * Returns the inbound bytes counter.
     *
     * @return inbound bytes counter
     */
    public Counter bytesIn() {
        return bytesIn;
    }

    /**
     * Returns the outbound bytes counter.
     *
     * @return outbound bytes counter
     */
    public Counter bytesOut() {
        return bytesOut;
    }

    /**
     * Returns the inbound messages counter.
     *
     * @return inbound messages counter
     */
    public Counter messagesIn() {
        return messagesIn;
    }

    /**
     * Returns the outbound messages counter.
     *
     * @return outbound messages counter
     */
    public Counter messagesOut() {
        return messagesOut;
    }

}