From 13d05bc8458758ee39cb829098241e89616717ee Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Wed, 9 Sep 2015 22:15:21 -0700 Subject: ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60 Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd --- framework/src/onos/utils/nio/pom.xml | 58 +++ .../main/java/org/onlab/nio/AbstractMessage.java | 30 ++ .../src/main/java/org/onlab/nio/AcceptorLoop.java | 123 ++++++ .../nio/src/main/java/org/onlab/nio/IOLoop.java | 302 +++++++++++++++ .../nio/src/main/java/org/onlab/nio/Message.java | 30 ++ .../src/main/java/org/onlab/nio/MessageStream.java | 424 +++++++++++++++++++++ .../src/main/java/org/onlab/nio/SelectorLoop.java | 175 +++++++++ .../src/main/java/org/onlab/nio/package-info.java | 21 + .../java/org/onlab/nio/service/DefaultIOLoop.java | 66 ++++ .../java/org/onlab/nio/service/DefaultMessage.java | 104 +++++ .../onlab/nio/service/DefaultMessageStream.java | 139 +++++++ .../org/onlab/nio/service/IOLoopMessaging.java | 334 ++++++++++++++++ .../java/org/onlab/nio/service/package-info.java | 20 + .../test/java/org/onlab/nio/AbstractLoopTest.java | 60 +++ .../test/java/org/onlab/nio/AcceptorLoopTest.java | 87 +++++ .../java/org/onlab/nio/IOLoopIntegrationTest.java | 82 ++++ .../test/java/org/onlab/nio/IOLoopTestClient.java | 324 ++++++++++++++++ .../test/java/org/onlab/nio/IOLoopTestServer.java | 256 +++++++++++++ .../test/java/org/onlab/nio/MessageStreamTest.java | 359 +++++++++++++++++ .../src/test/java/org/onlab/nio/MockSelector.java | 85 +++++ .../src/test/java/org/onlab/nio/TestMessage.java | 56 +++ .../test/java/org/onlab/nio/TestMessageStream.java | 89 +++++ 22 files changed, 3224 insertions(+) create mode 100644 framework/src/onos/utils/nio/pom.xml create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java create mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java create mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java (limited to 'framework/src/onos/utils/nio') diff --git a/framework/src/onos/utils/nio/pom.xml b/framework/src/onos/utils/nio/pom.xml new file mode 100644 index 00000000..b9e4d447 --- /dev/null +++ b/framework/src/onos/utils/nio/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + + org.onosproject + onlab-utils + 1.3.0-SNAPSHOT + ../pom.xml + + + onlab-nio + bundle + + Fast network I/O using Java NIO + + + + com.google.guava + guava-testlib + test + + + commons-pool + commons-pool + + + org.onosproject + onos-api + + + org.onosproject + onlab-misc + + + org.onosproject + onlab-junit + test + + + diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java new file mode 100644 index 00000000..16bb9359 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +/** + * Base {@link Message} implementation. + */ +public abstract class AbstractMessage implements Message { + + protected int length; + + @Override + public int length() { + return length; + } + +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java new file mode 100644 index 00000000..e416f3be --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java @@ -0,0 +1,123 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.util.Iterator; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Selector loop derivative tailored to acceptConnection inbound connections. + */ +public abstract class AcceptorLoop extends SelectorLoop { + + private SocketAddress listenAddress; + private ServerSocketChannel socketChannel; + + /** + * Creates an acceptor loop with the specified selection timeout and + * accepting connections on the the given address. + * + * @param selectTimeout selection timeout; specified in millis + * @param listenAddress socket address where to listen for connections + * @throws IOException if the backing selector cannot be opened + */ + public AcceptorLoop(long selectTimeout, SocketAddress listenAddress) + throws IOException { + super(selectTimeout); + this.listenAddress = checkNotNull(listenAddress, "Address cannot be null"); + } + + /** + * Hook to accept an inbound connection on the specified socket channel. + * + * @param channel socketChannel where an accept operation awaits + * @throws IOException if the accept operation cannot be processed + */ + protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException; + + /** + * Opens a new server socket channel configured in non-blocking mode and + * bound to the loop's listen address. + * + * @throws IOException if unable to open or configure the socket channel + */ + protected synchronized void openChannel() throws IOException { + socketChannel = ServerSocketChannel.open(); + socketChannel.configureBlocking(false); + socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + socketChannel.register(selector, SelectionKey.OP_ACCEPT); + socketChannel.bind(listenAddress); + } + + /** + * Closes the server socket channel. + * + * @throws IOException if unable to close the socketChannel + */ + protected synchronized void closechannel() throws IOException { + if (socketChannel != null) { + socketChannel.close(); + socketChannel = null; + } + } + + @Override + public void shutdown() { + try { + closechannel(); + } catch (IOException e) { + log.warn("Unable to close the socketChannel", e); + } + super.shutdown(); + } + + @Override + protected void loop() throws IOException { + openChannel(); + notifyReady(); + + // Keep looping until told otherwise. + while (isRunning()) { + // Attempt a selection; if no operations selected or if signalled + // to shutdown, spin through. + int count = selector.select(selectTimeout); + if (count == 0 || !isRunning()) { + continue; + } + + // Iterate over all keys selected for an operation and process them. + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + // Fetch the key and remove it from the pending list. + SelectionKey key = keys.next(); + keys.remove(); + + // If the key has a pending acceptConnection operation, process it. + if (key.isAcceptable()) { + acceptConnection((ServerSocketChannel) key.channel()); + } + } + } + } + +} + diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java new file mode 100644 index 00000000..106df7b2 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java @@ -0,0 +1,302 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * I/O loop for driving inbound & outbound {@link Message} transfer via + * {@link MessageStream}. + * + * @param message type + * @param message stream type + */ +public abstract class IOLoop> + extends SelectorLoop { + + // Queue of requests for new message streams to enter the IO loop processing. + private final Queue newStreamRequests = new ConcurrentLinkedQueue<>(); + + // Carries information required for admitting a new message stream. + private class NewStreamRequest { + private final S stream; + private final SelectableChannel channel; + private final int op; + + public NewStreamRequest(S stream, SelectableChannel channel, int op) { + this.stream = stream; + this.channel = channel; + this.op = op; + } + } + + // Set of message streams currently admitted into the IO loop. + private final Set> streams = new CopyOnWriteArraySet<>(); + + /** + * Creates an IO loop with the given selection timeout. + * + * @param timeout selection timeout in milliseconds + * @throws IOException if the backing selector cannot be opened + */ + public IOLoop(long timeout) throws IOException { + super(timeout); + } + + /** + * Returns the number of message stream in custody of the loop. + * + * @return number of message streams + */ + public int streamCount() { + return streams.size(); + } + + /** + * Creates a new message stream backed by the specified socket channel. + * + * @param byteChannel backing byte channel + * @return newly created message stream + */ + protected abstract S createStream(ByteChannel byteChannel); + + /** + * Removes the specified message stream from the IO loop. + * + * @param stream message stream to remove + */ + protected void removeStream(MessageStream stream) { + streams.remove(stream); + } + + /** + * Processes the list of messages extracted from the specified message + * stream. + * + * @param messages non-empty list of received messages + * @param stream message stream from which the messages were extracted + */ + protected abstract void processMessages(List messages, MessageStream stream); + + /** + * Completes connection request pending on the given selection key. + * + * @param key selection key holding the pending connect operation. + * @throws IOException when I/O exception of some sort has occurred + */ + protected void connect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel) key.channel(); + ch.finishConnect(); + if (key.isValid()) { + key.interestOps(SelectionKey.OP_READ); + } + } + + /** + * Processes an IO operation pending on the specified key. + * + * @param key selection key holding the pending I/O operation. + */ + protected void processKeyOperation(SelectionKey key) { + @SuppressWarnings("unchecked") + S stream = (S) key.attachment(); + + try { + // If the key is not valid, bail out. + if (!key.isValid()) { + stream.close(); + return; + } + + // If there is a pending connect operation, complete it. + if (key.isConnectable()) { + try { + connect(key); + } catch (IOException | IllegalStateException e) { + log.warn("Unable to complete connection", e); + } + } + + // If there is a read operation, slurp as much data as possible. + if (key.isReadable()) { + List messages = stream.read(); + + // No messages or failed flush imply disconnect; bail. + if (messages == null || stream.hadError()) { + stream.close(); + return; + } + + // If there were any messages read, process them. + if (!messages.isEmpty()) { + try { + processMessages(messages, stream); + } catch (RuntimeException e) { + onError(stream, e); + } + } + } + + // If there are pending writes, flush them + if (key.isWritable()) { + stream.flushIfPossible(); + } + + // If there were any issued flushing, close the stream. + if (stream.hadError()) { + stream.close(); + } + + } catch (CancelledKeyException e) { + // Key was cancelled, so silently close the stream + stream.close(); + } catch (IOException e) { + if (!stream.isClosed() && !isResetByPeer(e)) { + log.warn("Unable to process IO", e); + } + stream.close(); + } + } + + // Indicates whether or not this exception is caused by 'reset by peer'. + private boolean isResetByPeer(IOException e) { + Throwable cause = e.getCause(); + return cause != null && cause instanceof IOException && + cause.getMessage().contains("reset by peer"); + } + + /** + * Hook to allow intercept of any errors caused during message processing. + * Default behaviour is to rethrow the error. + * + * @param stream message stream involved in the error + * @param error the runtime exception + */ + protected void onError(S stream, RuntimeException error) { + throw error; + } + + /** + * Admits a new message stream backed by the specified socket channel + * with a pending accept operation. + * + * @param channel backing socket channel + * @return newly accepted message stream + */ + public S acceptStream(SocketChannel channel) { + return createAndAdmit(channel, SelectionKey.OP_READ); + } + + + /** + * Admits a new message stream backed by the specified socket channel + * with a pending connect operation. + * + * @param channel backing socket channel + * @return newly connected message stream + */ + public S connectStream(SocketChannel channel) { + return createAndAdmit(channel, SelectionKey.OP_CONNECT); + } + + /** + * Creates a new message stream backed by the specified socket channel + * and admits it into the IO loop. + * + * @param channel socket channel + * @param op pending operations mask to be applied to the selection + * key as a set of initial interestedOps + * @return newly created message stream + */ + private synchronized S createAndAdmit(SocketChannel channel, int op) { + S stream = createStream(channel); + streams.add(stream); + newStreamRequests.add(new NewStreamRequest(stream, channel, op)); + selector.wakeup(); + return stream; + } + + /** + * Safely admits new streams into the IO loop. + */ + private void admitNewStreams() { + Iterator it = newStreamRequests.iterator(); + while (isRunning() && it.hasNext()) { + try { + NewStreamRequest request = it.next(); + it.remove(); + SelectionKey key = request.channel.register(selector, request.op, + request.stream); + request.stream.setKey(key); + } catch (ClosedChannelException e) { + log.warn("Unable to admit new message stream", e); + } + } + } + + @Override + protected void loop() throws IOException { + notifyReady(); + + // Keep going until told otherwise. + while (isRunning()) { + admitNewStreams(); + + // Process flushes & write selects on all streams + for (MessageStream stream : streams) { + stream.flushIfWriteNotPending(); + } + + // Select keys and process them. + int count = selector.select(selectTimeout); + if (count > 0 && isRunning()) { + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + processKeyOperation(key); + } + } + } + } + + /** + * Prunes the registered streams by discarding any stale ones. + * + * @return number of remaining streams + */ + public synchronized int pruneStaleStreams() { + for (MessageStream stream : streams) { + if (stream.isStale()) { + stream.close(); + } + } + return streams.size(); + } + +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java new file mode 100644 index 00000000..c1a339e0 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java @@ -0,0 +1,30 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +/** + * Representation of a message transferred via {@link MessageStream}. + */ +public interface Message { + + /** + * Gets the message length in bytes. + * + * @return number of bytes + */ + int length(); + +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java new file mode 100644 index 00000000..a19e8aac --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java @@ -0,0 +1,424 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.onlab.util.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.System.currentTimeMillis; +import static java.nio.ByteBuffer.allocateDirect; + +/** + * Bi-directional message stream for transferring messages to & from the + * network via two byte buffers. + * + * @param message type + */ +public abstract class MessageStream { + + protected Logger log = LoggerFactory.getLogger(getClass()); + + private final IOLoop loop; + private final ByteChannel channel; + private final int maxIdleMillis; + + private final ByteBuffer inbound; + private ByteBuffer outbound; + private SelectionKey key; + + private volatile boolean closed = false; + private volatile boolean writePending; + private volatile boolean writeOccurred; + + private Exception ioError; + private long lastActiveTime; + + private final Counter bytesIn = new Counter(); + private final Counter messagesIn = new Counter(); + private final Counter bytesOut = new Counter(); + private final Counter messagesOut = new Counter(); + + /** + * Creates a message stream associated with the specified IO loop and + * backed by the given byte channel. + * + * @param loop IO loop + * @param byteChannel backing byte channel + * @param bufferSize size of the backing byte buffers + * @param maxIdleMillis maximum number of millis the stream can be idle + * before it will be closed + */ + protected MessageStream(IOLoop loop, ByteChannel byteChannel, + int bufferSize, int maxIdleMillis) { + this.loop = checkNotNull(loop, "Loop cannot be null"); + this.channel = checkNotNull(byteChannel, "Byte channel cannot be null"); + + checkArgument(maxIdleMillis > 0, "Idle time must be positive"); + this.maxIdleMillis = maxIdleMillis; + + inbound = allocateDirect(bufferSize); + outbound = allocateDirect(bufferSize); + } + + /** + * Gets a single message from the specified byte buffer; this is + * to be done without manipulating the buffer via flip, reset or clear. + * + * @param buffer byte buffer + * @return read message or null if there are not enough bytes to read + * a complete message + */ + protected abstract M read(ByteBuffer buffer); + + /** + * Puts the specified message into the specified byte buffer; this is + * to be done without manipulating the buffer via flip, reset or clear. + * + * @param message message to be write into the buffer + * @param buffer byte buffer + */ + protected abstract void write(M message, ByteBuffer buffer); + + /** + * Closes the message buffer. + */ + public void close() { + synchronized (this) { + if (closed) { + return; + } + closed = true; + } + + bytesIn.freeze(); + bytesOut.freeze(); + messagesIn.freeze(); + messagesOut.freeze(); + + loop.removeStream(this); + if (key != null) { + try { + key.cancel(); + key.channel().close(); + } catch (IOException e) { + log.warn("Unable to close stream", e); + } + } + } + + /** + * Indicates whether this buffer has been closed. + * + * @return true if this stream has been closed + */ + public synchronized boolean isClosed() { + return closed; + } + + /** + * Returns the stream IO selection key. + * + * @return socket channel registration selection key + */ + public SelectionKey key() { + return key; + } + + /** + * Binds the selection key to be used for driving IO operations on the stream. + * + * @param key IO selection key + */ + public void setKey(SelectionKey key) { + this.key = key; + this.lastActiveTime = currentTimeMillis(); + } + + /** + * Returns the IO loop to which this stream is bound. + * + * @return I/O loop used to drive this stream + */ + public IOLoop loop() { + return loop; + } + + /** + * Indicates whether the any prior IO encountered an error. + * + * @return true if a write failed + */ + public boolean hadError() { + return ioError != null; + } + + /** + * Gets the prior IO error, if one occurred. + * + * @return IO error; null if none occurred + */ + public Exception getError() { + return ioError; + } + + /** + * Reads, without blocking, a list of messages from the stream. + * The list will be empty if there were not messages pending. + * + * @return list of messages or null if backing channel has been closed + * @throws IOException if messages could not be read + */ + public List read() throws IOException { + try { + int read = channel.read(inbound); + if (read != -1) { + // Read the messages one-by-one and add them to the list. + List messages = new ArrayList<>(); + M message; + inbound.flip(); + while ((message = read(inbound)) != null) { + messages.add(message); + messagesIn.add(1); + bytesIn.add(message.length()); + } + inbound.compact(); + + // Mark the stream with current time to indicate liveness. + lastActiveTime = currentTimeMillis(); + return messages; + } + return null; + + } catch (Exception e) { + throw new IOException("Unable to read messages", e); + } + } + + /** + * Writes the specified list of messages to the stream. + * + * @param messages list of messages to write + * @throws IOException if error occurred while writing the data + */ + public void write(List messages) throws IOException { + synchronized (this) { + // First write all messages. + for (M m : messages) { + append(m); + } + flushUnlessAlreadyPlanningTo(); + } + } + + /** + * Writes the given message to the stream. + * + * @param message message to write + * @throws IOException if error occurred while writing the data + */ + public void write(M message) throws IOException { + synchronized (this) { + append(message); + flushUnlessAlreadyPlanningTo(); + } + } + + // Appends the specified message into the internal buffer, growing the + // buffer if required. + private void append(M message) { + // If the buffer does not have sufficient length double it. + while (outbound.remaining() < message.length()) { + doubleSize(); + } + write(message, outbound); + messagesOut.add(1); + bytesOut.add(message.length()); + } + + // Forces a flush, unless one is planned already. + private void flushUnlessAlreadyPlanningTo() throws IOException { + if (!writeOccurred && !writePending) { + flush(); + } + } + + /** + * Flushes any pending writes. + * + * @throws IOException if flush failed + */ + public void flush() throws IOException { + synchronized (this) { + if (!writeOccurred && !writePending) { + outbound.flip(); + try { + channel.write(outbound); + } catch (IOException e) { + if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { + log.warn("Unable to write data", e); + ioError = e; + } + } + lastActiveTime = currentTimeMillis(); + writeOccurred = true; + writePending = outbound.hasRemaining(); + outbound.compact(); + } + } + } + + /** + * Indicates whether the stream has bytes to be written to the channel. + * + * @return true if there are bytes to be written + */ + boolean isWritePending() { + synchronized (this) { + return writePending; + } + } + + + /** + * Indicates whether data has been written but not flushed yet. + * + * @return true if flush is required + */ + boolean isFlushRequired() { + synchronized (this) { + return outbound.position() > 0; + } + } + + /** + * Attempts to flush data, internal stream state and channel availability + * permitting. Invoked by the driver I/O loop during handling of writable + * selection key. + *

+ * Resets the internal state flags {@code writeOccurred} and + * {@code writePending}. + *

+ * @throws IOException if implicit flush failed + */ + void flushIfPossible() throws IOException { + synchronized (this) { + writePending = false; + writeOccurred = false; + if (outbound.position() > 0) { + flush(); + } + } + key.interestOps(SelectionKey.OP_READ); + } + + /** + * Attempts to flush data, internal stream state and channel availability + * permitting and if other writes are not pending. Invoked by the driver + * I/O loop prior to entering select wait. Resets the internal + * {@code writeOccurred} state flag. + * + * @throws IOException if implicit flush failed + */ + void flushIfWriteNotPending() throws IOException { + synchronized (this) { + writeOccurred = false; + if (!writePending && outbound.position() > 0) { + flush(); + } + } + if (isWritePending()) { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } + } + + /** + * Doubles the size of the outbound buffer. + */ + private void doubleSize() { + ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2); + outbound.flip(); + newBuffer.put(outbound); + outbound = newBuffer; + } + + /** + * Returns the maximum number of milliseconds the stream is allowed + * without any read/write operations. + * + * @return number if millis of permissible idle time + */ + protected int maxIdleMillis() { + return maxIdleMillis; + } + + + /** + * Returns true if the given stream has gone stale. + * + * @return true if the stream is stale + */ + boolean isStale() { + return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null; + } + + /** + * Returns the inbound bytes counter. + * + * @return inbound bytes counter + */ + public Counter bytesIn() { + return bytesIn; + } + + /** + * Returns the outbound bytes counter. + * + * @return outbound bytes counter + */ + public Counter bytesOut() { + return bytesOut; + } + + /** + * Returns the inbound messages counter. + * + * @return inbound messages counter + */ + public Counter messagesIn() { + return messagesIn; + } + + /** + * Returns the outbound messages counter. + * + * @return outbound messages counter + */ + public Counter messagesOut() { + return messagesOut; + } + +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java new file mode 100644 index 00000000..95a9b61e --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java @@ -0,0 +1,175 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.Selector; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.System.currentTimeMillis; + +/** + * Abstraction of an I/O processing loop based on an NIO selector. + */ +public abstract class SelectorLoop implements Runnable { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /** + * Selector used by this loop to pace the I/O operations. + */ + protected final Selector selector; + + /** + * Selection operations timeout; specified in millis. + */ + protected long selectTimeout; + + /** + * Retains the error that caused the loop to exit prematurely. + */ + private Throwable error; + + // State indicator + private enum State { STARTING, STARTED, STOPPING, STOPPED }; + private volatile State state = State.STOPPED; + + /** + * Creates a new selector loop with the given selection timeout. + * + * @param selectTimeout selection timeout; specified in millis + * @throws IOException if the backing selector cannot be opened + */ + public SelectorLoop(long selectTimeout) throws IOException { + checkArgument(selectTimeout > 0, "Timeout must be positive"); + this.selectTimeout = selectTimeout; + this.selector = openSelector(); + } + + /** + * Opens a new selector for the use by the loop. + * + * @return newly open selector + * @throws IOException if the backing selector cannot be opened + */ + protected Selector openSelector() throws IOException { + return Selector.open(); + } + + /** + * Indicates that the loop is marked to run. + * @return true if the loop is marked to run + */ + protected boolean isRunning() { + return state == State.STARTED || state == State.STARTING; + } + + /** + * Returns the error, if there was one, that caused the loop to terminate + * prematurely. + * + * @return error or null if there was none + */ + public Throwable getError() { + return error; + } + + /** + * Contains the body of the I/O selector loop. + * + * @throws IOException if an error is encountered while selecting I/O + */ + protected abstract void loop() throws IOException; + + @Override + public void run() { + error = null; + state = State.STARTING; + try { + loop(); + } catch (Exception e) { + error = e; + log.error("Loop aborted", e); + } + notifyDone(); + } + + /** + * Notifies observers waiting for loop to become ready. + */ + protected synchronized void notifyReady() { + state = State.STARTED; + notifyAll(); + } + + /** + * Triggers loop shutdown. + */ + public void shutdown() { + // Mark the loop as no longer running and wake up the selector. + state = State.STOPPING; + selector.wakeup(); + } + + /** + * Notifies observers waiting for loop to fully stop. + */ + private synchronized void notifyDone() { + state = State.STOPPED; + notifyAll(); + } + + /** + * Waits for the loop execution to start. + * + * @param timeout number of milliseconds to wait + * @return true if loop started in time + */ + public final synchronized boolean awaitStart(long timeout) { + long max = currentTimeMillis() + timeout; + while (state != State.STARTED && (currentTimeMillis() < max)) { + try { + wait(timeout); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } + return state == State.STARTED; + } + + /** + * Waits for the loop execution to stop. + * + * @param timeout number of milliseconds to wait + * @return true if loop finished in time + */ + public final synchronized boolean awaitStop(long timeout) { + long max = currentTimeMillis() + timeout; + while (state != State.STOPPED && (currentTimeMillis() < max)) { + try { + wait(timeout); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } + return state == State.STOPPED; + } + + +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java new file mode 100644 index 00000000..0d58b568 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Mechanism to transfer messages over network using IO loop and + * message stream, backed by NIO byte buffers. + */ +package org.onlab.nio; diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java new file mode 100644 index 00000000..0d977929 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java @@ -0,0 +1,66 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio.service; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.List; +import java.util.function.Consumer; + +import org.onlab.nio.IOLoop; +import org.onlab.nio.MessageStream; + +/** + * IOLoop for transporting DefaultMessages. + */ +public class DefaultIOLoop extends IOLoop { + + public static final int SELECT_TIMEOUT_MILLIS = 500; + private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000; + private static final int BUFFER_SIZE = 1024 * 1024; + private final Consumer consumer; + + public DefaultIOLoop(Consumer consumer) throws IOException { + this(SELECT_TIMEOUT_MILLIS, consumer); + } + + public DefaultIOLoop(long timeout, Consumer consumer) throws IOException { + super(timeout); + this.consumer = consumer; + } + + @Override + protected DefaultMessageStream createStream(ByteChannel byteChannel) { + return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS); + } + + @Override + protected void processMessages(List messages, MessageStream stream) { + messages.forEach(consumer); + } + + @Override + protected void connect(SelectionKey key) throws IOException { + DefaultMessageStream stream = (DefaultMessageStream) key.attachment(); + try { + super.connect(key); + stream.connected(); + } catch (Exception e) { + stream.connectFailed(e); + } + } +} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java new file mode 100644 index 00000000..591d49b3 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java @@ -0,0 +1,104 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio.service; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.onlab.nio.AbstractMessage; +import org.onlab.packet.IpAddress; +import org.onlab.util.ByteArraySizeHashPrinter; +import org.onosproject.store.cluster.messaging.Endpoint; + +import com.google.common.base.Charsets; +import com.google.common.base.MoreObjects; + +/** + * Default message. + */ +public class DefaultMessage extends AbstractMessage { + + private long id; + private Endpoint sender; + private String type; + private byte[] payload; + + /** + * Creates a new message with the specified data. + * + * @param id message id + * @param type message type + * @param sender sender endpoint + * @param payload message payload + */ + DefaultMessage(long id, Endpoint sender, String type, byte[] payload) { + this.id = id; + this.type = checkNotNull(type, "Type cannot be null"); + this.sender = checkNotNull(sender, "Sender cannot be null"); + this.payload = checkNotNull(payload, "Payload cannot be null"); + + byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8); + IpAddress senderIp = sender.host(); + byte[] ipOctets = senderIp.toOctets(); + + length = 25 + ipOctets.length + messageTypeBytes.length + payload.length; + } + + /** + * Returns message id. + * + * @return message id + */ + public long id() { + return id; + } + + /** + * Returns message sender. + * + * @return message sender + */ + public Endpoint sender() { + return sender; + } + + /** + * Returns message type. + * + * @return message type + */ + public String type() { + return type; + } + + /** + * Returns message payload. + * + * @return payload + */ + public byte[] payload() { + return payload; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("type", type) + .add("sender", sender) + .add("payload", ByteArraySizeHashPrinter.of(payload)) + .toString(); + } +} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java new file mode 100644 index 00000000..a7dd3c04 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java @@ -0,0 +1,139 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio.service; + +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.onlab.nio.IOLoop; +import org.onlab.nio.MessageStream; +import org.onlab.packet.IpAddress; +import org.onlab.packet.IpAddress.Version; +import org.onosproject.store.cluster.messaging.Endpoint; + +import com.google.common.base.Charsets; + +/** + * Default bi-directional message stream for transferring messages to & from the + * network via two byte buffers. + */ +public class DefaultMessageStream extends MessageStream { + + private final CompletableFuture connectFuture = new CompletableFuture<>(); + + public DefaultMessageStream( + IOLoop loop, + ByteChannel byteChannel, + int bufferSize, + int maxIdleMillis) { + super(loop, byteChannel, bufferSize, maxIdleMillis); + } + + public CompletableFuture connectedFuture() { + return connectFuture.thenApply(v -> this); + } + + private final AtomicInteger messageLength = new AtomicInteger(-1); + + @Override + protected DefaultMessage read(ByteBuffer buffer) { + if (messageLength.get() == -1) { + // check if we can read the message length. + if (buffer.remaining() < Integer.BYTES) { + return null; + } else { + messageLength.set(buffer.getInt()); + } + } + + if (buffer.remaining() < messageLength.get()) { + return null; + } + + long id = buffer.getLong(); + Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6; + byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; + buffer.get(octects); + IpAddress senderIp = IpAddress.valueOf(ipVersion, octects); + int senderPort = buffer.getInt(); + int messageTypeByteLength = buffer.getInt(); + byte[] messageTypeBytes = new byte[messageTypeByteLength]; + buffer.get(messageTypeBytes); + String messageType = new String(messageTypeBytes, Charsets.UTF_8); + int payloadLength = buffer.getInt(); + byte[] payloadBytes = new byte[payloadLength]; + buffer.get(payloadBytes); + + // reset for next message + messageLength.set(-1); + + return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes); + } + + @Override + protected void write(DefaultMessage message, ByteBuffer buffer) { + Endpoint sender = message.sender(); + byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); + IpAddress senderIp = sender.host(); + byte[] ipOctets = senderIp.toOctets(); + byte[] payload = message.payload(); + + int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length; + + buffer.putInt(messageLength); + + buffer.putLong(message.id()); + + if (senderIp.version() == Version.INET) { + buffer.put((byte) 0x0); + } else { + buffer.put((byte) 0x1); + } + buffer.put(ipOctets); + + // write sender port + buffer.putInt(sender.port()); + + // write length of message type + buffer.putInt(messageTypeBytes.length); + + // write message type bytes + buffer.put(messageTypeBytes); + + // write payload length + buffer.putInt(payload.length); + + // write payload. + buffer.put(payload); + } + + /** + * Callback invoked when the stream is successfully connected. + */ + public void connected() { + connectFuture.complete(null); + } + + /** + * Callback invoked when the stream fails to connect. + * @param cause failure cause + */ + public void connectFailed(Throwable cause) { + connectFuture.completeExceptionally(cause); + } +} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java new file mode 100644 index 00000000..c195d160 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java @@ -0,0 +1,334 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio.service; + +import static org.onlab.util.Tools.groupedThreads; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.onlab.nio.AcceptorLoop; +import org.onlab.nio.SelectorLoop; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; + +/** + * MessagingService implementation based on IOLoop. + */ +public class IOLoopMessaging implements MessagingService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY"; + + static final long TIMEOUT = 1000; + + static final boolean SO_NO_DELAY = false; + static final int SO_SEND_BUFFER_SIZE = 128 * 1024; + static final int SO_RCV_BUFFER_SIZE = 128 * 1024; + + private static final int NUM_WORKERS = 8; + + private AcceptorLoop acceptorLoop; + private final ExecutorService acceptorThreadPool = + Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor")); + private final ExecutorService ioThreadPool = + Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d")); + + private final List ioLoops = Lists.newArrayList(); + + private int lastWorker = -1; + + private final AtomicBoolean started = new AtomicBoolean(false); + private Endpoint localEp; + + private GenericKeyedObjectPool streams = + new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); + + private final ConcurrentMap> handlers = new ConcurrentHashMap<>(); + private final AtomicLong messageIdGenerator = new AtomicLong(0); + private final Cache> responseFutures = CacheBuilder.newBuilder() + .removalListener(new RemovalListener>() { + @Override + public void onRemoval(RemovalNotification> entry) { + if (entry.wasEvicted()) { + entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); + } + } + }) + .build(); + + /** + * Activates IO Loops. + * + * @param localEp local end-point + * @throws IOException is activation fails + */ + public void start(Endpoint localEp) throws IOException { + if (started.get()) { + log.warn("IOMessaging is already running at {}", localEp); + return; + } + this.localEp = localEp; + streams.setLifo(false); + this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port())); + + for (int i = 0; i < NUM_WORKERS; i++) { + ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); + } + + ioLoops.forEach(ioThreadPool::execute); + acceptorThreadPool.execute(acceptorLoop); + ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); + acceptorLoop.awaitStart(TIMEOUT); + started.set(true); + } + + /** + * Shuts down IO loops. + */ + public void stop() { + if (started.get()) { + ioLoops.forEach(SelectorLoop::shutdown); + acceptorLoop.shutdown(); + ioThreadPool.shutdown(); + acceptorThreadPool.shutdown(); + started.set(false); + } + } + + + @Override + public CompletableFuture sendAsync(Endpoint ep, String type, byte[] payload) { + DefaultMessage message = new DefaultMessage( + messageIdGenerator.incrementAndGet(), + localEp, + type, + payload); + return sendAsync(ep, message); + } + + protected CompletableFuture sendAsync(Endpoint ep, DefaultMessage message) { + CompletableFuture future = new CompletableFuture<>(); + if (ep.equals(localEp)) { + dispatchLocally(message); + future.complete(null); + return future; + } + + DefaultMessageStream stream = null; + try { + stream = streams.borrowObject(ep); + stream.write(message); + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + try { + streams.returnObject(ep, stream); + } catch (Exception e) { + log.warn("Failed to return stream to pool"); + } + } + return future; + } + + @Override + public CompletableFuture sendAndReceive( + Endpoint ep, + String type, + byte[] payload) { + CompletableFuture response = new CompletableFuture<>(); + Long messageId = messageIdGenerator.incrementAndGet(); + responseFutures.put(messageId, response); + DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload); + try { + sendAsync(ep, message); + } catch (Exception e) { + responseFutures.invalidate(messageId); + response.completeExceptionally(e); + } + return response; + } + + @Override + public void registerHandler(String type, Consumer handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); + } + + @Override + public void registerHandler(String type, Function handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> { + byte[] responsePayload = handler.apply(message.payload()); + if (responsePayload != null) { + DefaultMessage response = new DefaultMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + responsePayload); + sendAsync(message.sender(), response).whenComplete((result, error) -> { + log.debug("Failed to respond", error); + }); + } + })); + } + + @Override + public void registerHandler(String type, Function> handler) { + handlers.put(type, message -> { + handler.apply(message.payload()).whenComplete((result, error) -> { + if (error == null) { + DefaultMessage response = new DefaultMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + result); + sendAsync(message.sender(), response).whenComplete((r, e) -> { + if (e != null) { + log.debug("Failed to respond", e); + } + }); + } + }); + }); + } + + @Override + public void unregisterHandler(String type) { + handlers.remove(type); + } + + protected void dispatchLocally(DefaultMessage message) { + String type = message.type(); + if (REPLY_MESSAGE_TYPE.equals(type)) { + try { + CompletableFuture futureResponse = + responseFutures.getIfPresent(message.id()); + if (futureResponse != null) { + futureResponse.complete(message.payload()); + } else { + log.warn("Received a reply for message id:[{}]. " + + " from {}. But was unable to locate the" + + " request handle", message.id(), message.sender()); + } + } finally { + responseFutures.invalidate(message.id()); + } + return; + } + Consumer handler = handlers.get(type); + if (handler != null) { + handler.accept(message); + } else { + log.debug("No handler registered for {}", type); + } + } + + // Get the next worker to which a client should be assigned + private synchronized DefaultIOLoop nextWorker() { + lastWorker = (lastWorker + 1) % NUM_WORKERS; + return ioLoops.get(lastWorker); + } + + /** + * Initiates open connection request and registers the pending socket + * channel with the given IO loop. + * + * @param loop loop with which the channel should be registered + * @throws java.io.IOException if the socket could not be open or connected + */ + private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException { + SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port()); + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + DefaultMessageStream stream = loop.connectStream(ch); + ch.connect(sa); + return stream; + } + + // Loop for accepting client connections + private class DefaultAcceptorLoop extends AcceptorLoop { + + public DefaultAcceptorLoop(SocketAddress address) throws IOException { + super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address); + } + + @Override + protected void acceptConnection(ServerSocketChannel channel) throws IOException { + SocketChannel sc = channel.accept(); + sc.configureBlocking(false); + + Socket so = sc.socket(); + so.setTcpNoDelay(SO_NO_DELAY); + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); + + nextWorker().acceptStream(sc); + } + } + + private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory { + + @Override + public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception { + } + + @Override + public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception { + stream.close(); + } + + @Override + public DefaultMessageStream makeObject(Endpoint ep) throws Exception { + DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get(); + log.info("Established a new connection to {}", ep); + return stream; + } + + @Override + public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception { + } + + @Override + public boolean validateObject(Endpoint ep, DefaultMessageStream stream) { + return stream.isClosed(); + } + } +} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java new file mode 100644 index 00000000..399a2b95 --- /dev/null +++ b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Assembly for sending and receiving messages using the I/O loop mechanism. + */ +package org.onlab.nio.service; \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java new file mode 100644 index 00000000..2b4a85cd --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.junit.Before; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.junit.Assert.fail; +import static org.onlab.util.Tools.namedThreads; + +/** + * Base class for various NIO loop unit tests. + */ +public abstract class AbstractLoopTest { + + protected static final long MAX_MS_WAIT = 1500; + + /** Block on specified countdown latch. Return when countdown reaches + * zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires. + * + * @param latch the latch + * @param label an identifying label + */ + protected void waitForLatch(CountDownLatch latch, String label) { + try { + boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS); + if (!ok) { + fail("Latch await timeout! [" + label + "]"); + } + } catch (InterruptedException e) { + System.out.println("Latch interrupt [" + label + "] : " + e); + fail("Unexpected interrupt"); + } + } + + protected ExecutorService exec; + + @Before + public void setUp() { + exec = newSingleThreadExecutor(namedThreads("test")); + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java new file mode 100644 index 00000000..f04a0126 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.onlab.junit.TestTools.delay; + +/** + * Unit tests for AcceptLoop. + */ +public class AcceptorLoopTest extends AbstractLoopTest { + + private static final int PICK_EPHEMERAL = 0; + + private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL); + + private static class MyAcceptLoop extends AcceptorLoop { + private final CountDownLatch loopStarted = new CountDownLatch(1); + private final CountDownLatch loopFinished = new CountDownLatch(1); + private final CountDownLatch runDone = new CountDownLatch(1); + private final CountDownLatch ceaseLatch = new CountDownLatch(1); + + private int acceptCount = 0; + + MyAcceptLoop() throws IOException { + super(500, SOCK_ADDR); + } + + @Override + protected void acceptConnection(ServerSocketChannel ssc) throws IOException { + acceptCount++; + } + + @Override + public void loop() throws IOException { + loopStarted.countDown(); + super.loop(); + loopFinished.countDown(); + } + + @Override + public void run() { + super.run(); + runDone.countDown(); + } + + @Override + public void shutdown() { + super.shutdown(); + ceaseLatch.countDown(); + } + } + + @Test + public void basic() throws IOException { + MyAcceptLoop myAccLoop = new MyAcceptLoop(); + AcceptorLoop accLoop = myAccLoop; + exec.execute(accLoop); + waitForLatch(myAccLoop.loopStarted, "loopStarted"); + delay(200); // take a quick nap + accLoop.shutdown(); + waitForLatch(myAccLoop.loopFinished, "loopFinished"); + waitForLatch(myAccLoop.runDone, "runDone"); + assertEquals(0, myAccLoop.acceptCount); + } +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java new file mode 100644 index 00000000..f42c8dc5 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.Random; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.onlab.junit.TestTools.delay; + +/** + * Integration test for the select, accept and IO loops. + */ +public class IOLoopIntegrationTest { + + private static final int THREADS = 6; + private static final int TIMEOUT = 60; + private static final int MESSAGE_LENGTH = 128; + + private static final int MILLION = 1000000; + private static final int MSG_COUNT = 40 * MILLION; + + @Before + public void warmUp() throws Exception { + Logger.getLogger("").setLevel(Level.SEVERE); + try { + runTest(MILLION, MESSAGE_LENGTH, 15); + } catch (Throwable e) { + System.err.println("Failed warmup but moving on."); + e.printStackTrace(); + } + } + + // TODO: this test can not pass in some environments, need to be improved + @Ignore + @Test + public void basic() throws Exception { + runTest(MILLION, MESSAGE_LENGTH, TIMEOUT); + } + + public void longHaul() throws Exception { + runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT); + } + + private void runTest(int count, int size, int timeout) throws Exception { + // Use a random port to prevent conflicts. + int port = IOLoopTestServer.PORT + new Random().nextInt(100); + + InetAddress ip = InetAddress.getLoopbackAddress(); + IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port); + IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port); + + server.start(); + client.start(); + delay(100); // Pause to allow loops to get going + + client.await(timeout); + client.report(); + + server.stop(); + server.report(); + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java new file mode 100644 index 00000000..acc9a08f --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java @@ -0,0 +1,324 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import com.google.common.collect.Lists; +import org.onlab.util.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.lang.String.format; +import static java.lang.System.nanoTime; +import static java.lang.System.out; +import static org.onlab.nio.IOLoopTestServer.PORT; +import static org.onlab.util.Tools.delay; +import static org.onlab.util.Tools.namedThreads; + +/** + * Auxiliary test fixture to measure speed of NIO-based channels. + */ +public class IOLoopTestClient { + + private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class); + + private final InetAddress ip; + private final int port; + private final int msgCount; + private final int msgLength; + + private final List iloops = new ArrayList<>(); + private final ExecutorService ipool; + private final ExecutorService wpool; + + Counter messages; + Counter bytes; + long latencyTotal = 0; + long latencyCount = 0; + + + /** + * Main entry point to launch the client. + * + * @param args command-line arguments + * @throws java.io.IOException if unable to connect to server + * @throws InterruptedException if latch wait gets interrupted + * @throws java.util.concurrent.ExecutionException if wait gets interrupted + * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion + */ + public static void main(String[] args) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + startStandalone(args); + + System.exit(0); + } + + /** + * Starts a standalone IO loop test client. + * + * @param args command-line arguments + */ + public static void startStandalone(String[] args) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); + int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; + int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; + int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; + int to = args.length > 4 ? Integer.parseInt(args[4]) : 60; + + log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", + wc, mc, ml, ip); + IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT); + + client.start(); + delay(500); + + client.await(to); + client.report(); + } + + /** + * Creates a speed client. + * + * @param ip ip address of server + * @param wc worker count + * @param mc message count to send per client + * @param ml message length in bytes + * @param port socket port + * @throws java.io.IOException if unable to create IO loops + */ + public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { + this.ip = ip; + this.port = port; + this.msgCount = mc; + this.msgLength = ml; + this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker")); + this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop")); + + for (int i = 0; i < wc; i++) { + iloops.add(new CustomIOLoop()); + } + } + + /** + * Starts the client workers. + * + * @throws java.io.IOException if unable to open connection + */ + public void start() throws IOException { + messages = new Counter(); + bytes = new Counter(); + + // First start up all the IO loops + for (CustomIOLoop l : iloops) { + ipool.execute(l); + } + + // Wait for all of them to get going + for (CustomIOLoop l : iloops) { + l.awaitStart(1000); + } + + // ... and Next open all connections; one-per-loop + for (CustomIOLoop l : iloops) { + openConnection(l); + } + } + + + /** + * Initiates open connection request and registers the pending socket + * channel with the given IO loop. + * + * @param loop loop with which the channel should be registered + * @throws java.io.IOException if the socket could not be open or connected + */ + private void openConnection(CustomIOLoop loop) throws IOException { + SocketAddress sa = new InetSocketAddress(ip, port); + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + loop.connectStream(ch); + ch.connect(sa); + } + + + /** + * Waits for the client workers to complete. + * + * @param secs timeout in seconds + * @throws java.util.concurrent.ExecutionException if execution failed + * @throws InterruptedException if interrupt occurred while waiting + * @throws java.util.concurrent.TimeoutException if timeout occurred + */ + public void await(int secs) throws InterruptedException, + ExecutionException, TimeoutException { + for (CustomIOLoop l : iloops) { + if (l.worker.task != null) { + l.worker.task.get(secs, TimeUnit.SECONDS); + latencyTotal += l.latencyTotal; + latencyCount += l.latencyCount; + } + } + messages.freeze(); + bytes.freeze(); + } + + /** + * Reports on the accumulated throughput and latency. + */ + public void report() { + DecimalFormat f = new DecimalFormat("#,##0"); + out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency", + f.format(messages.total()), f.format(bytes.total()), + f.format(messages.throughput()), + f.format(bytes.throughput() / (1024 * msgLength)), + f.format(latencyTotal / latencyCount))); + } + + + // Loop for transfer of fixed-length messages + private class CustomIOLoop extends IOLoop { + + Worker worker = new Worker(); + long latencyTotal = 0; + long latencyCount = 0; + + + public CustomIOLoop() throws IOException { + super(500); + } + + + @Override + protected TestMessageStream createStream(ByteChannel channel) { + return new TestMessageStream(msgLength, channel, this); + } + + @Override + protected synchronized void removeStream(MessageStream stream) { + super.removeStream(stream); + messages.add(stream.messagesIn().total()); + bytes.add(stream.bytesIn().total()); + stream.messagesOut().reset(); + stream.bytesOut().reset(); + } + + @Override + protected void processMessages(List messages, + MessageStream stream) { + for (TestMessage message : messages) { + // TODO: summarize latency data better + latencyTotal += nanoTime() - message.requestorTime(); + latencyCount++; + } + worker.release(messages.size()); + } + + @Override + protected void connect(SelectionKey key) throws IOException { + super.connect(key); + TestMessageStream b = (TestMessageStream) key.attachment(); + Worker w = ((CustomIOLoop) b.loop()).worker; + w.pump(b); + } + + } + + /** + * Auxiliary worker to connect and pump batched messages using blocking I/O. + */ + private class Worker implements Runnable { + + private static final int BATCH_SIZE = 50; + private static final int PERMITS = 2 * BATCH_SIZE; + + private TestMessageStream stream; + private FutureTask task; + + // Stuff to throttle pump + private final Semaphore semaphore = new Semaphore(PERMITS); + private int msgWritten; + + void pump(TestMessageStream stream) { + this.stream = stream; + task = new FutureTask<>(this, this); + wpool.execute(task); + } + + @Override + public void run() { + try { + log.info("Worker started..."); + + while (msgWritten < msgCount) { + int size = Math.min(BATCH_SIZE, msgCount - msgWritten); + writeBatch(size); + msgWritten += size; + } + + // Now try to get all the permits back before sending poison pill + semaphore.acquireUninterruptibly(PERMITS); + stream.close(); + + log.info("Worker done..."); + + } catch (IOException e) { + log.error("Worker unable to perform I/O", e); + } + } + + + private void writeBatch(int size) throws IOException { + // Build a batch of messages + List batch = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding())); + } + acquire(size); + stream.write(batch); + } + + + // Release permits based on the specified number of message credits + private void release(int permits) { + semaphore.release(permits); + } + + // Acquire permit for a single batch + private void acquire(int permits) { + semaphore.acquireUninterruptibly(permits); + } + + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java new file mode 100644 index 00000000..d5ce5f39 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java @@ -0,0 +1,256 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import com.google.common.collect.Lists; +import org.onlab.util.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static java.lang.String.format; +import static java.lang.System.out; +import static org.onlab.util.Tools.delay; +import static org.onlab.util.Tools.namedThreads; + +/** + * Auxiliary test fixture to measure speed of NIO-based channels. + */ +public class IOLoopTestServer { + + private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class); + + private static final int PRUNE_FREQUENCY = 1000; + + static final int PORT = 9876; + static final long TIMEOUT = 1000; + + static final boolean SO_NO_DELAY = false; + static final int SO_SEND_BUFFER_SIZE = 128 * 1024; + static final int SO_RCV_BUFFER_SIZE = 128 * 1024; + + static final DecimalFormat FORMAT = new DecimalFormat("#,##0"); + + private final AcceptorLoop aloop; + private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept")); + + private final List iloops = new ArrayList<>(); + private final ExecutorService ipool; + + private final int workerCount; + private final int msgLength; + private int lastWorker = -1; + + Counter messages; + Counter bytes; + + /** + * Main entry point to launch the server. + * + * @param args command-line arguments + * @throws java.io.IOException if unable to crate IO loops + */ + public static void main(String[] args) throws IOException { + startStandalone(args); + System.exit(0); + } + + /** + * Starts a standalone IO loop test server. + * + * @param args command-line arguments + */ + public static void startStandalone(String[] args) throws IOException { + InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); + int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; + int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128; + + log.info("Setting up the server with {} workers, {} byte messages on {}... ", + wc, ml, ip); + IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); + server.start(); + + // Start pruning clients and keep going until their number goes to 0. + int remaining = -1; + while (remaining == -1 || remaining > 0) { + delay(PRUNE_FREQUENCY); + int r = server.prune(); + remaining = remaining == -1 && r == 0 ? remaining : r; + } + server.stop(); + } + + /** + * Creates a speed server. + * + * @param ip optional ip of the adapter where to bind + * @param wc worker count + * @param ml message length in bytes + * @param port listen port + * @throws java.io.IOException if unable to create IO loops + */ + public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException { + this.workerCount = wc; + this.msgLength = ml; + this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop")); + + this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port)); + for (int i = 0; i < workerCount; i++) { + iloops.add(new CustomIOLoop()); + } + } + + /** + * Start the server IO loops and kicks off throughput tracking. + */ + public void start() { + messages = new Counter(); + bytes = new Counter(); + + for (CustomIOLoop l : iloops) { + ipool.execute(l); + } + apool.execute(aloop); + + for (CustomIOLoop l : iloops) { + l.awaitStart(TIMEOUT); + } + aloop.awaitStart(TIMEOUT); + } + + /** + * Stop the server IO loops and freezes throughput tracking. + */ + public void stop() { + aloop.shutdown(); + for (CustomIOLoop l : iloops) { + l.shutdown(); + } + + for (CustomIOLoop l : iloops) { + l.awaitStop(TIMEOUT); + } + aloop.awaitStop(TIMEOUT); + + messages.freeze(); + bytes.freeze(); + } + + /** + * Reports on the accumulated throughput and latency. + */ + public void report() { + DecimalFormat f = new DecimalFormat("#,##0"); + out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs", + f.format(messages.total()), f.format(bytes.total()), + f.format(messages.throughput()), + f.format(bytes.throughput() / (1024 * msgLength)))); + } + + /** + * Prunes the IO loops of stale message buffers. + * + * @return number of remaining IO loops among all workers. + */ + public int prune() { + int count = 0; + for (CustomIOLoop l : iloops) { + count += l.pruneStaleStreams(); + } + return count; + } + + // Get the next worker to which a client should be assigned + private synchronized CustomIOLoop nextWorker() { + lastWorker = (lastWorker + 1) % workerCount; + return iloops.get(lastWorker); + } + + // Loop for transfer of fixed-length messages + private class CustomIOLoop extends IOLoop { + + public CustomIOLoop() throws IOException { + super(500); + } + + @Override + protected TestMessageStream createStream(ByteChannel channel) { + return new TestMessageStream(msgLength, channel, this); + } + + @Override + protected void removeStream(MessageStream stream) { + super.removeStream(stream); + messages.add(stream.messagesIn().total()); + bytes.add(stream.bytesIn().total()); + } + + @Override + protected void processMessages(List messages, + MessageStream stream) { + try { + stream.write(createResponses(messages)); + } catch (IOException e) { + log.error("Unable to echo messages", e); + } + } + + private List createResponses(List messages) { + List responses = Lists.newArrayListWithCapacity(messages.size()); + for (TestMessage message : messages) { + responses.add(new TestMessage(message.length(), message.requestorTime(), + System.nanoTime(), message.padding())); + } + return responses; + } + } + + // Loop for accepting client connections + private class CustomAcceptLoop extends AcceptorLoop { + + public CustomAcceptLoop(SocketAddress address) throws IOException { + super(500, address); + } + + @Override + protected void acceptConnection(ServerSocketChannel channel) throws IOException { + SocketChannel sc = channel.accept(); + sc.configureBlocking(false); + + Socket so = sc.socket(); + so.setTcpNoDelay(SO_NO_DELAY); + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); + + nextWorker().acceptStream(sc); + log.info("Connected client"); + } + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java new file mode 100644 index 00000000..9965d389 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java @@ -0,0 +1,359 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Tests of the message message stream implementation. + */ +public class MessageStreamTest { + + private static final int SIZE = 64; + private static final int BIG_SIZE = 32 * 1024; + + private TestMessage message; + + private TestIOLoop loop; + private TestByteChannel channel; + private TestMessageStream stream; + private TestKey key; + + @Before + public void setUp() throws IOException { + loop = new TestIOLoop(); + channel = new TestByteChannel(); + key = new TestKey(channel); + stream = loop.createStream(channel); + stream.setKey(key); + stream.setNonStrict(); + message = new TestMessage(SIZE, 0, 0, stream.padding()); + } + + @After + public void tearDown() { + loop.shutdown(); + stream.close(); + } + + // Validates the state of the message stream + private void validate(boolean wp, boolean fr, int read, int written) { + assertEquals(wp, stream.isWritePending()); + assertEquals(fr, stream.isFlushRequired()); + assertEquals(read, channel.readBytes); + assertEquals(written, channel.writtenBytes); + } + + @Test + public void endOfStream() throws IOException { + channel.close(); + List messages = stream.read(); + assertNull(messages); + } + + @Test + public void bufferGrowth() throws IOException { + // Create a stream for big messages and test the growth. + stream = new TestMessageStream(BIG_SIZE, channel, loop); + TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding()); + + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); + } + + @Test + public void discardBeforeKey() { + // Create a stream that does not yet have the key set and discard it. + stream = loop.createStream(channel); + assertNull(stream.key()); + stream.close(); + // There is not key, so nothing to check; we just expect no problem. + } + + @Test + public void bufferedRead() throws IOException { + channel.bytesToRead = SIZE + 4; + List messages = stream.read(); + assertEquals(1, messages.size()); + validate(false, false, SIZE + 4, 0); + + channel.bytesToRead = SIZE - 4; + messages = stream.read(); + assertEquals(1, messages.size()); + validate(false, false, SIZE * 2, 0); + } + + @Test + public void bufferedWrite() throws IOException { + validate(false, false, 0, 0); + + // First write is immediate... + stream.write(message); + validate(false, false, 0, SIZE); + + // Second and third get buffered... + stream.write(message); + validate(false, true, 0, SIZE); + stream.write(message); + validate(false, true, 0, SIZE); + + // Reset write, which will flush if needed; the next write is again buffered + stream.flushIfWriteNotPending(); + validate(false, false, 0, SIZE * 3); + stream.write(message); + validate(false, true, 0, SIZE * 3); + + // Select reset, which will flush if needed; the next write is again buffered + stream.flushIfPossible(); + validate(false, false, 0, SIZE * 4); + stream.write(message); + validate(false, true, 0, SIZE * 4); + stream.flush(); + validate(false, true, 0, SIZE * 4); + } + + @Test + public void bufferedWriteList() throws IOException { + validate(false, false, 0, 0); + + // First write is immediate... + List messages = new ArrayList<>(); + messages.add(message); + messages.add(message); + messages.add(message); + messages.add(message); + + stream.write(messages); + validate(false, false, 0, SIZE * 4); + + stream.write(messages); + validate(false, true, 0, SIZE * 4); + + stream.flushIfPossible(); + validate(false, false, 0, SIZE * 8); + } + + @Test + public void bufferedPartialWrite() throws IOException { + validate(false, false, 0, 0); + + // First write is immediate... + stream.write(message); + validate(false, false, 0, SIZE); + + // Tell test channel to accept only half. + channel.bytesToWrite = SIZE / 2; + + // Second and third get buffered... + stream.write(message); + validate(false, true, 0, SIZE); + stream.flushIfPossible(); + validate(true, true, 0, SIZE + SIZE / 2); + } + + @Test + public void bufferedPartialWrite2() throws IOException { + validate(false, false, 0, 0); + + // First write is immediate... + stream.write(message); + validate(false, false, 0, SIZE); + + // Tell test channel to accept only half. + channel.bytesToWrite = SIZE / 2; + + // Second and third get buffered... + stream.write(message); + validate(false, true, 0, SIZE); + stream.flushIfWriteNotPending(); + validate(true, true, 0, SIZE + SIZE / 2); + } + + @Test + public void bufferedReadWrite() throws IOException { + channel.bytesToRead = SIZE + 4; + List messages = stream.read(); + assertEquals(1, messages.size()); + validate(false, false, SIZE + 4, 0); + + stream.write(message); + validate(false, false, SIZE + 4, SIZE); + + channel.bytesToRead = SIZE - 4; + messages = stream.read(); + assertEquals(1, messages.size()); + validate(false, false, SIZE * 2, SIZE); + } + + // Fake IO driver loop + private static class TestIOLoop extends IOLoop { + + public TestIOLoop() throws IOException { + super(500); + } + + @Override + protected TestMessageStream createStream(ByteChannel channel) { + return new TestMessageStream(SIZE, channel, this); + } + + @Override + protected void processMessages(List messages, + MessageStream stream) { + } + + } + + // Byte channel test fixture + private static class TestByteChannel extends SelectableChannel implements ByteChannel { + + private static final int BUFFER_LENGTH = 1024; + byte[] bytes = new byte[BUFFER_LENGTH]; + int bytesToWrite = BUFFER_LENGTH; + int bytesToRead = BUFFER_LENGTH; + int writtenBytes = 0; + int readBytes = 0; + + @Override + public int read(ByteBuffer dst) throws IOException { + int l = Math.min(dst.remaining(), bytesToRead); + if (bytesToRead > 0) { + readBytes += l; + dst.put(bytes, 0, l); + } + return l; + } + + @Override + public int write(ByteBuffer src) throws IOException { + int l = Math.min(src.remaining(), bytesToWrite); + writtenBytes += l; + src.get(bytes, 0, l); + return l; + } + + @Override + public Object blockingLock() { + return null; + } + + @Override + public SelectableChannel configureBlocking(boolean arg0) throws IOException { + return null; + } + + @Override + public boolean isBlocking() { + return false; + } + + @Override + public boolean isRegistered() { + return false; + } + + @Override + public SelectionKey keyFor(Selector arg0) { + return null; + } + + @Override + public SelectorProvider provider() { + return null; + } + + @Override + public SelectionKey register(Selector arg0, int arg1, Object arg2) + throws ClosedChannelException { + return null; + } + + @Override + public int validOps() { + return 0; + } + + @Override + protected void implCloseChannel() throws IOException { + bytesToRead = -1; + } + + } + + // Selection key text fixture + private static class TestKey extends SelectionKey { + + private SelectableChannel channel; + + public TestKey(TestByteChannel channel) { + this.channel = channel; + } + + @Override + public void cancel() { + } + + @Override + public SelectableChannel channel() { + return channel; + } + + @Override + public int interestOps() { + return 0; + } + + @Override + public SelectionKey interestOps(int ops) { + return null; + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public int readyOps() { + return 0; + } + + @Override + public Selector selector() { + return null; + } + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java new file mode 100644 index 00000000..cf5b2bb9 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java @@ -0,0 +1,85 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AbstractSelectableChannel; +import java.nio.channels.spi.AbstractSelector; +import java.util.Set; + +/** + * A selector instrumented for unit tests. + */ +public class MockSelector extends AbstractSelector { + + int wakeUpCount = 0; + + /** + * Creates a mock selector, specifying null as the SelectorProvider. + */ + public MockSelector() { + super(null); + } + + @Override + public String toString() { + return "{MockSelector: wake=" + wakeUpCount + "}"; + } + + @Override + protected void implCloseSelector() throws IOException { + } + + @Override + protected SelectionKey register(AbstractSelectableChannel ch, int ops, + Object att) { + return null; + } + + @Override + public Set keys() { + return null; + } + + @Override + public Set selectedKeys() { + return null; + } + + @Override + public int selectNow() throws IOException { + return 0; + } + + @Override + public int select(long timeout) throws IOException { + return 0; + } + + @Override + public int select() throws IOException { + return 0; + } + + @Override + public Selector wakeup() { + wakeUpCount++; + return null; + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java new file mode 100644 index 00000000..825479b5 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java @@ -0,0 +1,56 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Test message for measuring rate and round-trip latency. + */ +public class TestMessage extends AbstractMessage { + + private final byte[] padding; + + private final long requestorTime; + private final long responderTime; + + /** + * Creates a new message with the specified data. + * + * @param requestorTime requester time + * @param responderTime responder time + * @param padding message padding + */ + TestMessage(int length, long requestorTime, long responderTime, byte[] padding) { + this.length = length; + this.requestorTime = requestorTime; + this.responderTime = responderTime; + this.padding = checkNotNull(padding, "Padding cannot be null"); + } + + public long requestorTime() { + return requestorTime; + } + + public long responderTime() { + return responderTime; + } + + public byte[] padding() { + return padding; + } + +} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java new file mode 100644 index 00000000..fc86beb9 --- /dev/null +++ b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java @@ -0,0 +1,89 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.nio; + +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +/** + * Fixed-length message transfer buffer. + */ +public class TestMessageStream extends MessageStream { + + private static final String E_WRONG_LEN = "Illegal message length: "; + private static final long START_TAG = 0xfeedcafedeaddeedL; + private static final long END_TAG = 0xbeadcafedeaddeedL; + private static final int META_LENGTH = 40; + + private final int length; + private boolean isStrict = true; + + public TestMessageStream(int length, ByteChannel ch, IOLoop loop) { + super(loop, ch, 64 * 1024, 500); + checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40"); + this.length = length; + } + + void setNonStrict() { + isStrict = false; + } + + @Override + protected TestMessage read(ByteBuffer rb) { + if (rb.remaining() < length) { + return null; + } + + long startTag = rb.getLong(); + if (isStrict) { + checkState(startTag == START_TAG, "Incorrect message start"); + } + + long size = rb.getLong(); + long requestorTime = rb.getLong(); + long responderTime = rb.getLong(); + byte[] padding = padding(); + rb.get(padding); + + long endTag = rb.getLong(); + if (isStrict) { + checkState(endTag == END_TAG, "Incorrect message end"); + } + + return new TestMessage((int) size, requestorTime, responderTime, padding); + } + + @Override + protected void write(TestMessage message, ByteBuffer wb) { + if (message.length() != length) { + throw new IllegalArgumentException(E_WRONG_LEN + message.length()); + } + + wb.putLong(START_TAG); + wb.putLong(message.length()); + wb.putLong(message.requestorTime()); + wb.putLong(message.responderTime()); + wb.put(message.padding(), 0, length - META_LENGTH); + wb.putLong(END_TAG); + } + + public byte[] padding() { + return new byte[length - META_LENGTH]; + } +} -- cgit 1.2.3-korg