aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
commit13d05bc8458758ee39cb829098241e89616717ee (patch)
tree22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/utils/nio
parent6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff)
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/utils/nio')
-rw-r--r--framework/src/onos/utils/nio/pom.xml58
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java30
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java123
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java302
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java30
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java424
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java175
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java21
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java66
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java104
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java139
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java334
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java20
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java60
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java87
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java82
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java324
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java256
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java359
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java85
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java56
-rw-r--r--framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java89
22 files changed, 3224 insertions, 0 deletions
diff --git a/framework/src/onos/utils/nio/pom.xml b/framework/src/onos/utils/nio/pom.xml
new file mode 100644
index 00000000..b9e4d447
--- /dev/null
+++ b/framework/src/onos/utils/nio/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2014 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-utils</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>onlab-nio</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>Fast network I/O using Java NIO</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-misc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
new file mode 100644
index 00000000..16bb9359
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+/**
+ * Base {@link Message} implementation.
+ */
+public abstract class AbstractMessage implements Message {
+
+ protected int length;
+
+ @Override
+ public int length() {
+ return length;
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
new file mode 100644
index 00000000..e416f3be
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Selector loop derivative tailored to acceptConnection inbound connections.
+ */
+public abstract class AcceptorLoop extends SelectorLoop {
+
+ private SocketAddress listenAddress;
+ private ServerSocketChannel socketChannel;
+
+ /**
+ * Creates an acceptor loop with the specified selection timeout and
+ * accepting connections on the the given address.
+ *
+ * @param selectTimeout selection timeout; specified in millis
+ * @param listenAddress socket address where to listen for connections
+ * @throws IOException if the backing selector cannot be opened
+ */
+ public AcceptorLoop(long selectTimeout, SocketAddress listenAddress)
+ throws IOException {
+ super(selectTimeout);
+ this.listenAddress = checkNotNull(listenAddress, "Address cannot be null");
+ }
+
+ /**
+ * Hook to accept an inbound connection on the specified socket channel.
+ *
+ * @param channel socketChannel where an accept operation awaits
+ * @throws IOException if the accept operation cannot be processed
+ */
+ protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException;
+
+ /**
+ * Opens a new server socket channel configured in non-blocking mode and
+ * bound to the loop's listen address.
+ *
+ * @throws IOException if unable to open or configure the socket channel
+ */
+ protected synchronized void openChannel() throws IOException {
+ socketChannel = ServerSocketChannel.open();
+ socketChannel.configureBlocking(false);
+ socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ socketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ socketChannel.bind(listenAddress);
+ }
+
+ /**
+ * Closes the server socket channel.
+ *
+ * @throws IOException if unable to close the socketChannel
+ */
+ protected synchronized void closechannel() throws IOException {
+ if (socketChannel != null) {
+ socketChannel.close();
+ socketChannel = null;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ closechannel();
+ } catch (IOException e) {
+ log.warn("Unable to close the socketChannel", e);
+ }
+ super.shutdown();
+ }
+
+ @Override
+ protected void loop() throws IOException {
+ openChannel();
+ notifyReady();
+
+ // Keep looping until told otherwise.
+ while (isRunning()) {
+ // Attempt a selection; if no operations selected or if signalled
+ // to shutdown, spin through.
+ int count = selector.select(selectTimeout);
+ if (count == 0 || !isRunning()) {
+ continue;
+ }
+
+ // Iterate over all keys selected for an operation and process them.
+ Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+ while (keys.hasNext()) {
+ // Fetch the key and remove it from the pending list.
+ SelectionKey key = keys.next();
+ keys.remove();
+
+ // If the key has a pending acceptConnection operation, process it.
+ if (key.isAcceptable()) {
+ acceptConnection((ServerSocketChannel) key.channel());
+ }
+ }
+ }
+ }
+
+}
+
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
new file mode 100644
index 00000000..106df7b2
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * I/O loop for driving inbound &amp; outbound {@link Message} transfer via
+ * {@link MessageStream}.
+ *
+ * @param <M> message type
+ * @param <S> message stream type
+ */
+public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
+ extends SelectorLoop {
+
+ // Queue of requests for new message streams to enter the IO loop processing.
+ private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
+
+ // Carries information required for admitting a new message stream.
+ private class NewStreamRequest {
+ private final S stream;
+ private final SelectableChannel channel;
+ private final int op;
+
+ public NewStreamRequest(S stream, SelectableChannel channel, int op) {
+ this.stream = stream;
+ this.channel = channel;
+ this.op = op;
+ }
+ }
+
+ // Set of message streams currently admitted into the IO loop.
+ private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
+
+ /**
+ * Creates an IO loop with the given selection timeout.
+ *
+ * @param timeout selection timeout in milliseconds
+ * @throws IOException if the backing selector cannot be opened
+ */
+ public IOLoop(long timeout) throws IOException {
+ super(timeout);
+ }
+
+ /**
+ * Returns the number of message stream in custody of the loop.
+ *
+ * @return number of message streams
+ */
+ public int streamCount() {
+ return streams.size();
+ }
+
+ /**
+ * Creates a new message stream backed by the specified socket channel.
+ *
+ * @param byteChannel backing byte channel
+ * @return newly created message stream
+ */
+ protected abstract S createStream(ByteChannel byteChannel);
+
+ /**
+ * Removes the specified message stream from the IO loop.
+ *
+ * @param stream message stream to remove
+ */
+ protected void removeStream(MessageStream<M> stream) {
+ streams.remove(stream);
+ }
+
+ /**
+ * Processes the list of messages extracted from the specified message
+ * stream.
+ *
+ * @param messages non-empty list of received messages
+ * @param stream message stream from which the messages were extracted
+ */
+ protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
+
+ /**
+ * Completes connection request pending on the given selection key.
+ *
+ * @param key selection key holding the pending connect operation.
+ * @throws IOException when I/O exception of some sort has occurred
+ */
+ protected void connect(SelectionKey key) throws IOException {
+ SocketChannel ch = (SocketChannel) key.channel();
+ ch.finishConnect();
+ if (key.isValid()) {
+ key.interestOps(SelectionKey.OP_READ);
+ }
+ }
+
+ /**
+ * Processes an IO operation pending on the specified key.
+ *
+ * @param key selection key holding the pending I/O operation.
+ */
+ protected void processKeyOperation(SelectionKey key) {
+ @SuppressWarnings("unchecked")
+ S stream = (S) key.attachment();
+
+ try {
+ // If the key is not valid, bail out.
+ if (!key.isValid()) {
+ stream.close();
+ return;
+ }
+
+ // If there is a pending connect operation, complete it.
+ if (key.isConnectable()) {
+ try {
+ connect(key);
+ } catch (IOException | IllegalStateException e) {
+ log.warn("Unable to complete connection", e);
+ }
+ }
+
+ // If there is a read operation, slurp as much data as possible.
+ if (key.isReadable()) {
+ List<M> messages = stream.read();
+
+ // No messages or failed flush imply disconnect; bail.
+ if (messages == null || stream.hadError()) {
+ stream.close();
+ return;
+ }
+
+ // If there were any messages read, process them.
+ if (!messages.isEmpty()) {
+ try {
+ processMessages(messages, stream);
+ } catch (RuntimeException e) {
+ onError(stream, e);
+ }
+ }
+ }
+
+ // If there are pending writes, flush them
+ if (key.isWritable()) {
+ stream.flushIfPossible();
+ }
+
+ // If there were any issued flushing, close the stream.
+ if (stream.hadError()) {
+ stream.close();
+ }
+
+ } catch (CancelledKeyException e) {
+ // Key was cancelled, so silently close the stream
+ stream.close();
+ } catch (IOException e) {
+ if (!stream.isClosed() && !isResetByPeer(e)) {
+ log.warn("Unable to process IO", e);
+ }
+ stream.close();
+ }
+ }
+
+ // Indicates whether or not this exception is caused by 'reset by peer'.
+ private boolean isResetByPeer(IOException e) {
+ Throwable cause = e.getCause();
+ return cause != null && cause instanceof IOException &&
+ cause.getMessage().contains("reset by peer");
+ }
+
+ /**
+ * Hook to allow intercept of any errors caused during message processing.
+ * Default behaviour is to rethrow the error.
+ *
+ * @param stream message stream involved in the error
+ * @param error the runtime exception
+ */
+ protected void onError(S stream, RuntimeException error) {
+ throw error;
+ }
+
+ /**
+ * Admits a new message stream backed by the specified socket channel
+ * with a pending accept operation.
+ *
+ * @param channel backing socket channel
+ * @return newly accepted message stream
+ */
+ public S acceptStream(SocketChannel channel) {
+ return createAndAdmit(channel, SelectionKey.OP_READ);
+ }
+
+
+ /**
+ * Admits a new message stream backed by the specified socket channel
+ * with a pending connect operation.
+ *
+ * @param channel backing socket channel
+ * @return newly connected message stream
+ */
+ public S connectStream(SocketChannel channel) {
+ return createAndAdmit(channel, SelectionKey.OP_CONNECT);
+ }
+
+ /**
+ * Creates a new message stream backed by the specified socket channel
+ * and admits it into the IO loop.
+ *
+ * @param channel socket channel
+ * @param op pending operations mask to be applied to the selection
+ * key as a set of initial interestedOps
+ * @return newly created message stream
+ */
+ private synchronized S createAndAdmit(SocketChannel channel, int op) {
+ S stream = createStream(channel);
+ streams.add(stream);
+ newStreamRequests.add(new NewStreamRequest(stream, channel, op));
+ selector.wakeup();
+ return stream;
+ }
+
+ /**
+ * Safely admits new streams into the IO loop.
+ */
+ private void admitNewStreams() {
+ Iterator<NewStreamRequest> it = newStreamRequests.iterator();
+ while (isRunning() && it.hasNext()) {
+ try {
+ NewStreamRequest request = it.next();
+ it.remove();
+ SelectionKey key = request.channel.register(selector, request.op,
+ request.stream);
+ request.stream.setKey(key);
+ } catch (ClosedChannelException e) {
+ log.warn("Unable to admit new message stream", e);
+ }
+ }
+ }
+
+ @Override
+ protected void loop() throws IOException {
+ notifyReady();
+
+ // Keep going until told otherwise.
+ while (isRunning()) {
+ admitNewStreams();
+
+ // Process flushes & write selects on all streams
+ for (MessageStream<M> stream : streams) {
+ stream.flushIfWriteNotPending();
+ }
+
+ // Select keys and process them.
+ int count = selector.select(selectTimeout);
+ if (count > 0 && isRunning()) {
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey key = it.next();
+ it.remove();
+ processKeyOperation(key);
+ }
+ }
+ }
+ }
+
+ /**
+ * Prunes the registered streams by discarding any stale ones.
+ *
+ * @return number of remaining streams
+ */
+ public synchronized int pruneStaleStreams() {
+ for (MessageStream<M> stream : streams) {
+ if (stream.isStale()) {
+ stream.close();
+ }
+ }
+ return streams.size();
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java
new file mode 100644
index 00000000..c1a339e0
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+/**
+ * Representation of a message transferred via {@link MessageStream}.
+ */
+public interface Message {
+
+ /**
+ * Gets the message length in bytes.
+ *
+ * @return number of bytes
+ */
+ int length();
+
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
new file mode 100644
index 00000000..a19e8aac
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -0,0 +1,424 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.onlab.util.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
+import static java.nio.ByteBuffer.allocateDirect;
+
+/**
+ * Bi-directional message stream for transferring messages to &amp; from the
+ * network via two byte buffers.
+ *
+ * @param <M> message type
+ */
+public abstract class MessageStream<M extends Message> {
+
+ protected Logger log = LoggerFactory.getLogger(getClass());
+
+ private final IOLoop<M, ?> loop;
+ private final ByteChannel channel;
+ private final int maxIdleMillis;
+
+ private final ByteBuffer inbound;
+ private ByteBuffer outbound;
+ private SelectionKey key;
+
+ private volatile boolean closed = false;
+ private volatile boolean writePending;
+ private volatile boolean writeOccurred;
+
+ private Exception ioError;
+ private long lastActiveTime;
+
+ private final Counter bytesIn = new Counter();
+ private final Counter messagesIn = new Counter();
+ private final Counter bytesOut = new Counter();
+ private final Counter messagesOut = new Counter();
+
+ /**
+ * Creates a message stream associated with the specified IO loop and
+ * backed by the given byte channel.
+ *
+ * @param loop IO loop
+ * @param byteChannel backing byte channel
+ * @param bufferSize size of the backing byte buffers
+ * @param maxIdleMillis maximum number of millis the stream can be idle
+ * before it will be closed
+ */
+ protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
+ int bufferSize, int maxIdleMillis) {
+ this.loop = checkNotNull(loop, "Loop cannot be null");
+ this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
+
+ checkArgument(maxIdleMillis > 0, "Idle time must be positive");
+ this.maxIdleMillis = maxIdleMillis;
+
+ inbound = allocateDirect(bufferSize);
+ outbound = allocateDirect(bufferSize);
+ }
+
+ /**
+ * Gets a single message from the specified byte buffer; this is
+ * to be done without manipulating the buffer via flip, reset or clear.
+ *
+ * @param buffer byte buffer
+ * @return read message or null if there are not enough bytes to read
+ * a complete message
+ */
+ protected abstract M read(ByteBuffer buffer);
+
+ /**
+ * Puts the specified message into the specified byte buffer; this is
+ * to be done without manipulating the buffer via flip, reset or clear.
+ *
+ * @param message message to be write into the buffer
+ * @param buffer byte buffer
+ */
+ protected abstract void write(M message, ByteBuffer buffer);
+
+ /**
+ * Closes the message buffer.
+ */
+ public void close() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
+
+ bytesIn.freeze();
+ bytesOut.freeze();
+ messagesIn.freeze();
+ messagesOut.freeze();
+
+ loop.removeStream(this);
+ if (key != null) {
+ try {
+ key.cancel();
+ key.channel().close();
+ } catch (IOException e) {
+ log.warn("Unable to close stream", e);
+ }
+ }
+ }
+
+ /**
+ * Indicates whether this buffer has been closed.
+ *
+ * @return true if this stream has been closed
+ */
+ public synchronized boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Returns the stream IO selection key.
+ *
+ * @return socket channel registration selection key
+ */
+ public SelectionKey key() {
+ return key;
+ }
+
+ /**
+ * Binds the selection key to be used for driving IO operations on the stream.
+ *
+ * @param key IO selection key
+ */
+ public void setKey(SelectionKey key) {
+ this.key = key;
+ this.lastActiveTime = currentTimeMillis();
+ }
+
+ /**
+ * Returns the IO loop to which this stream is bound.
+ *
+ * @return I/O loop used to drive this stream
+ */
+ public IOLoop<M, ?> loop() {
+ return loop;
+ }
+
+ /**
+ * Indicates whether the any prior IO encountered an error.
+ *
+ * @return true if a write failed
+ */
+ public boolean hadError() {
+ return ioError != null;
+ }
+
+ /**
+ * Gets the prior IO error, if one occurred.
+ *
+ * @return IO error; null if none occurred
+ */
+ public Exception getError() {
+ return ioError;
+ }
+
+ /**
+ * Reads, without blocking, a list of messages from the stream.
+ * The list will be empty if there were not messages pending.
+ *
+ * @return list of messages or null if backing channel has been closed
+ * @throws IOException if messages could not be read
+ */
+ public List<M> read() throws IOException {
+ try {
+ int read = channel.read(inbound);
+ if (read != -1) {
+ // Read the messages one-by-one and add them to the list.
+ List<M> messages = new ArrayList<>();
+ M message;
+ inbound.flip();
+ while ((message = read(inbound)) != null) {
+ messages.add(message);
+ messagesIn.add(1);
+ bytesIn.add(message.length());
+ }
+ inbound.compact();
+
+ // Mark the stream with current time to indicate liveness.
+ lastActiveTime = currentTimeMillis();
+ return messages;
+ }
+ return null;
+
+ } catch (Exception e) {
+ throw new IOException("Unable to read messages", e);
+ }
+ }
+
+ /**
+ * Writes the specified list of messages to the stream.
+ *
+ * @param messages list of messages to write
+ * @throws IOException if error occurred while writing the data
+ */
+ public void write(List<M> messages) throws IOException {
+ synchronized (this) {
+ // First write all messages.
+ for (M m : messages) {
+ append(m);
+ }
+ flushUnlessAlreadyPlanningTo();
+ }
+ }
+
+ /**
+ * Writes the given message to the stream.
+ *
+ * @param message message to write
+ * @throws IOException if error occurred while writing the data
+ */
+ public void write(M message) throws IOException {
+ synchronized (this) {
+ append(message);
+ flushUnlessAlreadyPlanningTo();
+ }
+ }
+
+ // Appends the specified message into the internal buffer, growing the
+ // buffer if required.
+ private void append(M message) {
+ // If the buffer does not have sufficient length double it.
+ while (outbound.remaining() < message.length()) {
+ doubleSize();
+ }
+ write(message, outbound);
+ messagesOut.add(1);
+ bytesOut.add(message.length());
+ }
+
+ // Forces a flush, unless one is planned already.
+ private void flushUnlessAlreadyPlanningTo() throws IOException {
+ if (!writeOccurred && !writePending) {
+ flush();
+ }
+ }
+
+ /**
+ * Flushes any pending writes.
+ *
+ * @throws IOException if flush failed
+ */
+ public void flush() throws IOException {
+ synchronized (this) {
+ if (!writeOccurred && !writePending) {
+ outbound.flip();
+ try {
+ channel.write(outbound);
+ } catch (IOException e) {
+ if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
+ log.warn("Unable to write data", e);
+ ioError = e;
+ }
+ }
+ lastActiveTime = currentTimeMillis();
+ writeOccurred = true;
+ writePending = outbound.hasRemaining();
+ outbound.compact();
+ }
+ }
+ }
+
+ /**
+ * Indicates whether the stream has bytes to be written to the channel.
+ *
+ * @return true if there are bytes to be written
+ */
+ boolean isWritePending() {
+ synchronized (this) {
+ return writePending;
+ }
+ }
+
+
+ /**
+ * Indicates whether data has been written but not flushed yet.
+ *
+ * @return true if flush is required
+ */
+ boolean isFlushRequired() {
+ synchronized (this) {
+ return outbound.position() > 0;
+ }
+ }
+
+ /**
+ * Attempts to flush data, internal stream state and channel availability
+ * permitting. Invoked by the driver I/O loop during handling of writable
+ * selection key.
+ * <p>
+ * Resets the internal state flags {@code writeOccurred} and
+ * {@code writePending}.
+ * </p>
+ * @throws IOException if implicit flush failed
+ */
+ void flushIfPossible() throws IOException {
+ synchronized (this) {
+ writePending = false;
+ writeOccurred = false;
+ if (outbound.position() > 0) {
+ flush();
+ }
+ }
+ key.interestOps(SelectionKey.OP_READ);
+ }
+
+ /**
+ * Attempts to flush data, internal stream state and channel availability
+ * permitting and if other writes are not pending. Invoked by the driver
+ * I/O loop prior to entering select wait. Resets the internal
+ * {@code writeOccurred} state flag.
+ *
+ * @throws IOException if implicit flush failed
+ */
+ void flushIfWriteNotPending() throws IOException {
+ synchronized (this) {
+ writeOccurred = false;
+ if (!writePending && outbound.position() > 0) {
+ flush();
+ }
+ }
+ if (isWritePending()) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ }
+
+ /**
+ * Doubles the size of the outbound buffer.
+ */
+ private void doubleSize() {
+ ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
+ outbound.flip();
+ newBuffer.put(outbound);
+ outbound = newBuffer;
+ }
+
+ /**
+ * Returns the maximum number of milliseconds the stream is allowed
+ * without any read/write operations.
+ *
+ * @return number if millis of permissible idle time
+ */
+ protected int maxIdleMillis() {
+ return maxIdleMillis;
+ }
+
+
+ /**
+ * Returns true if the given stream has gone stale.
+ *
+ * @return true if the stream is stale
+ */
+ boolean isStale() {
+ return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
+ }
+
+ /**
+ * Returns the inbound bytes counter.
+ *
+ * @return inbound bytes counter
+ */
+ public Counter bytesIn() {
+ return bytesIn;
+ }
+
+ /**
+ * Returns the outbound bytes counter.
+ *
+ * @return outbound bytes counter
+ */
+ public Counter bytesOut() {
+ return bytesOut;
+ }
+
+ /**
+ * Returns the inbound messages counter.
+ *
+ * @return inbound messages counter
+ */
+ public Counter messagesIn() {
+ return messagesIn;
+ }
+
+ /**
+ * Returns the outbound messages counter.
+ *
+ * @return outbound messages counter
+ */
+ public Counter messagesOut() {
+ return messagesOut;
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
new file mode 100644
index 00000000..95a9b61e
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.System.currentTimeMillis;
+
+/**
+ * Abstraction of an I/O processing loop based on an NIO selector.
+ */
+public abstract class SelectorLoop implements Runnable {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ /**
+ * Selector used by this loop to pace the I/O operations.
+ */
+ protected final Selector selector;
+
+ /**
+ * Selection operations timeout; specified in millis.
+ */
+ protected long selectTimeout;
+
+ /**
+ * Retains the error that caused the loop to exit prematurely.
+ */
+ private Throwable error;
+
+ // State indicator
+ private enum State { STARTING, STARTED, STOPPING, STOPPED };
+ private volatile State state = State.STOPPED;
+
+ /**
+ * Creates a new selector loop with the given selection timeout.
+ *
+ * @param selectTimeout selection timeout; specified in millis
+ * @throws IOException if the backing selector cannot be opened
+ */
+ public SelectorLoop(long selectTimeout) throws IOException {
+ checkArgument(selectTimeout > 0, "Timeout must be positive");
+ this.selectTimeout = selectTimeout;
+ this.selector = openSelector();
+ }
+
+ /**
+ * Opens a new selector for the use by the loop.
+ *
+ * @return newly open selector
+ * @throws IOException if the backing selector cannot be opened
+ */
+ protected Selector openSelector() throws IOException {
+ return Selector.open();
+ }
+
+ /**
+ * Indicates that the loop is marked to run.
+ * @return true if the loop is marked to run
+ */
+ protected boolean isRunning() {
+ return state == State.STARTED || state == State.STARTING;
+ }
+
+ /**
+ * Returns the error, if there was one, that caused the loop to terminate
+ * prematurely.
+ *
+ * @return error or null if there was none
+ */
+ public Throwable getError() {
+ return error;
+ }
+
+ /**
+ * Contains the body of the I/O selector loop.
+ *
+ * @throws IOException if an error is encountered while selecting I/O
+ */
+ protected abstract void loop() throws IOException;
+
+ @Override
+ public void run() {
+ error = null;
+ state = State.STARTING;
+ try {
+ loop();
+ } catch (Exception e) {
+ error = e;
+ log.error("Loop aborted", e);
+ }
+ notifyDone();
+ }
+
+ /**
+ * Notifies observers waiting for loop to become ready.
+ */
+ protected synchronized void notifyReady() {
+ state = State.STARTED;
+ notifyAll();
+ }
+
+ /**
+ * Triggers loop shutdown.
+ */
+ public void shutdown() {
+ // Mark the loop as no longer running and wake up the selector.
+ state = State.STOPPING;
+ selector.wakeup();
+ }
+
+ /**
+ * Notifies observers waiting for loop to fully stop.
+ */
+ private synchronized void notifyDone() {
+ state = State.STOPPED;
+ notifyAll();
+ }
+
+ /**
+ * Waits for the loop execution to start.
+ *
+ * @param timeout number of milliseconds to wait
+ * @return true if loop started in time
+ */
+ public final synchronized boolean awaitStart(long timeout) {
+ long max = currentTimeMillis() + timeout;
+ while (state != State.STARTED && (currentTimeMillis() < max)) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ return state == State.STARTED;
+ }
+
+ /**
+ * Waits for the loop execution to stop.
+ *
+ * @param timeout number of milliseconds to wait
+ * @return true if loop finished in time
+ */
+ public final synchronized boolean awaitStop(long timeout) {
+ long max = currentTimeMillis() + timeout;
+ while (state != State.STOPPED && (currentTimeMillis() < max)) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ return state == State.STOPPED;
+ }
+
+
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java
new file mode 100644
index 00000000..0d58b568
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mechanism to transfer messages over network using IO loop and
+ * message stream, backed by NIO byte buffers.
+ */
+package org.onlab.nio;
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
new file mode 100644
index 00000000..0d977929
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+
+/**
+ * IOLoop for transporting DefaultMessages.
+ */
+public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> {
+
+ public static final int SELECT_TIMEOUT_MILLIS = 500;
+ private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000;
+ private static final int BUFFER_SIZE = 1024 * 1024;
+ private final Consumer<DefaultMessage> consumer;
+
+ public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException {
+ this(SELECT_TIMEOUT_MILLIS, consumer);
+ }
+
+ public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException {
+ super(timeout);
+ this.consumer = consumer;
+ }
+
+ @Override
+ protected DefaultMessageStream createStream(ByteChannel byteChannel) {
+ return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS);
+ }
+
+ @Override
+ protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) {
+ messages.forEach(consumer);
+ }
+
+ @Override
+ protected void connect(SelectionKey key) throws IOException {
+ DefaultMessageStream stream = (DefaultMessageStream) key.attachment();
+ try {
+ super.connect(key);
+ stream.connected();
+ } catch (Exception e) {
+ stream.connectFailed(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
new file mode 100644
index 00000000..591d49b3
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.nio.AbstractMessage;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Default message.
+ */
+public class DefaultMessage extends AbstractMessage {
+
+ private long id;
+ private Endpoint sender;
+ private String type;
+ private byte[] payload;
+
+ /**
+ * Creates a new message with the specified data.
+ *
+ * @param id message id
+ * @param type message type
+ * @param sender sender endpoint
+ * @param payload message payload
+ */
+ DefaultMessage(long id, Endpoint sender, String type, byte[] payload) {
+ this.id = id;
+ this.type = checkNotNull(type, "Type cannot be null");
+ this.sender = checkNotNull(sender, "Sender cannot be null");
+ this.payload = checkNotNull(payload, "Payload cannot be null");
+
+ byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8);
+ IpAddress senderIp = sender.host();
+ byte[] ipOctets = senderIp.toOctets();
+
+ length = 25 + ipOctets.length + messageTypeBytes.length + payload.length;
+ }
+
+ /**
+ * Returns message id.
+ *
+ * @return message id
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * Returns message sender.
+ *
+ * @return message sender
+ */
+ public Endpoint sender() {
+ return sender;
+ }
+
+ /**
+ * Returns message type.
+ *
+ * @return message type
+ */
+ public String type() {
+ return type;
+ }
+
+ /**
+ * Returns message payload.
+ *
+ * @return payload
+ */
+ public byte[] payload() {
+ return payload;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("type", type)
+ .add("sender", sender)
+ .add("payload", ByteArraySizeHashPrinter.of(payload))
+ .toString();
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
new file mode 100644
index 00000000..a7dd3c04
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Default bi-directional message stream for transferring messages to &amp; from the
+ * network via two byte buffers.
+ */
+public class DefaultMessageStream extends MessageStream<DefaultMessage> {
+
+ private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+
+ public DefaultMessageStream(
+ IOLoop<DefaultMessage, ?> loop,
+ ByteChannel byteChannel,
+ int bufferSize,
+ int maxIdleMillis) {
+ super(loop, byteChannel, bufferSize, maxIdleMillis);
+ }
+
+ public CompletableFuture<DefaultMessageStream> connectedFuture() {
+ return connectFuture.thenApply(v -> this);
+ }
+
+ private final AtomicInteger messageLength = new AtomicInteger(-1);
+
+ @Override
+ protected DefaultMessage read(ByteBuffer buffer) {
+ if (messageLength.get() == -1) {
+ // check if we can read the message length.
+ if (buffer.remaining() < Integer.BYTES) {
+ return null;
+ } else {
+ messageLength.set(buffer.getInt());
+ }
+ }
+
+ if (buffer.remaining() < messageLength.get()) {
+ return null;
+ }
+
+ long id = buffer.getLong();
+ Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
+ byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.get(octects);
+ IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
+ int senderPort = buffer.getInt();
+ int messageTypeByteLength = buffer.getInt();
+ byte[] messageTypeBytes = new byte[messageTypeByteLength];
+ buffer.get(messageTypeBytes);
+ String messageType = new String(messageTypeBytes, Charsets.UTF_8);
+ int payloadLength = buffer.getInt();
+ byte[] payloadBytes = new byte[payloadLength];
+ buffer.get(payloadBytes);
+
+ // reset for next message
+ messageLength.set(-1);
+
+ return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
+ }
+
+ @Override
+ protected void write(DefaultMessage message, ByteBuffer buffer) {
+ Endpoint sender = message.sender();
+ byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
+ IpAddress senderIp = sender.host();
+ byte[] ipOctets = senderIp.toOctets();
+ byte[] payload = message.payload();
+
+ int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
+
+ buffer.putInt(messageLength);
+
+ buffer.putLong(message.id());
+
+ if (senderIp.version() == Version.INET) {
+ buffer.put((byte) 0x0);
+ } else {
+ buffer.put((byte) 0x1);
+ }
+ buffer.put(ipOctets);
+
+ // write sender port
+ buffer.putInt(sender.port());
+
+ // write length of message type
+ buffer.putInt(messageTypeBytes.length);
+
+ // write message type bytes
+ buffer.put(messageTypeBytes);
+
+ // write payload length
+ buffer.putInt(payload.length);
+
+ // write payload.
+ buffer.put(payload);
+ }
+
+ /**
+ * Callback invoked when the stream is successfully connected.
+ */
+ public void connected() {
+ connectFuture.complete(null);
+ }
+
+ /**
+ * Callback invoked when the stream fails to connect.
+ * @param cause failure cause
+ */
+ public void connectFailed(Throwable cause) {
+ connectFuture.completeExceptionally(cause);
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
new file mode 100644
index 00000000..c195d160
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio.service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.nio.AcceptorLoop;
+import org.onlab.nio.SelectorLoop;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+
+/**
+ * MessagingService implementation based on IOLoop.
+ */
+public class IOLoopMessaging implements MessagingService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
+
+ static final long TIMEOUT = 1000;
+
+ static final boolean SO_NO_DELAY = false;
+ static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+ static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
+
+ private static final int NUM_WORKERS = 8;
+
+ private AcceptorLoop acceptorLoop;
+ private final ExecutorService acceptorThreadPool =
+ Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
+ private final ExecutorService ioThreadPool =
+ Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
+
+ private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
+
+ private int lastWorker = -1;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private Endpoint localEp;
+
+ private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
+ new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
+
+ private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+ .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
+ @Override
+ public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
+ if (entry.wasEvicted()) {
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+ }
+ }
+ })
+ .build();
+
+ /**
+ * Activates IO Loops.
+ *
+ * @param localEp local end-point
+ * @throws IOException is activation fails
+ */
+ public void start(Endpoint localEp) throws IOException {
+ if (started.get()) {
+ log.warn("IOMessaging is already running at {}", localEp);
+ return;
+ }
+ this.localEp = localEp;
+ streams.setLifo(false);
+ this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
+
+ for (int i = 0; i < NUM_WORKERS; i++) {
+ ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
+ }
+
+ ioLoops.forEach(ioThreadPool::execute);
+ acceptorThreadPool.execute(acceptorLoop);
+ ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
+ acceptorLoop.awaitStart(TIMEOUT);
+ started.set(true);
+ }
+
+ /**
+ * Shuts down IO loops.
+ */
+ public void stop() {
+ if (started.get()) {
+ ioLoops.forEach(SelectorLoop::shutdown);
+ acceptorLoop.shutdown();
+ ioThreadPool.shutdown();
+ acceptorThreadPool.shutdown();
+ started.set(false);
+ }
+ }
+
+
+ @Override
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
+ DefaultMessage message = new DefaultMessage(
+ messageIdGenerator.incrementAndGet(),
+ localEp,
+ type,
+ payload);
+ return sendAsync(ep, message);
+ }
+
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ if (ep.equals(localEp)) {
+ dispatchLocally(message);
+ future.complete(null);
+ return future;
+ }
+
+ DefaultMessageStream stream = null;
+ try {
+ stream = streams.borrowObject(ep);
+ stream.write(message);
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ } finally {
+ try {
+ streams.returnObject(ep, stream);
+ } catch (Exception e) {
+ log.warn("Failed to return stream to pool");
+ }
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(
+ Endpoint ep,
+ String type,
+ byte[] payload) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
+ Long messageId = messageIdGenerator.incrementAndGet();
+ responseFutures.put(messageId, response);
+ DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
+ try {
+ sendAsync(ep, message);
+ } catch (Exception e) {
+ responseFutures.invalidate(messageId);
+ response.completeExceptionally(e);
+ }
+ return response;
+ }
+
+ @Override
+ public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
+ }
+
+ @Override
+ public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> {
+ byte[] responsePayload = handler.apply(message.payload());
+ if (responsePayload != null) {
+ DefaultMessage response = new DefaultMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload);
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ log.debug("Failed to respond", error);
+ });
+ }
+ }));
+ }
+
+ @Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> {
+ handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ DefaultMessage response = new DefaultMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
+ }
+ });
+ });
+ }
+
+ @Override
+ public void unregisterHandler(String type) {
+ handlers.remove(type);
+ }
+
+ protected void dispatchLocally(DefaultMessage message) {
+ String type = message.type();
+ if (REPLY_MESSAGE_TYPE.equals(type)) {
+ try {
+ CompletableFuture<byte[]> futureResponse =
+ responseFutures.getIfPresent(message.id());
+ if (futureResponse != null) {
+ futureResponse.complete(message.payload());
+ } else {
+ log.warn("Received a reply for message id:[{}]. "
+ + " from {}. But was unable to locate the"
+ + " request handle", message.id(), message.sender());
+ }
+ } finally {
+ responseFutures.invalidate(message.id());
+ }
+ return;
+ }
+ Consumer<DefaultMessage> handler = handlers.get(type);
+ if (handler != null) {
+ handler.accept(message);
+ } else {
+ log.debug("No handler registered for {}", type);
+ }
+ }
+
+ // Get the next worker to which a client should be assigned
+ private synchronized DefaultIOLoop nextWorker() {
+ lastWorker = (lastWorker + 1) % NUM_WORKERS;
+ return ioLoops.get(lastWorker);
+ }
+
+ /**
+ * Initiates open connection request and registers the pending socket
+ * channel with the given IO loop.
+ *
+ * @param loop loop with which the channel should be registered
+ * @throws java.io.IOException if the socket could not be open or connected
+ */
+ private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
+ SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
+ SocketChannel ch = SocketChannel.open();
+ ch.configureBlocking(false);
+ DefaultMessageStream stream = loop.connectStream(ch);
+ ch.connect(sa);
+ return stream;
+ }
+
+ // Loop for accepting client connections
+ private class DefaultAcceptorLoop extends AcceptorLoop {
+
+ public DefaultAcceptorLoop(SocketAddress address) throws IOException {
+ super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
+ }
+
+ @Override
+ protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+ SocketChannel sc = channel.accept();
+ sc.configureBlocking(false);
+
+ Socket so = sc.socket();
+ so.setTcpNoDelay(SO_NO_DELAY);
+ so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+ so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+
+ nextWorker().acceptStream(sc);
+ }
+ }
+
+ private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
+
+ @Override
+ public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
+ }
+
+ @Override
+ public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+ stream.close();
+ }
+
+ @Override
+ public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
+ DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
+ log.info("Established a new connection to {}", ep);
+ return stream;
+ }
+
+ @Override
+ public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
+ }
+
+ @Override
+ public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
+ return stream.isClosed();
+ }
+ }
+}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java
new file mode 100644
index 00000000..399a2b95
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Assembly for sending and receiving messages using the I/O loop mechanism.
+ */
+package org.onlab.nio.service; \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
new file mode 100644
index 00000000..2b4a85cd
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.junit.Before;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.junit.Assert.fail;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Base class for various NIO loop unit tests.
+ */
+public abstract class AbstractLoopTest {
+
+ protected static final long MAX_MS_WAIT = 1500;
+
+ /** Block on specified countdown latch. Return when countdown reaches
+ * zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires.
+ *
+ * @param latch the latch
+ * @param label an identifying label
+ */
+ protected void waitForLatch(CountDownLatch latch, String label) {
+ try {
+ boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS);
+ if (!ok) {
+ fail("Latch await timeout! [" + label + "]");
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Latch interrupt [" + label + "] : " + e);
+ fail("Unexpected interrupt");
+ }
+ }
+
+ protected ExecutorService exec;
+
+ @Before
+ public void setUp() {
+ exec = newSingleThreadExecutor(namedThreads("test"));
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
new file mode 100644
index 00000000..f04a0126
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Unit tests for AcceptLoop.
+ */
+public class AcceptorLoopTest extends AbstractLoopTest {
+
+ private static final int PICK_EPHEMERAL = 0;
+
+ private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL);
+
+ private static class MyAcceptLoop extends AcceptorLoop {
+ private final CountDownLatch loopStarted = new CountDownLatch(1);
+ private final CountDownLatch loopFinished = new CountDownLatch(1);
+ private final CountDownLatch runDone = new CountDownLatch(1);
+ private final CountDownLatch ceaseLatch = new CountDownLatch(1);
+
+ private int acceptCount = 0;
+
+ MyAcceptLoop() throws IOException {
+ super(500, SOCK_ADDR);
+ }
+
+ @Override
+ protected void acceptConnection(ServerSocketChannel ssc) throws IOException {
+ acceptCount++;
+ }
+
+ @Override
+ public void loop() throws IOException {
+ loopStarted.countDown();
+ super.loop();
+ loopFinished.countDown();
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ runDone.countDown();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ ceaseLatch.countDown();
+ }
+ }
+
+ @Test
+ public void basic() throws IOException {
+ MyAcceptLoop myAccLoop = new MyAcceptLoop();
+ AcceptorLoop accLoop = myAccLoop;
+ exec.execute(accLoop);
+ waitForLatch(myAccLoop.loopStarted, "loopStarted");
+ delay(200); // take a quick nap
+ accLoop.shutdown();
+ waitForLatch(myAccLoop.loopFinished, "loopFinished");
+ waitForLatch(myAccLoop.runDone, "runDone");
+ assertEquals(0, myAccLoop.acceptCount);
+ }
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
new file mode 100644
index 00000000..f42c8dc5
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Integration test for the select, accept and IO loops.
+ */
+public class IOLoopIntegrationTest {
+
+ private static final int THREADS = 6;
+ private static final int TIMEOUT = 60;
+ private static final int MESSAGE_LENGTH = 128;
+
+ private static final int MILLION = 1000000;
+ private static final int MSG_COUNT = 40 * MILLION;
+
+ @Before
+ public void warmUp() throws Exception {
+ Logger.getLogger("").setLevel(Level.SEVERE);
+ try {
+ runTest(MILLION, MESSAGE_LENGTH, 15);
+ } catch (Throwable e) {
+ System.err.println("Failed warmup but moving on.");
+ e.printStackTrace();
+ }
+ }
+
+ // TODO: this test can not pass in some environments, need to be improved
+ @Ignore
+ @Test
+ public void basic() throws Exception {
+ runTest(MILLION, MESSAGE_LENGTH, TIMEOUT);
+ }
+
+ public void longHaul() throws Exception {
+ runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT);
+ }
+
+ private void runTest(int count, int size, int timeout) throws Exception {
+ // Use a random port to prevent conflicts.
+ int port = IOLoopTestServer.PORT + new Random().nextInt(100);
+
+ InetAddress ip = InetAddress.getLoopbackAddress();
+ IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port);
+ IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port);
+
+ server.start();
+ client.start();
+ delay(100); // Pause to allow loops to get going
+
+ client.await(timeout);
+ client.report();
+
+ server.stop();
+ server.report();
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
new file mode 100644
index 00000000..acc9a08f
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -0,0 +1,324 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import com.google.common.collect.Lists;
+import org.onlab.util.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.lang.String.format;
+import static java.lang.System.nanoTime;
+import static java.lang.System.out;
+import static org.onlab.nio.IOLoopTestServer.PORT;
+import static org.onlab.util.Tools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class IOLoopTestClient {
+
+ private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
+
+ private final InetAddress ip;
+ private final int port;
+ private final int msgCount;
+ private final int msgLength;
+
+ private final List<CustomIOLoop> iloops = new ArrayList<>();
+ private final ExecutorService ipool;
+ private final ExecutorService wpool;
+
+ Counter messages;
+ Counter bytes;
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
+
+ /**
+ * Main entry point to launch the client.
+ *
+ * @param args command-line arguments
+ * @throws java.io.IOException if unable to connect to server
+ * @throws InterruptedException if latch wait gets interrupted
+ * @throws java.util.concurrent.ExecutionException if wait gets interrupted
+ * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
+ */
+ public static void main(String[] args)
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ startStandalone(args);
+
+ System.exit(0);
+ }
+
+ /**
+ * Starts a standalone IO loop test client.
+ *
+ * @param args command-line arguments
+ */
+ public static void startStandalone(String[] args)
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
+ int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
+ int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
+ int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
+ int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
+
+ log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
+ wc, mc, ml, ip);
+ IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
+
+ client.start();
+ delay(500);
+
+ client.await(to);
+ client.report();
+ }
+
+ /**
+ * Creates a speed client.
+ *
+ * @param ip ip address of server
+ * @param wc worker count
+ * @param mc message count to send per client
+ * @param ml message length in bytes
+ * @param port socket port
+ * @throws java.io.IOException if unable to create IO loops
+ */
+ public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+ this.ip = ip;
+ this.port = port;
+ this.msgCount = mc;
+ this.msgLength = ml;
+ this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
+ this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
+
+ for (int i = 0; i < wc; i++) {
+ iloops.add(new CustomIOLoop());
+ }
+ }
+
+ /**
+ * Starts the client workers.
+ *
+ * @throws java.io.IOException if unable to open connection
+ */
+ public void start() throws IOException {
+ messages = new Counter();
+ bytes = new Counter();
+
+ // First start up all the IO loops
+ for (CustomIOLoop l : iloops) {
+ ipool.execute(l);
+ }
+
+ // Wait for all of them to get going
+ for (CustomIOLoop l : iloops) {
+ l.awaitStart(1000);
+ }
+
+ // ... and Next open all connections; one-per-loop
+ for (CustomIOLoop l : iloops) {
+ openConnection(l);
+ }
+ }
+
+
+ /**
+ * Initiates open connection request and registers the pending socket
+ * channel with the given IO loop.
+ *
+ * @param loop loop with which the channel should be registered
+ * @throws java.io.IOException if the socket could not be open or connected
+ */
+ private void openConnection(CustomIOLoop loop) throws IOException {
+ SocketAddress sa = new InetSocketAddress(ip, port);
+ SocketChannel ch = SocketChannel.open();
+ ch.configureBlocking(false);
+ loop.connectStream(ch);
+ ch.connect(sa);
+ }
+
+
+ /**
+ * Waits for the client workers to complete.
+ *
+ * @param secs timeout in seconds
+ * @throws java.util.concurrent.ExecutionException if execution failed
+ * @throws InterruptedException if interrupt occurred while waiting
+ * @throws java.util.concurrent.TimeoutException if timeout occurred
+ */
+ public void await(int secs) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ for (CustomIOLoop l : iloops) {
+ if (l.worker.task != null) {
+ l.worker.task.get(secs, TimeUnit.SECONDS);
+ latencyTotal += l.latencyTotal;
+ latencyCount += l.latencyCount;
+ }
+ }
+ messages.freeze();
+ bytes.freeze();
+ }
+
+ /**
+ * Reports on the accumulated throughput and latency.
+ */
+ public void report() {
+ DecimalFormat f = new DecimalFormat("#,##0");
+ out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
+ f.format(messages.total()), f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * msgLength)),
+ f.format(latencyTotal / latencyCount)));
+ }
+
+
+ // Loop for transfer of fixed-length messages
+ private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ Worker worker = new Worker();
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
+
+ public CustomIOLoop() throws IOException {
+ super(500);
+ }
+
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(msgLength, channel, this);
+ }
+
+ @Override
+ protected synchronized void removeStream(MessageStream<TestMessage> stream) {
+ super.removeStream(stream);
+ messages.add(stream.messagesIn().total());
+ bytes.add(stream.bytesIn().total());
+ stream.messagesOut().reset();
+ stream.bytesOut().reset();
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> stream) {
+ for (TestMessage message : messages) {
+ // TODO: summarize latency data better
+ latencyTotal += nanoTime() - message.requestorTime();
+ latencyCount++;
+ }
+ worker.release(messages.size());
+ }
+
+ @Override
+ protected void connect(SelectionKey key) throws IOException {
+ super.connect(key);
+ TestMessageStream b = (TestMessageStream) key.attachment();
+ Worker w = ((CustomIOLoop) b.loop()).worker;
+ w.pump(b);
+ }
+
+ }
+
+ /**
+ * Auxiliary worker to connect and pump batched messages using blocking I/O.
+ */
+ private class Worker implements Runnable {
+
+ private static final int BATCH_SIZE = 50;
+ private static final int PERMITS = 2 * BATCH_SIZE;
+
+ private TestMessageStream stream;
+ private FutureTask<Worker> task;
+
+ // Stuff to throttle pump
+ private final Semaphore semaphore = new Semaphore(PERMITS);
+ private int msgWritten;
+
+ void pump(TestMessageStream stream) {
+ this.stream = stream;
+ task = new FutureTask<>(this, this);
+ wpool.execute(task);
+ }
+
+ @Override
+ public void run() {
+ try {
+ log.info("Worker started...");
+
+ while (msgWritten < msgCount) {
+ int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
+ writeBatch(size);
+ msgWritten += size;
+ }
+
+ // Now try to get all the permits back before sending poison pill
+ semaphore.acquireUninterruptibly(PERMITS);
+ stream.close();
+
+ log.info("Worker done...");
+
+ } catch (IOException e) {
+ log.error("Worker unable to perform I/O", e);
+ }
+ }
+
+
+ private void writeBatch(int size) throws IOException {
+ // Build a batch of messages
+ List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
+ for (int i = 0; i < size; i++) {
+ batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
+ }
+ acquire(size);
+ stream.write(batch);
+ }
+
+
+ // Release permits based on the specified number of message credits
+ private void release(int permits) {
+ semaphore.release(permits);
+ }
+
+ // Acquire permit for a single batch
+ private void acquire(int permits) {
+ semaphore.acquireUninterruptibly(permits);
+ }
+
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
new file mode 100644
index 00000000..d5ce5f39
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import com.google.common.collect.Lists;
+import org.onlab.util.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.lang.String.format;
+import static java.lang.System.out;
+import static org.onlab.util.Tools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class IOLoopTestServer {
+
+ private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
+
+ private static final int PRUNE_FREQUENCY = 1000;
+
+ static final int PORT = 9876;
+ static final long TIMEOUT = 1000;
+
+ static final boolean SO_NO_DELAY = false;
+ static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+ static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
+
+ static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
+
+ private final AcceptorLoop aloop;
+ private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
+
+ private final List<CustomIOLoop> iloops = new ArrayList<>();
+ private final ExecutorService ipool;
+
+ private final int workerCount;
+ private final int msgLength;
+ private int lastWorker = -1;
+
+ Counter messages;
+ Counter bytes;
+
+ /**
+ * Main entry point to launch the server.
+ *
+ * @param args command-line arguments
+ * @throws java.io.IOException if unable to crate IO loops
+ */
+ public static void main(String[] args) throws IOException {
+ startStandalone(args);
+ System.exit(0);
+ }
+
+ /**
+ * Starts a standalone IO loop test server.
+ *
+ * @param args command-line arguments
+ */
+ public static void startStandalone(String[] args) throws IOException {
+ InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
+ int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
+ int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
+
+ log.info("Setting up the server with {} workers, {} byte messages on {}... ",
+ wc, ml, ip);
+ IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
+ server.start();
+
+ // Start pruning clients and keep going until their number goes to 0.
+ int remaining = -1;
+ while (remaining == -1 || remaining > 0) {
+ delay(PRUNE_FREQUENCY);
+ int r = server.prune();
+ remaining = remaining == -1 && r == 0 ? remaining : r;
+ }
+ server.stop();
+ }
+
+ /**
+ * Creates a speed server.
+ *
+ * @param ip optional ip of the adapter where to bind
+ * @param wc worker count
+ * @param ml message length in bytes
+ * @param port listen port
+ * @throws java.io.IOException if unable to create IO loops
+ */
+ public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+ this.workerCount = wc;
+ this.msgLength = ml;
+ this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
+
+ this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
+ for (int i = 0; i < workerCount; i++) {
+ iloops.add(new CustomIOLoop());
+ }
+ }
+
+ /**
+ * Start the server IO loops and kicks off throughput tracking.
+ */
+ public void start() {
+ messages = new Counter();
+ bytes = new Counter();
+
+ for (CustomIOLoop l : iloops) {
+ ipool.execute(l);
+ }
+ apool.execute(aloop);
+
+ for (CustomIOLoop l : iloops) {
+ l.awaitStart(TIMEOUT);
+ }
+ aloop.awaitStart(TIMEOUT);
+ }
+
+ /**
+ * Stop the server IO loops and freezes throughput tracking.
+ */
+ public void stop() {
+ aloop.shutdown();
+ for (CustomIOLoop l : iloops) {
+ l.shutdown();
+ }
+
+ for (CustomIOLoop l : iloops) {
+ l.awaitStop(TIMEOUT);
+ }
+ aloop.awaitStop(TIMEOUT);
+
+ messages.freeze();
+ bytes.freeze();
+ }
+
+ /**
+ * Reports on the accumulated throughput and latency.
+ */
+ public void report() {
+ DecimalFormat f = new DecimalFormat("#,##0");
+ out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
+ f.format(messages.total()), f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * msgLength))));
+ }
+
+ /**
+ * Prunes the IO loops of stale message buffers.
+ *
+ * @return number of remaining IO loops among all workers.
+ */
+ public int prune() {
+ int count = 0;
+ for (CustomIOLoop l : iloops) {
+ count += l.pruneStaleStreams();
+ }
+ return count;
+ }
+
+ // Get the next worker to which a client should be assigned
+ private synchronized CustomIOLoop nextWorker() {
+ lastWorker = (lastWorker + 1) % workerCount;
+ return iloops.get(lastWorker);
+ }
+
+ // Loop for transfer of fixed-length messages
+ private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ public CustomIOLoop() throws IOException {
+ super(500);
+ }
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(msgLength, channel, this);
+ }
+
+ @Override
+ protected void removeStream(MessageStream<TestMessage> stream) {
+ super.removeStream(stream);
+ messages.add(stream.messagesIn().total());
+ bytes.add(stream.bytesIn().total());
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> stream) {
+ try {
+ stream.write(createResponses(messages));
+ } catch (IOException e) {
+ log.error("Unable to echo messages", e);
+ }
+ }
+
+ private List<TestMessage> createResponses(List<TestMessage> messages) {
+ List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
+ for (TestMessage message : messages) {
+ responses.add(new TestMessage(message.length(), message.requestorTime(),
+ System.nanoTime(), message.padding()));
+ }
+ return responses;
+ }
+ }
+
+ // Loop for accepting client connections
+ private class CustomAcceptLoop extends AcceptorLoop {
+
+ public CustomAcceptLoop(SocketAddress address) throws IOException {
+ super(500, address);
+ }
+
+ @Override
+ protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+ SocketChannel sc = channel.accept();
+ sc.configureBlocking(false);
+
+ Socket so = sc.socket();
+ so.setTcpNoDelay(SO_NO_DELAY);
+ so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+ so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+
+ nextWorker().acceptStream(sc);
+ log.info("Connected client");
+ }
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
new file mode 100644
index 00000000..9965d389
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests of the message message stream implementation.
+ */
+public class MessageStreamTest {
+
+ private static final int SIZE = 64;
+ private static final int BIG_SIZE = 32 * 1024;
+
+ private TestMessage message;
+
+ private TestIOLoop loop;
+ private TestByteChannel channel;
+ private TestMessageStream stream;
+ private TestKey key;
+
+ @Before
+ public void setUp() throws IOException {
+ loop = new TestIOLoop();
+ channel = new TestByteChannel();
+ key = new TestKey(channel);
+ stream = loop.createStream(channel);
+ stream.setKey(key);
+ stream.setNonStrict();
+ message = new TestMessage(SIZE, 0, 0, stream.padding());
+ }
+
+ @After
+ public void tearDown() {
+ loop.shutdown();
+ stream.close();
+ }
+
+ // Validates the state of the message stream
+ private void validate(boolean wp, boolean fr, int read, int written) {
+ assertEquals(wp, stream.isWritePending());
+ assertEquals(fr, stream.isFlushRequired());
+ assertEquals(read, channel.readBytes);
+ assertEquals(written, channel.writtenBytes);
+ }
+
+ @Test
+ public void endOfStream() throws IOException {
+ channel.close();
+ List<TestMessage> messages = stream.read();
+ assertNull(messages);
+ }
+
+ @Test
+ public void bufferGrowth() throws IOException {
+ // Create a stream for big messages and test the growth.
+ stream = new TestMessageStream(BIG_SIZE, channel, loop);
+ TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
+
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ }
+
+ @Test
+ public void discardBeforeKey() {
+ // Create a stream that does not yet have the key set and discard it.
+ stream = loop.createStream(channel);
+ assertNull(stream.key());
+ stream.close();
+ // There is not key, so nothing to check; we just expect no problem.
+ }
+
+ @Test
+ public void bufferedRead() throws IOException {
+ channel.bytesToRead = SIZE + 4;
+ List<TestMessage> messages = stream.read();
+ assertEquals(1, messages.size());
+ validate(false, false, SIZE + 4, 0);
+
+ channel.bytesToRead = SIZE - 4;
+ messages = stream.read();
+ assertEquals(1, messages.size());
+ validate(false, false, SIZE * 2, 0);
+ }
+
+ @Test
+ public void bufferedWrite() throws IOException {
+ validate(false, false, 0, 0);
+
+ // First write is immediate...
+ stream.write(message);
+ validate(false, false, 0, SIZE);
+
+ // Second and third get buffered...
+ stream.write(message);
+ validate(false, true, 0, SIZE);
+ stream.write(message);
+ validate(false, true, 0, SIZE);
+
+ // Reset write, which will flush if needed; the next write is again buffered
+ stream.flushIfWriteNotPending();
+ validate(false, false, 0, SIZE * 3);
+ stream.write(message);
+ validate(false, true, 0, SIZE * 3);
+
+ // Select reset, which will flush if needed; the next write is again buffered
+ stream.flushIfPossible();
+ validate(false, false, 0, SIZE * 4);
+ stream.write(message);
+ validate(false, true, 0, SIZE * 4);
+ stream.flush();
+ validate(false, true, 0, SIZE * 4);
+ }
+
+ @Test
+ public void bufferedWriteList() throws IOException {
+ validate(false, false, 0, 0);
+
+ // First write is immediate...
+ List<TestMessage> messages = new ArrayList<>();
+ messages.add(message);
+ messages.add(message);
+ messages.add(message);
+ messages.add(message);
+
+ stream.write(messages);
+ validate(false, false, 0, SIZE * 4);
+
+ stream.write(messages);
+ validate(false, true, 0, SIZE * 4);
+
+ stream.flushIfPossible();
+ validate(false, false, 0, SIZE * 8);
+ }
+
+ @Test
+ public void bufferedPartialWrite() throws IOException {
+ validate(false, false, 0, 0);
+
+ // First write is immediate...
+ stream.write(message);
+ validate(false, false, 0, SIZE);
+
+ // Tell test channel to accept only half.
+ channel.bytesToWrite = SIZE / 2;
+
+ // Second and third get buffered...
+ stream.write(message);
+ validate(false, true, 0, SIZE);
+ stream.flushIfPossible();
+ validate(true, true, 0, SIZE + SIZE / 2);
+ }
+
+ @Test
+ public void bufferedPartialWrite2() throws IOException {
+ validate(false, false, 0, 0);
+
+ // First write is immediate...
+ stream.write(message);
+ validate(false, false, 0, SIZE);
+
+ // Tell test channel to accept only half.
+ channel.bytesToWrite = SIZE / 2;
+
+ // Second and third get buffered...
+ stream.write(message);
+ validate(false, true, 0, SIZE);
+ stream.flushIfWriteNotPending();
+ validate(true, true, 0, SIZE + SIZE / 2);
+ }
+
+ @Test
+ public void bufferedReadWrite() throws IOException {
+ channel.bytesToRead = SIZE + 4;
+ List<TestMessage> messages = stream.read();
+ assertEquals(1, messages.size());
+ validate(false, false, SIZE + 4, 0);
+
+ stream.write(message);
+ validate(false, false, SIZE + 4, SIZE);
+
+ channel.bytesToRead = SIZE - 4;
+ messages = stream.read();
+ assertEquals(1, messages.size());
+ validate(false, false, SIZE * 2, SIZE);
+ }
+
+ // Fake IO driver loop
+ private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ public TestIOLoop() throws IOException {
+ super(500);
+ }
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(SIZE, channel, this);
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> stream) {
+ }
+
+ }
+
+ // Byte channel test fixture
+ private static class TestByteChannel extends SelectableChannel implements ByteChannel {
+
+ private static final int BUFFER_LENGTH = 1024;
+ byte[] bytes = new byte[BUFFER_LENGTH];
+ int bytesToWrite = BUFFER_LENGTH;
+ int bytesToRead = BUFFER_LENGTH;
+ int writtenBytes = 0;
+ int readBytes = 0;
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int l = Math.min(dst.remaining(), bytesToRead);
+ if (bytesToRead > 0) {
+ readBytes += l;
+ dst.put(bytes, 0, l);
+ }
+ return l;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int l = Math.min(src.remaining(), bytesToWrite);
+ writtenBytes += l;
+ src.get(bytes, 0, l);
+ return l;
+ }
+
+ @Override
+ public Object blockingLock() {
+ return null;
+ }
+
+ @Override
+ public SelectableChannel configureBlocking(boolean arg0) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public SelectionKey keyFor(Selector arg0) {
+ return null;
+ }
+
+ @Override
+ public SelectorProvider provider() {
+ return null;
+ }
+
+ @Override
+ public SelectionKey register(Selector arg0, int arg1, Object arg2)
+ throws ClosedChannelException {
+ return null;
+ }
+
+ @Override
+ public int validOps() {
+ return 0;
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+ bytesToRead = -1;
+ }
+
+ }
+
+ // Selection key text fixture
+ private static class TestKey extends SelectionKey {
+
+ private SelectableChannel channel;
+
+ public TestKey(TestByteChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public SelectableChannel channel() {
+ return channel;
+ }
+
+ @Override
+ public int interestOps() {
+ return 0;
+ }
+
+ @Override
+ public SelectionKey interestOps(int ops) {
+ return null;
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public int readyOps() {
+ return 0;
+ }
+
+ @Override
+ public Selector selector() {
+ return null;
+ }
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
new file mode 100644
index 00000000..cf5b2bb9
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.util.Set;
+
+/**
+ * A selector instrumented for unit tests.
+ */
+public class MockSelector extends AbstractSelector {
+
+ int wakeUpCount = 0;
+
+ /**
+ * Creates a mock selector, specifying null as the SelectorProvider.
+ */
+ public MockSelector() {
+ super(null);
+ }
+
+ @Override
+ public String toString() {
+ return "{MockSelector: wake=" + wakeUpCount + "}";
+ }
+
+ @Override
+ protected void implCloseSelector() throws IOException {
+ }
+
+ @Override
+ protected SelectionKey register(AbstractSelectableChannel ch, int ops,
+ Object att) {
+ return null;
+ }
+
+ @Override
+ public Set<SelectionKey> keys() {
+ return null;
+ }
+
+ @Override
+ public Set<SelectionKey> selectedKeys() {
+ return null;
+ }
+
+ @Override
+ public int selectNow() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int select(long timeout) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int select() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public Selector wakeup() {
+ wakeUpCount++;
+ return null;
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
new file mode 100644
index 00000000..825479b5
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Test message for measuring rate and round-trip latency.
+ */
+public class TestMessage extends AbstractMessage {
+
+ private final byte[] padding;
+
+ private final long requestorTime;
+ private final long responderTime;
+
+ /**
+ * Creates a new message with the specified data.
+ *
+ * @param requestorTime requester time
+ * @param responderTime responder time
+ * @param padding message padding
+ */
+ TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
+ this.length = length;
+ this.requestorTime = requestorTime;
+ this.responderTime = responderTime;
+ this.padding = checkNotNull(padding, "Padding cannot be null");
+ }
+
+ public long requestorTime() {
+ return requestorTime;
+ }
+
+ public long responderTime() {
+ return responderTime;
+ }
+
+ public byte[] padding() {
+ return padding;
+ }
+
+}
diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
new file mode 100644
index 00000000..fc86beb9
--- /dev/null
+++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.nio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Fixed-length message transfer buffer.
+ */
+public class TestMessageStream extends MessageStream<TestMessage> {
+
+ private static final String E_WRONG_LEN = "Illegal message length: ";
+ private static final long START_TAG = 0xfeedcafedeaddeedL;
+ private static final long END_TAG = 0xbeadcafedeaddeedL;
+ private static final int META_LENGTH = 40;
+
+ private final int length;
+ private boolean isStrict = true;
+
+ public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
+ super(loop, ch, 64 * 1024, 500);
+ checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
+ this.length = length;
+ }
+
+ void setNonStrict() {
+ isStrict = false;
+ }
+
+ @Override
+ protected TestMessage read(ByteBuffer rb) {
+ if (rb.remaining() < length) {
+ return null;
+ }
+
+ long startTag = rb.getLong();
+ if (isStrict) {
+ checkState(startTag == START_TAG, "Incorrect message start");
+ }
+
+ long size = rb.getLong();
+ long requestorTime = rb.getLong();
+ long responderTime = rb.getLong();
+ byte[] padding = padding();
+ rb.get(padding);
+
+ long endTag = rb.getLong();
+ if (isStrict) {
+ checkState(endTag == END_TAG, "Incorrect message end");
+ }
+
+ return new TestMessage((int) size, requestorTime, responderTime, padding);
+ }
+
+ @Override
+ protected void write(TestMessage message, ByteBuffer wb) {
+ if (message.length() != length) {
+ throw new IllegalArgumentException(E_WRONG_LEN + message.length());
+ }
+
+ wb.putLong(START_TAG);
+ wb.putLong(message.length());
+ wb.putLong(message.requestorTime());
+ wb.putLong(message.responderTime());
+ wb.put(message.padding(), 0, length - META_LENGTH);
+ wb.putLong(END_TAG);
+ }
+
+ public byte[] padding() {
+ return new byte[length - META_LENGTH];
+ }
+}