aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java')
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java424
1 files changed, 0 insertions, 424 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
deleted file mode 100644
index a19e8aac..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.onlab.util.Counter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.System.currentTimeMillis;
-import static java.nio.ByteBuffer.allocateDirect;
-
-/**
- * Bi-directional message stream for transferring messages to & from the
- * network via two byte buffers.
- *
- * @param <M> message type
- */
-public abstract class MessageStream<M extends Message> {
-
- protected Logger log = LoggerFactory.getLogger(getClass());
-
- private final IOLoop<M, ?> loop;
- private final ByteChannel channel;
- private final int maxIdleMillis;
-
- private final ByteBuffer inbound;
- private ByteBuffer outbound;
- private SelectionKey key;
-
- private volatile boolean closed = false;
- private volatile boolean writePending;
- private volatile boolean writeOccurred;
-
- private Exception ioError;
- private long lastActiveTime;
-
- private final Counter bytesIn = new Counter();
- private final Counter messagesIn = new Counter();
- private final Counter bytesOut = new Counter();
- private final Counter messagesOut = new Counter();
-
- /**
- * Creates a message stream associated with the specified IO loop and
- * backed by the given byte channel.
- *
- * @param loop IO loop
- * @param byteChannel backing byte channel
- * @param bufferSize size of the backing byte buffers
- * @param maxIdleMillis maximum number of millis the stream can be idle
- * before it will be closed
- */
- protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
- int bufferSize, int maxIdleMillis) {
- this.loop = checkNotNull(loop, "Loop cannot be null");
- this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
-
- checkArgument(maxIdleMillis > 0, "Idle time must be positive");
- this.maxIdleMillis = maxIdleMillis;
-
- inbound = allocateDirect(bufferSize);
- outbound = allocateDirect(bufferSize);
- }
-
- /**
- * Gets a single message from the specified byte buffer; this is
- * to be done without manipulating the buffer via flip, reset or clear.
- *
- * @param buffer byte buffer
- * @return read message or null if there are not enough bytes to read
- * a complete message
- */
- protected abstract M read(ByteBuffer buffer);
-
- /**
- * Puts the specified message into the specified byte buffer; this is
- * to be done without manipulating the buffer via flip, reset or clear.
- *
- * @param message message to be write into the buffer
- * @param buffer byte buffer
- */
- protected abstract void write(M message, ByteBuffer buffer);
-
- /**
- * Closes the message buffer.
- */
- public void close() {
- synchronized (this) {
- if (closed) {
- return;
- }
- closed = true;
- }
-
- bytesIn.freeze();
- bytesOut.freeze();
- messagesIn.freeze();
- messagesOut.freeze();
-
- loop.removeStream(this);
- if (key != null) {
- try {
- key.cancel();
- key.channel().close();
- } catch (IOException e) {
- log.warn("Unable to close stream", e);
- }
- }
- }
-
- /**
- * Indicates whether this buffer has been closed.
- *
- * @return true if this stream has been closed
- */
- public synchronized boolean isClosed() {
- return closed;
- }
-
- /**
- * Returns the stream IO selection key.
- *
- * @return socket channel registration selection key
- */
- public SelectionKey key() {
- return key;
- }
-
- /**
- * Binds the selection key to be used for driving IO operations on the stream.
- *
- * @param key IO selection key
- */
- public void setKey(SelectionKey key) {
- this.key = key;
- this.lastActiveTime = currentTimeMillis();
- }
-
- /**
- * Returns the IO loop to which this stream is bound.
- *
- * @return I/O loop used to drive this stream
- */
- public IOLoop<M, ?> loop() {
- return loop;
- }
-
- /**
- * Indicates whether the any prior IO encountered an error.
- *
- * @return true if a write failed
- */
- public boolean hadError() {
- return ioError != null;
- }
-
- /**
- * Gets the prior IO error, if one occurred.
- *
- * @return IO error; null if none occurred
- */
- public Exception getError() {
- return ioError;
- }
-
- /**
- * Reads, without blocking, a list of messages from the stream.
- * The list will be empty if there were not messages pending.
- *
- * @return list of messages or null if backing channel has been closed
- * @throws IOException if messages could not be read
- */
- public List<M> read() throws IOException {
- try {
- int read = channel.read(inbound);
- if (read != -1) {
- // Read the messages one-by-one and add them to the list.
- List<M> messages = new ArrayList<>();
- M message;
- inbound.flip();
- while ((message = read(inbound)) != null) {
- messages.add(message);
- messagesIn.add(1);
- bytesIn.add(message.length());
- }
- inbound.compact();
-
- // Mark the stream with current time to indicate liveness.
- lastActiveTime = currentTimeMillis();
- return messages;
- }
- return null;
-
- } catch (Exception e) {
- throw new IOException("Unable to read messages", e);
- }
- }
-
- /**
- * Writes the specified list of messages to the stream.
- *
- * @param messages list of messages to write
- * @throws IOException if error occurred while writing the data
- */
- public void write(List<M> messages) throws IOException {
- synchronized (this) {
- // First write all messages.
- for (M m : messages) {
- append(m);
- }
- flushUnlessAlreadyPlanningTo();
- }
- }
-
- /**
- * Writes the given message to the stream.
- *
- * @param message message to write
- * @throws IOException if error occurred while writing the data
- */
- public void write(M message) throws IOException {
- synchronized (this) {
- append(message);
- flushUnlessAlreadyPlanningTo();
- }
- }
-
- // Appends the specified message into the internal buffer, growing the
- // buffer if required.
- private void append(M message) {
- // If the buffer does not have sufficient length double it.
- while (outbound.remaining() < message.length()) {
- doubleSize();
- }
- write(message, outbound);
- messagesOut.add(1);
- bytesOut.add(message.length());
- }
-
- // Forces a flush, unless one is planned already.
- private void flushUnlessAlreadyPlanningTo() throws IOException {
- if (!writeOccurred && !writePending) {
- flush();
- }
- }
-
- /**
- * Flushes any pending writes.
- *
- * @throws IOException if flush failed
- */
- public void flush() throws IOException {
- synchronized (this) {
- if (!writeOccurred && !writePending) {
- outbound.flip();
- try {
- channel.write(outbound);
- } catch (IOException e) {
- if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
- log.warn("Unable to write data", e);
- ioError = e;
- }
- }
- lastActiveTime = currentTimeMillis();
- writeOccurred = true;
- writePending = outbound.hasRemaining();
- outbound.compact();
- }
- }
- }
-
- /**
- * Indicates whether the stream has bytes to be written to the channel.
- *
- * @return true if there are bytes to be written
- */
- boolean isWritePending() {
- synchronized (this) {
- return writePending;
- }
- }
-
-
- /**
- * Indicates whether data has been written but not flushed yet.
- *
- * @return true if flush is required
- */
- boolean isFlushRequired() {
- synchronized (this) {
- return outbound.position() > 0;
- }
- }
-
- /**
- * Attempts to flush data, internal stream state and channel availability
- * permitting. Invoked by the driver I/O loop during handling of writable
- * selection key.
- * <p>
- * Resets the internal state flags {@code writeOccurred} and
- * {@code writePending}.
- * </p>
- * @throws IOException if implicit flush failed
- */
- void flushIfPossible() throws IOException {
- synchronized (this) {
- writePending = false;
- writeOccurred = false;
- if (outbound.position() > 0) {
- flush();
- }
- }
- key.interestOps(SelectionKey.OP_READ);
- }
-
- /**
- * Attempts to flush data, internal stream state and channel availability
- * permitting and if other writes are not pending. Invoked by the driver
- * I/O loop prior to entering select wait. Resets the internal
- * {@code writeOccurred} state flag.
- *
- * @throws IOException if implicit flush failed
- */
- void flushIfWriteNotPending() throws IOException {
- synchronized (this) {
- writeOccurred = false;
- if (!writePending && outbound.position() > 0) {
- flush();
- }
- }
- if (isWritePending()) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- }
- }
-
- /**
- * Doubles the size of the outbound buffer.
- */
- private void doubleSize() {
- ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
- outbound.flip();
- newBuffer.put(outbound);
- outbound = newBuffer;
- }
-
- /**
- * Returns the maximum number of milliseconds the stream is allowed
- * without any read/write operations.
- *
- * @return number if millis of permissible idle time
- */
- protected int maxIdleMillis() {
- return maxIdleMillis;
- }
-
-
- /**
- * Returns true if the given stream has gone stale.
- *
- * @return true if the stream is stale
- */
- boolean isStale() {
- return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
- }
-
- /**
- * Returns the inbound bytes counter.
- *
- * @return inbound bytes counter
- */
- public Counter bytesIn() {
- return bytesIn;
- }
-
- /**
- * Returns the outbound bytes counter.
- *
- * @return outbound bytes counter
- */
- public Counter bytesOut() {
- return bytesOut;
- }
-
- /**
- * Returns the inbound messages counter.
- *
- * @return inbound messages counter
- */
- public Counter messagesIn() {
- return messagesIn;
- }
-
- /**
- * Returns the outbound messages counter.
- *
- * @return outbound messages counter
- */
- public Counter messagesOut() {
- return messagesOut;
- }
-
-}