From b731e2f1dd0972409b136aebc7b463dd72c9cfad Mon Sep 17 00:00:00 2001 From: CNlucius Date: Tue, 13 Sep 2016 11:40:12 +0800 Subject: ONOSFW-171 O/S-SFC-ONOS scenario documentation Change-Id: I51ae1cf736ea24ab6680f8edca1b2bf5dd598365 Signed-off-by: CNlucius --- framework/src/onos/utils/nio/pom.xml | 58 --- .../main/java/org/onlab/nio/AbstractMessage.java | 30 -- .../src/main/java/org/onlab/nio/AcceptorLoop.java | 123 ------ .../nio/src/main/java/org/onlab/nio/IOLoop.java | 302 --------------- .../nio/src/main/java/org/onlab/nio/Message.java | 30 -- .../src/main/java/org/onlab/nio/MessageStream.java | 424 --------------------- .../src/main/java/org/onlab/nio/SelectorLoop.java | 175 --------- .../src/main/java/org/onlab/nio/package-info.java | 21 - .../java/org/onlab/nio/service/DefaultIOLoop.java | 66 ---- .../java/org/onlab/nio/service/DefaultMessage.java | 104 ----- .../onlab/nio/service/DefaultMessageStream.java | 139 ------- .../org/onlab/nio/service/IOLoopMessaging.java | 334 ---------------- .../java/org/onlab/nio/service/package-info.java | 20 - .../test/java/org/onlab/nio/AbstractLoopTest.java | 60 --- .../test/java/org/onlab/nio/AcceptorLoopTest.java | 87 ----- .../java/org/onlab/nio/IOLoopIntegrationTest.java | 82 ---- .../test/java/org/onlab/nio/IOLoopTestClient.java | 324 ---------------- .../test/java/org/onlab/nio/IOLoopTestServer.java | 256 ------------- .../test/java/org/onlab/nio/MessageStreamTest.java | 359 ----------------- .../src/test/java/org/onlab/nio/MockSelector.java | 85 ----- .../src/test/java/org/onlab/nio/TestMessage.java | 56 --- .../test/java/org/onlab/nio/TestMessageStream.java | 89 ----- 22 files changed, 3224 deletions(-) delete mode 100644 framework/src/onos/utils/nio/pom.xml delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java delete mode 100644 framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java delete mode 100644 framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java (limited to 'framework/src/onos/utils/nio') diff --git a/framework/src/onos/utils/nio/pom.xml b/framework/src/onos/utils/nio/pom.xml deleted file mode 100644 index b92c9b7b..00000000 --- a/framework/src/onos/utils/nio/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - 4.0.0 - - - org.onosproject - onlab-utils - 1.4.0-rc1 - ../pom.xml - - - onlab-nio - bundle - - Fast network I/O using Java NIO - - - - com.google.guava - guava-testlib - test - - - commons-pool - commons-pool - - - org.onosproject - onos-api - - - org.onosproject - onlab-misc - - - org.onosproject - onlab-junit - test - - - diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java deleted file mode 100644 index 16bb9359..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -/** - * Base {@link Message} implementation. - */ -public abstract class AbstractMessage implements Message { - - protected int length; - - @Override - public int length() { - return length; - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java deleted file mode 100644 index e416f3be..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.util.Iterator; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Selector loop derivative tailored to acceptConnection inbound connections. - */ -public abstract class AcceptorLoop extends SelectorLoop { - - private SocketAddress listenAddress; - private ServerSocketChannel socketChannel; - - /** - * Creates an acceptor loop with the specified selection timeout and - * accepting connections on the the given address. - * - * @param selectTimeout selection timeout; specified in millis - * @param listenAddress socket address where to listen for connections - * @throws IOException if the backing selector cannot be opened - */ - public AcceptorLoop(long selectTimeout, SocketAddress listenAddress) - throws IOException { - super(selectTimeout); - this.listenAddress = checkNotNull(listenAddress, "Address cannot be null"); - } - - /** - * Hook to accept an inbound connection on the specified socket channel. - * - * @param channel socketChannel where an accept operation awaits - * @throws IOException if the accept operation cannot be processed - */ - protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException; - - /** - * Opens a new server socket channel configured in non-blocking mode and - * bound to the loop's listen address. - * - * @throws IOException if unable to open or configure the socket channel - */ - protected synchronized void openChannel() throws IOException { - socketChannel = ServerSocketChannel.open(); - socketChannel.configureBlocking(false); - socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - socketChannel.register(selector, SelectionKey.OP_ACCEPT); - socketChannel.bind(listenAddress); - } - - /** - * Closes the server socket channel. - * - * @throws IOException if unable to close the socketChannel - */ - protected synchronized void closechannel() throws IOException { - if (socketChannel != null) { - socketChannel.close(); - socketChannel = null; - } - } - - @Override - public void shutdown() { - try { - closechannel(); - } catch (IOException e) { - log.warn("Unable to close the socketChannel", e); - } - super.shutdown(); - } - - @Override - protected void loop() throws IOException { - openChannel(); - notifyReady(); - - // Keep looping until told otherwise. - while (isRunning()) { - // Attempt a selection; if no operations selected or if signalled - // to shutdown, spin through. - int count = selector.select(selectTimeout); - if (count == 0 || !isRunning()) { - continue; - } - - // Iterate over all keys selected for an operation and process them. - Iterator keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - // Fetch the key and remove it from the pending list. - SelectionKey key = keys.next(); - keys.remove(); - - // If the key has a pending acceptConnection operation, process it. - if (key.isAcceptable()) { - acceptConnection((ServerSocketChannel) key.channel()); - } - } - } - } - -} - diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java deleted file mode 100644 index 106df7b2..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * I/O loop for driving inbound & outbound {@link Message} transfer via - * {@link MessageStream}. - * - * @param message type - * @param message stream type - */ -public abstract class IOLoop> - extends SelectorLoop { - - // Queue of requests for new message streams to enter the IO loop processing. - private final Queue newStreamRequests = new ConcurrentLinkedQueue<>(); - - // Carries information required for admitting a new message stream. - private class NewStreamRequest { - private final S stream; - private final SelectableChannel channel; - private final int op; - - public NewStreamRequest(S stream, SelectableChannel channel, int op) { - this.stream = stream; - this.channel = channel; - this.op = op; - } - } - - // Set of message streams currently admitted into the IO loop. - private final Set> streams = new CopyOnWriteArraySet<>(); - - /** - * Creates an IO loop with the given selection timeout. - * - * @param timeout selection timeout in milliseconds - * @throws IOException if the backing selector cannot be opened - */ - public IOLoop(long timeout) throws IOException { - super(timeout); - } - - /** - * Returns the number of message stream in custody of the loop. - * - * @return number of message streams - */ - public int streamCount() { - return streams.size(); - } - - /** - * Creates a new message stream backed by the specified socket channel. - * - * @param byteChannel backing byte channel - * @return newly created message stream - */ - protected abstract S createStream(ByteChannel byteChannel); - - /** - * Removes the specified message stream from the IO loop. - * - * @param stream message stream to remove - */ - protected void removeStream(MessageStream stream) { - streams.remove(stream); - } - - /** - * Processes the list of messages extracted from the specified message - * stream. - * - * @param messages non-empty list of received messages - * @param stream message stream from which the messages were extracted - */ - protected abstract void processMessages(List messages, MessageStream stream); - - /** - * Completes connection request pending on the given selection key. - * - * @param key selection key holding the pending connect operation. - * @throws IOException when I/O exception of some sort has occurred - */ - protected void connect(SelectionKey key) throws IOException { - SocketChannel ch = (SocketChannel) key.channel(); - ch.finishConnect(); - if (key.isValid()) { - key.interestOps(SelectionKey.OP_READ); - } - } - - /** - * Processes an IO operation pending on the specified key. - * - * @param key selection key holding the pending I/O operation. - */ - protected void processKeyOperation(SelectionKey key) { - @SuppressWarnings("unchecked") - S stream = (S) key.attachment(); - - try { - // If the key is not valid, bail out. - if (!key.isValid()) { - stream.close(); - return; - } - - // If there is a pending connect operation, complete it. - if (key.isConnectable()) { - try { - connect(key); - } catch (IOException | IllegalStateException e) { - log.warn("Unable to complete connection", e); - } - } - - // If there is a read operation, slurp as much data as possible. - if (key.isReadable()) { - List messages = stream.read(); - - // No messages or failed flush imply disconnect; bail. - if (messages == null || stream.hadError()) { - stream.close(); - return; - } - - // If there were any messages read, process them. - if (!messages.isEmpty()) { - try { - processMessages(messages, stream); - } catch (RuntimeException e) { - onError(stream, e); - } - } - } - - // If there are pending writes, flush them - if (key.isWritable()) { - stream.flushIfPossible(); - } - - // If there were any issued flushing, close the stream. - if (stream.hadError()) { - stream.close(); - } - - } catch (CancelledKeyException e) { - // Key was cancelled, so silently close the stream - stream.close(); - } catch (IOException e) { - if (!stream.isClosed() && !isResetByPeer(e)) { - log.warn("Unable to process IO", e); - } - stream.close(); - } - } - - // Indicates whether or not this exception is caused by 'reset by peer'. - private boolean isResetByPeer(IOException e) { - Throwable cause = e.getCause(); - return cause != null && cause instanceof IOException && - cause.getMessage().contains("reset by peer"); - } - - /** - * Hook to allow intercept of any errors caused during message processing. - * Default behaviour is to rethrow the error. - * - * @param stream message stream involved in the error - * @param error the runtime exception - */ - protected void onError(S stream, RuntimeException error) { - throw error; - } - - /** - * Admits a new message stream backed by the specified socket channel - * with a pending accept operation. - * - * @param channel backing socket channel - * @return newly accepted message stream - */ - public S acceptStream(SocketChannel channel) { - return createAndAdmit(channel, SelectionKey.OP_READ); - } - - - /** - * Admits a new message stream backed by the specified socket channel - * with a pending connect operation. - * - * @param channel backing socket channel - * @return newly connected message stream - */ - public S connectStream(SocketChannel channel) { - return createAndAdmit(channel, SelectionKey.OP_CONNECT); - } - - /** - * Creates a new message stream backed by the specified socket channel - * and admits it into the IO loop. - * - * @param channel socket channel - * @param op pending operations mask to be applied to the selection - * key as a set of initial interestedOps - * @return newly created message stream - */ - private synchronized S createAndAdmit(SocketChannel channel, int op) { - S stream = createStream(channel); - streams.add(stream); - newStreamRequests.add(new NewStreamRequest(stream, channel, op)); - selector.wakeup(); - return stream; - } - - /** - * Safely admits new streams into the IO loop. - */ - private void admitNewStreams() { - Iterator it = newStreamRequests.iterator(); - while (isRunning() && it.hasNext()) { - try { - NewStreamRequest request = it.next(); - it.remove(); - SelectionKey key = request.channel.register(selector, request.op, - request.stream); - request.stream.setKey(key); - } catch (ClosedChannelException e) { - log.warn("Unable to admit new message stream", e); - } - } - } - - @Override - protected void loop() throws IOException { - notifyReady(); - - // Keep going until told otherwise. - while (isRunning()) { - admitNewStreams(); - - // Process flushes & write selects on all streams - for (MessageStream stream : streams) { - stream.flushIfWriteNotPending(); - } - - // Select keys and process them. - int count = selector.select(selectTimeout); - if (count > 0 && isRunning()) { - Iterator it = selector.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey key = it.next(); - it.remove(); - processKeyOperation(key); - } - } - } - } - - /** - * Prunes the registered streams by discarding any stale ones. - * - * @return number of remaining streams - */ - public synchronized int pruneStaleStreams() { - for (MessageStream stream : streams) { - if (stream.isStale()) { - stream.close(); - } - } - return streams.size(); - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java deleted file mode 100644 index c1a339e0..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -/** - * Representation of a message transferred via {@link MessageStream}. - */ -public interface Message { - - /** - * Gets the message length in bytes. - * - * @return number of bytes - */ - int length(); - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java deleted file mode 100644 index a19e8aac..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.System.currentTimeMillis; -import static java.nio.ByteBuffer.allocateDirect; - -/** - * Bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - * - * @param message type - */ -public abstract class MessageStream { - - protected Logger log = LoggerFactory.getLogger(getClass()); - - private final IOLoop loop; - private final ByteChannel channel; - private final int maxIdleMillis; - - private final ByteBuffer inbound; - private ByteBuffer outbound; - private SelectionKey key; - - private volatile boolean closed = false; - private volatile boolean writePending; - private volatile boolean writeOccurred; - - private Exception ioError; - private long lastActiveTime; - - private final Counter bytesIn = new Counter(); - private final Counter messagesIn = new Counter(); - private final Counter bytesOut = new Counter(); - private final Counter messagesOut = new Counter(); - - /** - * Creates a message stream associated with the specified IO loop and - * backed by the given byte channel. - * - * @param loop IO loop - * @param byteChannel backing byte channel - * @param bufferSize size of the backing byte buffers - * @param maxIdleMillis maximum number of millis the stream can be idle - * before it will be closed - */ - protected MessageStream(IOLoop loop, ByteChannel byteChannel, - int bufferSize, int maxIdleMillis) { - this.loop = checkNotNull(loop, "Loop cannot be null"); - this.channel = checkNotNull(byteChannel, "Byte channel cannot be null"); - - checkArgument(maxIdleMillis > 0, "Idle time must be positive"); - this.maxIdleMillis = maxIdleMillis; - - inbound = allocateDirect(bufferSize); - outbound = allocateDirect(bufferSize); - } - - /** - * Gets a single message from the specified byte buffer; this is - * to be done without manipulating the buffer via flip, reset or clear. - * - * @param buffer byte buffer - * @return read message or null if there are not enough bytes to read - * a complete message - */ - protected abstract M read(ByteBuffer buffer); - - /** - * Puts the specified message into the specified byte buffer; this is - * to be done without manipulating the buffer via flip, reset or clear. - * - * @param message message to be write into the buffer - * @param buffer byte buffer - */ - protected abstract void write(M message, ByteBuffer buffer); - - /** - * Closes the message buffer. - */ - public void close() { - synchronized (this) { - if (closed) { - return; - } - closed = true; - } - - bytesIn.freeze(); - bytesOut.freeze(); - messagesIn.freeze(); - messagesOut.freeze(); - - loop.removeStream(this); - if (key != null) { - try { - key.cancel(); - key.channel().close(); - } catch (IOException e) { - log.warn("Unable to close stream", e); - } - } - } - - /** - * Indicates whether this buffer has been closed. - * - * @return true if this stream has been closed - */ - public synchronized boolean isClosed() { - return closed; - } - - /** - * Returns the stream IO selection key. - * - * @return socket channel registration selection key - */ - public SelectionKey key() { - return key; - } - - /** - * Binds the selection key to be used for driving IO operations on the stream. - * - * @param key IO selection key - */ - public void setKey(SelectionKey key) { - this.key = key; - this.lastActiveTime = currentTimeMillis(); - } - - /** - * Returns the IO loop to which this stream is bound. - * - * @return I/O loop used to drive this stream - */ - public IOLoop loop() { - return loop; - } - - /** - * Indicates whether the any prior IO encountered an error. - * - * @return true if a write failed - */ - public boolean hadError() { - return ioError != null; - } - - /** - * Gets the prior IO error, if one occurred. - * - * @return IO error; null if none occurred - */ - public Exception getError() { - return ioError; - } - - /** - * Reads, without blocking, a list of messages from the stream. - * The list will be empty if there were not messages pending. - * - * @return list of messages or null if backing channel has been closed - * @throws IOException if messages could not be read - */ - public List read() throws IOException { - try { - int read = channel.read(inbound); - if (read != -1) { - // Read the messages one-by-one and add them to the list. - List messages = new ArrayList<>(); - M message; - inbound.flip(); - while ((message = read(inbound)) != null) { - messages.add(message); - messagesIn.add(1); - bytesIn.add(message.length()); - } - inbound.compact(); - - // Mark the stream with current time to indicate liveness. - lastActiveTime = currentTimeMillis(); - return messages; - } - return null; - - } catch (Exception e) { - throw new IOException("Unable to read messages", e); - } - } - - /** - * Writes the specified list of messages to the stream. - * - * @param messages list of messages to write - * @throws IOException if error occurred while writing the data - */ - public void write(List messages) throws IOException { - synchronized (this) { - // First write all messages. - for (M m : messages) { - append(m); - } - flushUnlessAlreadyPlanningTo(); - } - } - - /** - * Writes the given message to the stream. - * - * @param message message to write - * @throws IOException if error occurred while writing the data - */ - public void write(M message) throws IOException { - synchronized (this) { - append(message); - flushUnlessAlreadyPlanningTo(); - } - } - - // Appends the specified message into the internal buffer, growing the - // buffer if required. - private void append(M message) { - // If the buffer does not have sufficient length double it. - while (outbound.remaining() < message.length()) { - doubleSize(); - } - write(message, outbound); - messagesOut.add(1); - bytesOut.add(message.length()); - } - - // Forces a flush, unless one is planned already. - private void flushUnlessAlreadyPlanningTo() throws IOException { - if (!writeOccurred && !writePending) { - flush(); - } - } - - /** - * Flushes any pending writes. - * - * @throws IOException if flush failed - */ - public void flush() throws IOException { - synchronized (this) { - if (!writeOccurred && !writePending) { - outbound.flip(); - try { - channel.write(outbound); - } catch (IOException e) { - if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { - log.warn("Unable to write data", e); - ioError = e; - } - } - lastActiveTime = currentTimeMillis(); - writeOccurred = true; - writePending = outbound.hasRemaining(); - outbound.compact(); - } - } - } - - /** - * Indicates whether the stream has bytes to be written to the channel. - * - * @return true if there are bytes to be written - */ - boolean isWritePending() { - synchronized (this) { - return writePending; - } - } - - - /** - * Indicates whether data has been written but not flushed yet. - * - * @return true if flush is required - */ - boolean isFlushRequired() { - synchronized (this) { - return outbound.position() > 0; - } - } - - /** - * Attempts to flush data, internal stream state and channel availability - * permitting. Invoked by the driver I/O loop during handling of writable - * selection key. - *

- * Resets the internal state flags {@code writeOccurred} and - * {@code writePending}. - *

- * @throws IOException if implicit flush failed - */ - void flushIfPossible() throws IOException { - synchronized (this) { - writePending = false; - writeOccurred = false; - if (outbound.position() > 0) { - flush(); - } - } - key.interestOps(SelectionKey.OP_READ); - } - - /** - * Attempts to flush data, internal stream state and channel availability - * permitting and if other writes are not pending. Invoked by the driver - * I/O loop prior to entering select wait. Resets the internal - * {@code writeOccurred} state flag. - * - * @throws IOException if implicit flush failed - */ - void flushIfWriteNotPending() throws IOException { - synchronized (this) { - writeOccurred = false; - if (!writePending && outbound.position() > 0) { - flush(); - } - } - if (isWritePending()) { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } - } - - /** - * Doubles the size of the outbound buffer. - */ - private void doubleSize() { - ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2); - outbound.flip(); - newBuffer.put(outbound); - outbound = newBuffer; - } - - /** - * Returns the maximum number of milliseconds the stream is allowed - * without any read/write operations. - * - * @return number if millis of permissible idle time - */ - protected int maxIdleMillis() { - return maxIdleMillis; - } - - - /** - * Returns true if the given stream has gone stale. - * - * @return true if the stream is stale - */ - boolean isStale() { - return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null; - } - - /** - * Returns the inbound bytes counter. - * - * @return inbound bytes counter - */ - public Counter bytesIn() { - return bytesIn; - } - - /** - * Returns the outbound bytes counter. - * - * @return outbound bytes counter - */ - public Counter bytesOut() { - return bytesOut; - } - - /** - * Returns the inbound messages counter. - * - * @return inbound messages counter - */ - public Counter messagesIn() { - return messagesIn; - } - - /** - * Returns the outbound messages counter. - * - * @return outbound messages counter - */ - public Counter messagesOut() { - return messagesOut; - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java deleted file mode 100644 index 95a9b61e..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.channels.Selector; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.System.currentTimeMillis; - -/** - * Abstraction of an I/O processing loop based on an NIO selector. - */ -public abstract class SelectorLoop implements Runnable { - - protected final Logger log = LoggerFactory.getLogger(getClass()); - - /** - * Selector used by this loop to pace the I/O operations. - */ - protected final Selector selector; - - /** - * Selection operations timeout; specified in millis. - */ - protected long selectTimeout; - - /** - * Retains the error that caused the loop to exit prematurely. - */ - private Throwable error; - - // State indicator - private enum State { STARTING, STARTED, STOPPING, STOPPED }; - private volatile State state = State.STOPPED; - - /** - * Creates a new selector loop with the given selection timeout. - * - * @param selectTimeout selection timeout; specified in millis - * @throws IOException if the backing selector cannot be opened - */ - public SelectorLoop(long selectTimeout) throws IOException { - checkArgument(selectTimeout > 0, "Timeout must be positive"); - this.selectTimeout = selectTimeout; - this.selector = openSelector(); - } - - /** - * Opens a new selector for the use by the loop. - * - * @return newly open selector - * @throws IOException if the backing selector cannot be opened - */ - protected Selector openSelector() throws IOException { - return Selector.open(); - } - - /** - * Indicates that the loop is marked to run. - * @return true if the loop is marked to run - */ - protected boolean isRunning() { - return state == State.STARTED || state == State.STARTING; - } - - /** - * Returns the error, if there was one, that caused the loop to terminate - * prematurely. - * - * @return error or null if there was none - */ - public Throwable getError() { - return error; - } - - /** - * Contains the body of the I/O selector loop. - * - * @throws IOException if an error is encountered while selecting I/O - */ - protected abstract void loop() throws IOException; - - @Override - public void run() { - error = null; - state = State.STARTING; - try { - loop(); - } catch (Exception e) { - error = e; - log.error("Loop aborted", e); - } - notifyDone(); - } - - /** - * Notifies observers waiting for loop to become ready. - */ - protected synchronized void notifyReady() { - state = State.STARTED; - notifyAll(); - } - - /** - * Triggers loop shutdown. - */ - public void shutdown() { - // Mark the loop as no longer running and wake up the selector. - state = State.STOPPING; - selector.wakeup(); - } - - /** - * Notifies observers waiting for loop to fully stop. - */ - private synchronized void notifyDone() { - state = State.STOPPED; - notifyAll(); - } - - /** - * Waits for the loop execution to start. - * - * @param timeout number of milliseconds to wait - * @return true if loop started in time - */ - public final synchronized boolean awaitStart(long timeout) { - long max = currentTimeMillis() + timeout; - while (state != State.STARTED && (currentTimeMillis() < max)) { - try { - wait(timeout); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } - return state == State.STARTED; - } - - /** - * Waits for the loop execution to stop. - * - * @param timeout number of milliseconds to wait - * @return true if loop finished in time - */ - public final synchronized boolean awaitStop(long timeout) { - long max = currentTimeMillis() + timeout; - while (state != State.STOPPED && (currentTimeMillis() < max)) { - try { - wait(timeout); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } - return state == State.STOPPED; - } - - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java deleted file mode 100644 index 0d58b568..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Mechanism to transfer messages over network using IO loop and - * message stream, backed by NIO byte buffers. - */ -package org.onlab.nio; diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java deleted file mode 100644 index 0d977929..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.util.List; -import java.util.function.Consumer; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; - -/** - * IOLoop for transporting DefaultMessages. - */ -public class DefaultIOLoop extends IOLoop { - - public static final int SELECT_TIMEOUT_MILLIS = 500; - private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000; - private static final int BUFFER_SIZE = 1024 * 1024; - private final Consumer consumer; - - public DefaultIOLoop(Consumer consumer) throws IOException { - this(SELECT_TIMEOUT_MILLIS, consumer); - } - - public DefaultIOLoop(long timeout, Consumer consumer) throws IOException { - super(timeout); - this.consumer = consumer; - } - - @Override - protected DefaultMessageStream createStream(ByteChannel byteChannel) { - return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS); - } - - @Override - protected void processMessages(List messages, MessageStream stream) { - messages.forEach(consumer); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - DefaultMessageStream stream = (DefaultMessageStream) key.attachment(); - try { - super.connect(key); - stream.connected(); - } catch (Exception e) { - stream.connectFailed(e); - } - } -} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java deleted file mode 100644 index 591d49b3..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.onlab.nio.AbstractMessage; -import org.onlab.packet.IpAddress; -import org.onlab.util.ByteArraySizeHashPrinter; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; -import com.google.common.base.MoreObjects; - -/** - * Default message. - */ -public class DefaultMessage extends AbstractMessage { - - private long id; - private Endpoint sender; - private String type; - private byte[] payload; - - /** - * Creates a new message with the specified data. - * - * @param id message id - * @param type message type - * @param sender sender endpoint - * @param payload message payload - */ - DefaultMessage(long id, Endpoint sender, String type, byte[] payload) { - this.id = id; - this.type = checkNotNull(type, "Type cannot be null"); - this.sender = checkNotNull(sender, "Sender cannot be null"); - this.payload = checkNotNull(payload, "Payload cannot be null"); - - byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - - length = 25 + ipOctets.length + messageTypeBytes.length + payload.length; - } - - /** - * Returns message id. - * - * @return message id - */ - public long id() { - return id; - } - - /** - * Returns message sender. - * - * @return message sender - */ - public Endpoint sender() { - return sender; - } - - /** - * Returns message type. - * - * @return message type - */ - public String type() { - return type; - } - - /** - * Returns message payload. - * - * @return payload - */ - public byte[] payload() { - return payload; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .add("type", type) - .add("sender", sender) - .add("payload", ByteArraySizeHashPrinter.of(payload)) - .toString(); - } -} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java deleted file mode 100644 index a7dd3c04..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; -import org.onlab.packet.IpAddress; -import org.onlab.packet.IpAddress.Version; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; - -/** - * Default bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - */ -public class DefaultMessageStream extends MessageStream { - - private final CompletableFuture connectFuture = new CompletableFuture<>(); - - public DefaultMessageStream( - IOLoop loop, - ByteChannel byteChannel, - int bufferSize, - int maxIdleMillis) { - super(loop, byteChannel, bufferSize, maxIdleMillis); - } - - public CompletableFuture connectedFuture() { - return connectFuture.thenApply(v -> this); - } - - private final AtomicInteger messageLength = new AtomicInteger(-1); - - @Override - protected DefaultMessage read(ByteBuffer buffer) { - if (messageLength.get() == -1) { - // check if we can read the message length. - if (buffer.remaining() < Integer.BYTES) { - return null; - } else { - messageLength.set(buffer.getInt()); - } - } - - if (buffer.remaining() < messageLength.get()) { - return null; - } - - long id = buffer.getLong(); - Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6; - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; - buffer.get(octects); - IpAddress senderIp = IpAddress.valueOf(ipVersion, octects); - int senderPort = buffer.getInt(); - int messageTypeByteLength = buffer.getInt(); - byte[] messageTypeBytes = new byte[messageTypeByteLength]; - buffer.get(messageTypeBytes); - String messageType = new String(messageTypeBytes, Charsets.UTF_8); - int payloadLength = buffer.getInt(); - byte[] payloadBytes = new byte[payloadLength]; - buffer.get(payloadBytes); - - // reset for next message - messageLength.set(-1); - - return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes); - } - - @Override - protected void write(DefaultMessage message, ByteBuffer buffer) { - Endpoint sender = message.sender(); - byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - byte[] payload = message.payload(); - - int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length; - - buffer.putInt(messageLength); - - buffer.putLong(message.id()); - - if (senderIp.version() == Version.INET) { - buffer.put((byte) 0x0); - } else { - buffer.put((byte) 0x1); - } - buffer.put(ipOctets); - - // write sender port - buffer.putInt(sender.port()); - - // write length of message type - buffer.putInt(messageTypeBytes.length); - - // write message type bytes - buffer.put(messageTypeBytes); - - // write payload length - buffer.putInt(payload.length); - - // write payload. - buffer.put(payload); - } - - /** - * Callback invoked when the stream is successfully connected. - */ - public void connected() { - connectFuture.complete(null); - } - - /** - * Callback invoked when the stream fails to connect. - * @param cause failure cause - */ - public void connectFailed(Throwable cause) { - connectFuture.completeExceptionally(cause); - } -} \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java deleted file mode 100644 index c195d160..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static org.onlab.util.Tools.groupedThreads; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.apache.commons.pool.KeyedPoolableObjectFactory; -import org.apache.commons.pool.impl.GenericKeyedObjectPool; -import org.onlab.nio.AcceptorLoop; -import org.onlab.nio.SelectorLoop; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessagingService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; - -/** - * MessagingService implementation based on IOLoop. - */ -public class IOLoopMessaging implements MessagingService { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY"; - - static final long TIMEOUT = 1000; - - static final boolean SO_NO_DELAY = false; - static final int SO_SEND_BUFFER_SIZE = 128 * 1024; - static final int SO_RCV_BUFFER_SIZE = 128 * 1024; - - private static final int NUM_WORKERS = 8; - - private AcceptorLoop acceptorLoop; - private final ExecutorService acceptorThreadPool = - Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor")); - private final ExecutorService ioThreadPool = - Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d")); - - private final List ioLoops = Lists.newArrayList(); - - private int lastWorker = -1; - - private final AtomicBoolean started = new AtomicBoolean(false); - private Endpoint localEp; - - private GenericKeyedObjectPool streams = - new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); - - private final ConcurrentMap> handlers = new ConcurrentHashMap<>(); - private final AtomicLong messageIdGenerator = new AtomicLong(0); - private final Cache> responseFutures = CacheBuilder.newBuilder() - .removalListener(new RemovalListener>() { - @Override - public void onRemoval(RemovalNotification> entry) { - if (entry.wasEvicted()) { - entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); - } - } - }) - .build(); - - /** - * Activates IO Loops. - * - * @param localEp local end-point - * @throws IOException is activation fails - */ - public void start(Endpoint localEp) throws IOException { - if (started.get()) { - log.warn("IOMessaging is already running at {}", localEp); - return; - } - this.localEp = localEp; - streams.setLifo(false); - this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port())); - - for (int i = 0; i < NUM_WORKERS; i++) { - ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); - } - - ioLoops.forEach(ioThreadPool::execute); - acceptorThreadPool.execute(acceptorLoop); - ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); - acceptorLoop.awaitStart(TIMEOUT); - started.set(true); - } - - /** - * Shuts down IO loops. - */ - public void stop() { - if (started.get()) { - ioLoops.forEach(SelectorLoop::shutdown); - acceptorLoop.shutdown(); - ioThreadPool.shutdown(); - acceptorThreadPool.shutdown(); - started.set(false); - } - } - - - @Override - public CompletableFuture sendAsync(Endpoint ep, String type, byte[] payload) { - DefaultMessage message = new DefaultMessage( - messageIdGenerator.incrementAndGet(), - localEp, - type, - payload); - return sendAsync(ep, message); - } - - protected CompletableFuture sendAsync(Endpoint ep, DefaultMessage message) { - CompletableFuture future = new CompletableFuture<>(); - if (ep.equals(localEp)) { - dispatchLocally(message); - future.complete(null); - return future; - } - - DefaultMessageStream stream = null; - try { - stream = streams.borrowObject(ep); - stream.write(message); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } finally { - try { - streams.returnObject(ep, stream); - } catch (Exception e) { - log.warn("Failed to return stream to pool"); - } - } - return future; - } - - @Override - public CompletableFuture sendAndReceive( - Endpoint ep, - String type, - byte[] payload) { - CompletableFuture response = new CompletableFuture<>(); - Long messageId = messageIdGenerator.incrementAndGet(); - responseFutures.put(messageId, response); - DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload); - try { - sendAsync(ep, message); - } catch (Exception e) { - responseFutures.invalidate(messageId); - response.completeExceptionally(e); - } - return response; - } - - @Override - public void registerHandler(String type, Consumer handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); - } - - @Override - public void registerHandler(String type, Function handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> { - byte[] responsePayload = handler.apply(message.payload()); - if (responsePayload != null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - responsePayload); - sendAsync(message.sender(), response).whenComplete((result, error) -> { - log.debug("Failed to respond", error); - }); - } - })); - } - - @Override - public void registerHandler(String type, Function> handler) { - handlers.put(type, message -> { - handler.apply(message.payload()).whenComplete((result, error) -> { - if (error == null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - result); - sendAsync(message.sender(), response).whenComplete((r, e) -> { - if (e != null) { - log.debug("Failed to respond", e); - } - }); - } - }); - }); - } - - @Override - public void unregisterHandler(String type) { - handlers.remove(type); - } - - protected void dispatchLocally(DefaultMessage message) { - String type = message.type(); - if (REPLY_MESSAGE_TYPE.equals(type)) { - try { - CompletableFuture futureResponse = - responseFutures.getIfPresent(message.id()); - if (futureResponse != null) { - futureResponse.complete(message.payload()); - } else { - log.warn("Received a reply for message id:[{}]. " - + " from {}. But was unable to locate the" - + " request handle", message.id(), message.sender()); - } - } finally { - responseFutures.invalidate(message.id()); - } - return; - } - Consumer handler = handlers.get(type); - if (handler != null) { - handler.accept(message); - } else { - log.debug("No handler registered for {}", type); - } - } - - // Get the next worker to which a client should be assigned - private synchronized DefaultIOLoop nextWorker() { - lastWorker = (lastWorker + 1) % NUM_WORKERS; - return ioLoops.get(lastWorker); - } - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port()); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - DefaultMessageStream stream = loop.connectStream(ch); - ch.connect(sa); - return stream; - } - - // Loop for accepting client connections - private class DefaultAcceptorLoop extends AcceptorLoop { - - public DefaultAcceptorLoop(SocketAddress address) throws IOException { - super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address); - } - - @Override - protected void acceptConnection(ServerSocketChannel channel) throws IOException { - SocketChannel sc = channel.accept(); - sc.configureBlocking(false); - - Socket so = sc.socket(); - so.setTcpNoDelay(SO_NO_DELAY); - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); - - nextWorker().acceptStream(sc); - } - } - - private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory { - - @Override - public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception { - } - - @Override - public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - stream.close(); - } - - @Override - public DefaultMessageStream makeObject(Endpoint ep) throws Exception { - DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get(); - log.info("Established a new connection to {}", ep); - return stream; - } - - @Override - public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - } - - @Override - public boolean validateObject(Endpoint ep, DefaultMessageStream stream) { - return stream.isClosed(); - } - } -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java deleted file mode 100644 index 399a2b95..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Assembly for sending and receiving messages using the I/O loop mechanism. - */ -package org.onlab.nio.service; \ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java deleted file mode 100644 index 2b4a85cd..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Before; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static org.junit.Assert.fail; -import static org.onlab.util.Tools.namedThreads; - -/** - * Base class for various NIO loop unit tests. - */ -public abstract class AbstractLoopTest { - - protected static final long MAX_MS_WAIT = 1500; - - /** Block on specified countdown latch. Return when countdown reaches - * zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires. - * - * @param latch the latch - * @param label an identifying label - */ - protected void waitForLatch(CountDownLatch latch, String label) { - try { - boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS); - if (!ok) { - fail("Latch await timeout! [" + label + "]"); - } - } catch (InterruptedException e) { - System.out.println("Latch interrupt [" + label + "] : " + e); - fail("Unexpected interrupt"); - } - } - - protected ExecutorService exec; - - @Before - public void setUp() { - exec = newSingleThreadExecutor(namedThreads("test")); - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java deleted file mode 100644 index f04a0126..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2014-2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.assertEquals; -import static org.onlab.junit.TestTools.delay; - -/** - * Unit tests for AcceptLoop. - */ -public class AcceptorLoopTest extends AbstractLoopTest { - - private static final int PICK_EPHEMERAL = 0; - - private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL); - - private static class MyAcceptLoop extends AcceptorLoop { - private final CountDownLatch loopStarted = new CountDownLatch(1); - private final CountDownLatch loopFinished = new CountDownLatch(1); - private final CountDownLatch runDone = new CountDownLatch(1); - private final CountDownLatch ceaseLatch = new CountDownLatch(1); - - private int acceptCount = 0; - - MyAcceptLoop() throws IOException { - super(500, SOCK_ADDR); - } - - @Override - protected void acceptConnection(ServerSocketChannel ssc) throws IOException { - acceptCount++; - } - - @Override - public void loop() throws IOException { - loopStarted.countDown(); - super.loop(); - loopFinished.countDown(); - } - - @Override - public void run() { - super.run(); - runDone.countDown(); - } - - @Override - public void shutdown() { - super.shutdown(); - ceaseLatch.countDown(); - } - } - - @Test - public void basic() throws IOException { - MyAcceptLoop myAccLoop = new MyAcceptLoop(); - AcceptorLoop accLoop = myAccLoop; - exec.execute(accLoop); - waitForLatch(myAccLoop.loopStarted, "loopStarted"); - delay(200); // take a quick nap - accLoop.shutdown(); - waitForLatch(myAccLoop.loopFinished, "loopFinished"); - waitForLatch(myAccLoop.runDone, "runDone"); - assertEquals(0, myAccLoop.acceptCount); - } -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java deleted file mode 100644 index f42c8dc5..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.net.InetAddress; -import java.util.Random; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static org.onlab.junit.TestTools.delay; - -/** - * Integration test for the select, accept and IO loops. - */ -public class IOLoopIntegrationTest { - - private static final int THREADS = 6; - private static final int TIMEOUT = 60; - private static final int MESSAGE_LENGTH = 128; - - private static final int MILLION = 1000000; - private static final int MSG_COUNT = 40 * MILLION; - - @Before - public void warmUp() throws Exception { - Logger.getLogger("").setLevel(Level.SEVERE); - try { - runTest(MILLION, MESSAGE_LENGTH, 15); - } catch (Throwable e) { - System.err.println("Failed warmup but moving on."); - e.printStackTrace(); - } - } - - // TODO: this test can not pass in some environments, need to be improved - @Ignore - @Test - public void basic() throws Exception { - runTest(MILLION, MESSAGE_LENGTH, TIMEOUT); - } - - public void longHaul() throws Exception { - runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT); - } - - private void runTest(int count, int size, int timeout) throws Exception { - // Use a random port to prevent conflicts. - int port = IOLoopTestServer.PORT + new Random().nextInt(100); - - InetAddress ip = InetAddress.getLoopbackAddress(); - IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port); - IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port); - - server.start(); - client.start(); - delay(100); // Pause to allow loops to get going - - client.await(timeout); - client.report(); - - server.stop(); - server.report(); - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java deleted file mode 100644 index acc9a08f..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import com.google.common.collect.Lists; -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.lang.String.format; -import static java.lang.System.nanoTime; -import static java.lang.System.out; -import static org.onlab.nio.IOLoopTestServer.PORT; -import static org.onlab.util.Tools.delay; -import static org.onlab.util.Tools.namedThreads; - -/** - * Auxiliary test fixture to measure speed of NIO-based channels. - */ -public class IOLoopTestClient { - - private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class); - - private final InetAddress ip; - private final int port; - private final int msgCount; - private final int msgLength; - - private final List iloops = new ArrayList<>(); - private final ExecutorService ipool; - private final ExecutorService wpool; - - Counter messages; - Counter bytes; - long latencyTotal = 0; - long latencyCount = 0; - - - /** - * Main entry point to launch the client. - * - * @param args command-line arguments - * @throws java.io.IOException if unable to connect to server - * @throws InterruptedException if latch wait gets interrupted - * @throws java.util.concurrent.ExecutionException if wait gets interrupted - * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion - */ - public static void main(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - startStandalone(args); - - System.exit(0); - } - - /** - * Starts a standalone IO loop test client. - * - * @param args command-line arguments - */ - public static void startStandalone(String[] args) - throws IOException, InterruptedException, ExecutionException, TimeoutException { - InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); - int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; - int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; - int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; - int to = args.length > 4 ? Integer.parseInt(args[4]) : 60; - - log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", - wc, mc, ml, ip); - IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT); - - client.start(); - delay(500); - - client.await(to); - client.report(); - } - - /** - * Creates a speed client. - * - * @param ip ip address of server - * @param wc worker count - * @param mc message count to send per client - * @param ml message length in bytes - * @param port socket port - * @throws java.io.IOException if unable to create IO loops - */ - public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { - this.ip = ip; - this.port = port; - this.msgCount = mc; - this.msgLength = ml; - this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker")); - this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop")); - - for (int i = 0; i < wc; i++) { - iloops.add(new CustomIOLoop()); - } - } - - /** - * Starts the client workers. - * - * @throws java.io.IOException if unable to open connection - */ - public void start() throws IOException { - messages = new Counter(); - bytes = new Counter(); - - // First start up all the IO loops - for (CustomIOLoop l : iloops) { - ipool.execute(l); - } - - // Wait for all of them to get going - for (CustomIOLoop l : iloops) { - l.awaitStart(1000); - } - - // ... and Next open all connections; one-per-loop - for (CustomIOLoop l : iloops) { - openConnection(l); - } - } - - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private void openConnection(CustomIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ip, port); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - loop.connectStream(ch); - ch.connect(sa); - } - - - /** - * Waits for the client workers to complete. - * - * @param secs timeout in seconds - * @throws java.util.concurrent.ExecutionException if execution failed - * @throws InterruptedException if interrupt occurred while waiting - * @throws java.util.concurrent.TimeoutException if timeout occurred - */ - public void await(int secs) throws InterruptedException, - ExecutionException, TimeoutException { - for (CustomIOLoop l : iloops) { - if (l.worker.task != null) { - l.worker.task.get(secs, TimeUnit.SECONDS); - latencyTotal += l.latencyTotal; - latencyCount += l.latencyCount; - } - } - messages.freeze(); - bytes.freeze(); - } - - /** - * Reports on the accumulated throughput and latency. - */ - public void report() { - DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency", - f.format(messages.total()), f.format(bytes.total()), - f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)), - f.format(latencyTotal / latencyCount))); - } - - - // Loop for transfer of fixed-length messages - private class CustomIOLoop extends IOLoop { - - Worker worker = new Worker(); - long latencyTotal = 0; - long latencyCount = 0; - - - public CustomIOLoop() throws IOException { - super(500); - } - - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(msgLength, channel, this); - } - - @Override - protected synchronized void removeStream(MessageStream stream) { - super.removeStream(stream); - messages.add(stream.messagesIn().total()); - bytes.add(stream.bytesIn().total()); - stream.messagesOut().reset(); - stream.bytesOut().reset(); - } - - @Override - protected void processMessages(List messages, - MessageStream stream) { - for (TestMessage message : messages) { - // TODO: summarize latency data better - latencyTotal += nanoTime() - message.requestorTime(); - latencyCount++; - } - worker.release(messages.size()); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - super.connect(key); - TestMessageStream b = (TestMessageStream) key.attachment(); - Worker w = ((CustomIOLoop) b.loop()).worker; - w.pump(b); - } - - } - - /** - * Auxiliary worker to connect and pump batched messages using blocking I/O. - */ - private class Worker implements Runnable { - - private static final int BATCH_SIZE = 50; - private static final int PERMITS = 2 * BATCH_SIZE; - - private TestMessageStream stream; - private FutureTask task; - - // Stuff to throttle pump - private final Semaphore semaphore = new Semaphore(PERMITS); - private int msgWritten; - - void pump(TestMessageStream stream) { - this.stream = stream; - task = new FutureTask<>(this, this); - wpool.execute(task); - } - - @Override - public void run() { - try { - log.info("Worker started..."); - - while (msgWritten < msgCount) { - int size = Math.min(BATCH_SIZE, msgCount - msgWritten); - writeBatch(size); - msgWritten += size; - } - - // Now try to get all the permits back before sending poison pill - semaphore.acquireUninterruptibly(PERMITS); - stream.close(); - - log.info("Worker done..."); - - } catch (IOException e) { - log.error("Worker unable to perform I/O", e); - } - } - - - private void writeBatch(int size) throws IOException { - // Build a batch of messages - List batch = Lists.newArrayListWithCapacity(size); - for (int i = 0; i < size; i++) { - batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding())); - } - acquire(size); - stream.write(batch); - } - - - // Release permits based on the specified number of message credits - private void release(int permits) { - semaphore.release(permits); - } - - // Acquire permit for a single batch - private void acquire(int permits) { - semaphore.acquireUninterruptibly(permits); - } - - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java deleted file mode 100644 index d5ce5f39..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import com.google.common.collect.Lists; -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static java.lang.String.format; -import static java.lang.System.out; -import static org.onlab.util.Tools.delay; -import static org.onlab.util.Tools.namedThreads; - -/** - * Auxiliary test fixture to measure speed of NIO-based channels. - */ -public class IOLoopTestServer { - - private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class); - - private static final int PRUNE_FREQUENCY = 1000; - - static final int PORT = 9876; - static final long TIMEOUT = 1000; - - static final boolean SO_NO_DELAY = false; - static final int SO_SEND_BUFFER_SIZE = 128 * 1024; - static final int SO_RCV_BUFFER_SIZE = 128 * 1024; - - static final DecimalFormat FORMAT = new DecimalFormat("#,##0"); - - private final AcceptorLoop aloop; - private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept")); - - private final List iloops = new ArrayList<>(); - private final ExecutorService ipool; - - private final int workerCount; - private final int msgLength; - private int lastWorker = -1; - - Counter messages; - Counter bytes; - - /** - * Main entry point to launch the server. - * - * @param args command-line arguments - * @throws java.io.IOException if unable to crate IO loops - */ - public static void main(String[] args) throws IOException { - startStandalone(args); - System.exit(0); - } - - /** - * Starts a standalone IO loop test server. - * - * @param args command-line arguments - */ - public static void startStandalone(String[] args) throws IOException { - InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1"); - int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; - int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128; - - log.info("Setting up the server with {} workers, {} byte messages on {}... ", - wc, ml, ip); - IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); - server.start(); - - // Start pruning clients and keep going until their number goes to 0. - int remaining = -1; - while (remaining == -1 || remaining > 0) { - delay(PRUNE_FREQUENCY); - int r = server.prune(); - remaining = remaining == -1 && r == 0 ? remaining : r; - } - server.stop(); - } - - /** - * Creates a speed server. - * - * @param ip optional ip of the adapter where to bind - * @param wc worker count - * @param ml message length in bytes - * @param port listen port - * @throws java.io.IOException if unable to create IO loops - */ - public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException { - this.workerCount = wc; - this.msgLength = ml; - this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop")); - - this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port)); - for (int i = 0; i < workerCount; i++) { - iloops.add(new CustomIOLoop()); - } - } - - /** - * Start the server IO loops and kicks off throughput tracking. - */ - public void start() { - messages = new Counter(); - bytes = new Counter(); - - for (CustomIOLoop l : iloops) { - ipool.execute(l); - } - apool.execute(aloop); - - for (CustomIOLoop l : iloops) { - l.awaitStart(TIMEOUT); - } - aloop.awaitStart(TIMEOUT); - } - - /** - * Stop the server IO loops and freezes throughput tracking. - */ - public void stop() { - aloop.shutdown(); - for (CustomIOLoop l : iloops) { - l.shutdown(); - } - - for (CustomIOLoop l : iloops) { - l.awaitStop(TIMEOUT); - } - aloop.awaitStop(TIMEOUT); - - messages.freeze(); - bytes.freeze(); - } - - /** - * Reports on the accumulated throughput and latency. - */ - public void report() { - DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs", - f.format(messages.total()), f.format(bytes.total()), - f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)))); - } - - /** - * Prunes the IO loops of stale message buffers. - * - * @return number of remaining IO loops among all workers. - */ - public int prune() { - int count = 0; - for (CustomIOLoop l : iloops) { - count += l.pruneStaleStreams(); - } - return count; - } - - // Get the next worker to which a client should be assigned - private synchronized CustomIOLoop nextWorker() { - lastWorker = (lastWorker + 1) % workerCount; - return iloops.get(lastWorker); - } - - // Loop for transfer of fixed-length messages - private class CustomIOLoop extends IOLoop { - - public CustomIOLoop() throws IOException { - super(500); - } - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(msgLength, channel, this); - } - - @Override - protected void removeStream(MessageStream stream) { - super.removeStream(stream); - messages.add(stream.messagesIn().total()); - bytes.add(stream.bytesIn().total()); - } - - @Override - protected void processMessages(List messages, - MessageStream stream) { - try { - stream.write(createResponses(messages)); - } catch (IOException e) { - log.error("Unable to echo messages", e); - } - } - - private List createResponses(List messages) { - List responses = Lists.newArrayListWithCapacity(messages.size()); - for (TestMessage message : messages) { - responses.add(new TestMessage(message.length(), message.requestorTime(), - System.nanoTime(), message.padding())); - } - return responses; - } - } - - // Loop for accepting client connections - private class CustomAcceptLoop extends AcceptorLoop { - - public CustomAcceptLoop(SocketAddress address) throws IOException { - super(500, address); - } - - @Override - protected void acceptConnection(ServerSocketChannel channel) throws IOException { - SocketChannel sc = channel.accept(); - sc.configureBlocking(false); - - Socket so = sc.socket(); - so.setTcpNoDelay(SO_NO_DELAY); - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); - - nextWorker().acceptStream(sc); - log.info("Connected client"); - } - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java deleted file mode 100644 index 9965d389..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -/** - * Tests of the message message stream implementation. - */ -public class MessageStreamTest { - - private static final int SIZE = 64; - private static final int BIG_SIZE = 32 * 1024; - - private TestMessage message; - - private TestIOLoop loop; - private TestByteChannel channel; - private TestMessageStream stream; - private TestKey key; - - @Before - public void setUp() throws IOException { - loop = new TestIOLoop(); - channel = new TestByteChannel(); - key = new TestKey(channel); - stream = loop.createStream(channel); - stream.setKey(key); - stream.setNonStrict(); - message = new TestMessage(SIZE, 0, 0, stream.padding()); - } - - @After - public void tearDown() { - loop.shutdown(); - stream.close(); - } - - // Validates the state of the message stream - private void validate(boolean wp, boolean fr, int read, int written) { - assertEquals(wp, stream.isWritePending()); - assertEquals(fr, stream.isFlushRequired()); - assertEquals(read, channel.readBytes); - assertEquals(written, channel.writtenBytes); - } - - @Test - public void endOfStream() throws IOException { - channel.close(); - List messages = stream.read(); - assertNull(messages); - } - - @Test - public void bufferGrowth() throws IOException { - // Create a stream for big messages and test the growth. - stream = new TestMessageStream(BIG_SIZE, channel, loop); - TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding()); - - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - stream.write(bigMessage); - } - - @Test - public void discardBeforeKey() { - // Create a stream that does not yet have the key set and discard it. - stream = loop.createStream(channel); - assertNull(stream.key()); - stream.close(); - // There is not key, so nothing to check; we just expect no problem. - } - - @Test - public void bufferedRead() throws IOException { - channel.bytesToRead = SIZE + 4; - List messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE + 4, 0); - - channel.bytesToRead = SIZE - 4; - messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE * 2, 0); - } - - @Test - public void bufferedWrite() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.write(message); - validate(false, true, 0, SIZE); - - // Reset write, which will flush if needed; the next write is again buffered - stream.flushIfWriteNotPending(); - validate(false, false, 0, SIZE * 3); - stream.write(message); - validate(false, true, 0, SIZE * 3); - - // Select reset, which will flush if needed; the next write is again buffered - stream.flushIfPossible(); - validate(false, false, 0, SIZE * 4); - stream.write(message); - validate(false, true, 0, SIZE * 4); - stream.flush(); - validate(false, true, 0, SIZE * 4); - } - - @Test - public void bufferedWriteList() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - List messages = new ArrayList<>(); - messages.add(message); - messages.add(message); - messages.add(message); - messages.add(message); - - stream.write(messages); - validate(false, false, 0, SIZE * 4); - - stream.write(messages); - validate(false, true, 0, SIZE * 4); - - stream.flushIfPossible(); - validate(false, false, 0, SIZE * 8); - } - - @Test - public void bufferedPartialWrite() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Tell test channel to accept only half. - channel.bytesToWrite = SIZE / 2; - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.flushIfPossible(); - validate(true, true, 0, SIZE + SIZE / 2); - } - - @Test - public void bufferedPartialWrite2() throws IOException { - validate(false, false, 0, 0); - - // First write is immediate... - stream.write(message); - validate(false, false, 0, SIZE); - - // Tell test channel to accept only half. - channel.bytesToWrite = SIZE / 2; - - // Second and third get buffered... - stream.write(message); - validate(false, true, 0, SIZE); - stream.flushIfWriteNotPending(); - validate(true, true, 0, SIZE + SIZE / 2); - } - - @Test - public void bufferedReadWrite() throws IOException { - channel.bytesToRead = SIZE + 4; - List messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE + 4, 0); - - stream.write(message); - validate(false, false, SIZE + 4, SIZE); - - channel.bytesToRead = SIZE - 4; - messages = stream.read(); - assertEquals(1, messages.size()); - validate(false, false, SIZE * 2, SIZE); - } - - // Fake IO driver loop - private static class TestIOLoop extends IOLoop { - - public TestIOLoop() throws IOException { - super(500); - } - - @Override - protected TestMessageStream createStream(ByteChannel channel) { - return new TestMessageStream(SIZE, channel, this); - } - - @Override - protected void processMessages(List messages, - MessageStream stream) { - } - - } - - // Byte channel test fixture - private static class TestByteChannel extends SelectableChannel implements ByteChannel { - - private static final int BUFFER_LENGTH = 1024; - byte[] bytes = new byte[BUFFER_LENGTH]; - int bytesToWrite = BUFFER_LENGTH; - int bytesToRead = BUFFER_LENGTH; - int writtenBytes = 0; - int readBytes = 0; - - @Override - public int read(ByteBuffer dst) throws IOException { - int l = Math.min(dst.remaining(), bytesToRead); - if (bytesToRead > 0) { - readBytes += l; - dst.put(bytes, 0, l); - } - return l; - } - - @Override - public int write(ByteBuffer src) throws IOException { - int l = Math.min(src.remaining(), bytesToWrite); - writtenBytes += l; - src.get(bytes, 0, l); - return l; - } - - @Override - public Object blockingLock() { - return null; - } - - @Override - public SelectableChannel configureBlocking(boolean arg0) throws IOException { - return null; - } - - @Override - public boolean isBlocking() { - return false; - } - - @Override - public boolean isRegistered() { - return false; - } - - @Override - public SelectionKey keyFor(Selector arg0) { - return null; - } - - @Override - public SelectorProvider provider() { - return null; - } - - @Override - public SelectionKey register(Selector arg0, int arg1, Object arg2) - throws ClosedChannelException { - return null; - } - - @Override - public int validOps() { - return 0; - } - - @Override - protected void implCloseChannel() throws IOException { - bytesToRead = -1; - } - - } - - // Selection key text fixture - private static class TestKey extends SelectionKey { - - private SelectableChannel channel; - - public TestKey(TestByteChannel channel) { - this.channel = channel; - } - - @Override - public void cancel() { - } - - @Override - public SelectableChannel channel() { - return channel; - } - - @Override - public int interestOps() { - return 0; - } - - @Override - public SelectionKey interestOps(int ops) { - return null; - } - - @Override - public boolean isValid() { - return true; - } - - @Override - public int readyOps() { - return 0; - } - - @Override - public Selector selector() { - return null; - } - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java deleted file mode 100644 index cf5b2bb9..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/MockSelector.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.AbstractSelectableChannel; -import java.nio.channels.spi.AbstractSelector; -import java.util.Set; - -/** - * A selector instrumented for unit tests. - */ -public class MockSelector extends AbstractSelector { - - int wakeUpCount = 0; - - /** - * Creates a mock selector, specifying null as the SelectorProvider. - */ - public MockSelector() { - super(null); - } - - @Override - public String toString() { - return "{MockSelector: wake=" + wakeUpCount + "}"; - } - - @Override - protected void implCloseSelector() throws IOException { - } - - @Override - protected SelectionKey register(AbstractSelectableChannel ch, int ops, - Object att) { - return null; - } - - @Override - public Set keys() { - return null; - } - - @Override - public Set selectedKeys() { - return null; - } - - @Override - public int selectNow() throws IOException { - return 0; - } - - @Override - public int select(long timeout) throws IOException { - return 0; - } - - @Override - public int select() throws IOException { - return 0; - } - - @Override - public Selector wakeup() { - wakeUpCount++; - return null; - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java deleted file mode 100644 index 825479b5..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessage.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Test message for measuring rate and round-trip latency. - */ -public class TestMessage extends AbstractMessage { - - private final byte[] padding; - - private final long requestorTime; - private final long responderTime; - - /** - * Creates a new message with the specified data. - * - * @param requestorTime requester time - * @param responderTime responder time - * @param padding message padding - */ - TestMessage(int length, long requestorTime, long responderTime, byte[] padding) { - this.length = length; - this.requestorTime = requestorTime; - this.responderTime = responderTime; - this.padding = checkNotNull(padding, "Padding cannot be null"); - } - - public long requestorTime() { - return requestorTime; - } - - public long responderTime() { - return responderTime; - } - - public byte[] padding() { - return padding; - } - -} diff --git a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java deleted file mode 100644 index fc86beb9..00000000 --- a/framework/src/onos/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Fixed-length message transfer buffer. - */ -public class TestMessageStream extends MessageStream { - - private static final String E_WRONG_LEN = "Illegal message length: "; - private static final long START_TAG = 0xfeedcafedeaddeedL; - private static final long END_TAG = 0xbeadcafedeaddeedL; - private static final int META_LENGTH = 40; - - private final int length; - private boolean isStrict = true; - - public TestMessageStream(int length, ByteChannel ch, IOLoop loop) { - super(loop, ch, 64 * 1024, 500); - checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40"); - this.length = length; - } - - void setNonStrict() { - isStrict = false; - } - - @Override - protected TestMessage read(ByteBuffer rb) { - if (rb.remaining() < length) { - return null; - } - - long startTag = rb.getLong(); - if (isStrict) { - checkState(startTag == START_TAG, "Incorrect message start"); - } - - long size = rb.getLong(); - long requestorTime = rb.getLong(); - long responderTime = rb.getLong(); - byte[] padding = padding(); - rb.get(padding); - - long endTag = rb.getLong(); - if (isStrict) { - checkState(endTag == END_TAG, "Incorrect message end"); - } - - return new TestMessage((int) size, requestorTime, responderTime, padding); - } - - @Override - protected void write(TestMessage message, ByteBuffer wb) { - if (message.length() != length) { - throw new IllegalArgumentException(E_WRONG_LEN + message.length()); - } - - wb.putLong(START_TAG); - wb.putLong(message.length()); - wb.putLong(message.requestorTime()); - wb.putLong(message.responderTime()); - wb.put(message.padding(), 0, length - META_LENGTH); - wb.putLong(END_TAG); - } - - public byte[] padding() { - return new byte[length - META_LENGTH]; - } -} -- cgit 1.2.3-korg