diff options
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org')
12 files changed, 0 insertions, 1768 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java deleted file mode 100644 index 16bb9359..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -/** - * Base {@link Message} implementation. - */ -public abstract class AbstractMessage implements Message { - - protected int length; - - @Override - public int length() { - return length; - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java deleted file mode 100644 index e416f3be..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.util.Iterator; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Selector loop derivative tailored to acceptConnection inbound connections. - */ -public abstract class AcceptorLoop extends SelectorLoop { - - private SocketAddress listenAddress; - private ServerSocketChannel socketChannel; - - /** - * Creates an acceptor loop with the specified selection timeout and - * accepting connections on the the given address. - * - * @param selectTimeout selection timeout; specified in millis - * @param listenAddress socket address where to listen for connections - * @throws IOException if the backing selector cannot be opened - */ - public AcceptorLoop(long selectTimeout, SocketAddress listenAddress) - throws IOException { - super(selectTimeout); - this.listenAddress = checkNotNull(listenAddress, "Address cannot be null"); - } - - /** - * Hook to accept an inbound connection on the specified socket channel. - * - * @param channel socketChannel where an accept operation awaits - * @throws IOException if the accept operation cannot be processed - */ - protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException; - - /** - * Opens a new server socket channel configured in non-blocking mode and - * bound to the loop's listen address. - * - * @throws IOException if unable to open or configure the socket channel - */ - protected synchronized void openChannel() throws IOException { - socketChannel = ServerSocketChannel.open(); - socketChannel.configureBlocking(false); - socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - socketChannel.register(selector, SelectionKey.OP_ACCEPT); - socketChannel.bind(listenAddress); - } - - /** - * Closes the server socket channel. - * - * @throws IOException if unable to close the socketChannel - */ - protected synchronized void closechannel() throws IOException { - if (socketChannel != null) { - socketChannel.close(); - socketChannel = null; - } - } - - @Override - public void shutdown() { - try { - closechannel(); - } catch (IOException e) { - log.warn("Unable to close the socketChannel", e); - } - super.shutdown(); - } - - @Override - protected void loop() throws IOException { - openChannel(); - notifyReady(); - - // Keep looping until told otherwise. - while (isRunning()) { - // Attempt a selection; if no operations selected or if signalled - // to shutdown, spin through. - int count = selector.select(selectTimeout); - if (count == 0 || !isRunning()) { - continue; - } - - // Iterate over all keys selected for an operation and process them. - Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - // Fetch the key and remove it from the pending list. - SelectionKey key = keys.next(); - keys.remove(); - - // If the key has a pending acceptConnection operation, process it. - if (key.isAcceptable()) { - acceptConnection((ServerSocketChannel) key.channel()); - } - } - } - } - -} - diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java deleted file mode 100644 index 106df7b2..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * I/O loop for driving inbound & outbound {@link Message} transfer via - * {@link MessageStream}. - * - * @param <M> message type - * @param <S> message stream type - */ -public abstract class IOLoop<M extends Message, S extends MessageStream<M>> - extends SelectorLoop { - - // Queue of requests for new message streams to enter the IO loop processing. - private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>(); - - // Carries information required for admitting a new message stream. - private class NewStreamRequest { - private final S stream; - private final SelectableChannel channel; - private final int op; - - public NewStreamRequest(S stream, SelectableChannel channel, int op) { - this.stream = stream; - this.channel = channel; - this.op = op; - } - } - - // Set of message streams currently admitted into the IO loop. - private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>(); - - /** - * Creates an IO loop with the given selection timeout. - * - * @param timeout selection timeout in milliseconds - * @throws IOException if the backing selector cannot be opened - */ - public IOLoop(long timeout) throws IOException { - super(timeout); - } - - /** - * Returns the number of message stream in custody of the loop. - * - * @return number of message streams - */ - public int streamCount() { - return streams.size(); - } - - /** - * Creates a new message stream backed by the specified socket channel. - * - * @param byteChannel backing byte channel - * @return newly created message stream - */ - protected abstract S createStream(ByteChannel byteChannel); - - /** - * Removes the specified message stream from the IO loop. - * - * @param stream message stream to remove - */ - protected void removeStream(MessageStream<M> stream) { - streams.remove(stream); - } - - /** - * Processes the list of messages extracted from the specified message - * stream. - * - * @param messages non-empty list of received messages - * @param stream message stream from which the messages were extracted - */ - protected abstract void processMessages(List<M> messages, MessageStream<M> stream); - - /** - * Completes connection request pending on the given selection key. - * - * @param key selection key holding the pending connect operation. - * @throws IOException when I/O exception of some sort has occurred - */ - protected void connect(SelectionKey key) throws IOException { - SocketChannel ch = (SocketChannel) key.channel(); - ch.finishConnect(); - if (key.isValid()) { - key.interestOps(SelectionKey.OP_READ); - } - } - - /** - * Processes an IO operation pending on the specified key. - * - * @param key selection key holding the pending I/O operation. - */ - protected void processKeyOperation(SelectionKey key) { - @SuppressWarnings("unchecked") - S stream = (S) key.attachment(); - - try { - // If the key is not valid, bail out. - if (!key.isValid()) { - stream.close(); - return; - } - - // If there is a pending connect operation, complete it. - if (key.isConnectable()) { - try { - connect(key); - } catch (IOException | IllegalStateException e) { - log.warn("Unable to complete connection", e); - } - } - - // If there is a read operation, slurp as much data as possible. - if (key.isReadable()) { - List<M> messages = stream.read(); - - // No messages or failed flush imply disconnect; bail. - if (messages == null || stream.hadError()) { - stream.close(); - return; - } - - // If there were any messages read, process them. - if (!messages.isEmpty()) { - try { - processMessages(messages, stream); - } catch (RuntimeException e) { - onError(stream, e); - } - } - } - - // If there are pending writes, flush them - if (key.isWritable()) { - stream.flushIfPossible(); - } - - // If there were any issued flushing, close the stream. - if (stream.hadError()) { - stream.close(); - } - - } catch (CancelledKeyException e) { - // Key was cancelled, so silently close the stream - stream.close(); - } catch (IOException e) { - if (!stream.isClosed() && !isResetByPeer(e)) { - log.warn("Unable to process IO", e); - } - stream.close(); - } - } - - // Indicates whether or not this exception is caused by 'reset by peer'. - private boolean isResetByPeer(IOException e) { - Throwable cause = e.getCause(); - return cause != null && cause instanceof IOException && - cause.getMessage().contains("reset by peer"); - } - - /** - * Hook to allow intercept of any errors caused during message processing. - * Default behaviour is to rethrow the error. - * - * @param stream message stream involved in the error - * @param error the runtime exception - */ - protected void onError(S stream, RuntimeException error) { - throw error; - } - - /** - * Admits a new message stream backed by the specified socket channel - * with a pending accept operation. - * - * @param channel backing socket channel - * @return newly accepted message stream - */ - public S acceptStream(SocketChannel channel) { - return createAndAdmit(channel, SelectionKey.OP_READ); - } - - - /** - * Admits a new message stream backed by the specified socket channel - * with a pending connect operation. - * - * @param channel backing socket channel - * @return newly connected message stream - */ - public S connectStream(SocketChannel channel) { - return createAndAdmit(channel, SelectionKey.OP_CONNECT); - } - - /** - * Creates a new message stream backed by the specified socket channel - * and admits it into the IO loop. - * - * @param channel socket channel - * @param op pending operations mask to be applied to the selection - * key as a set of initial interestedOps - * @return newly created message stream - */ - private synchronized S createAndAdmit(SocketChannel channel, int op) { - S stream = createStream(channel); - streams.add(stream); - newStreamRequests.add(new NewStreamRequest(stream, channel, op)); - selector.wakeup(); - return stream; - } - - /** - * Safely admits new streams into the IO loop. - */ - private void admitNewStreams() { - Iterator<NewStreamRequest> it = newStreamRequests.iterator(); - while (isRunning() && it.hasNext()) { - try { - NewStreamRequest request = it.next(); - it.remove(); - SelectionKey key = request.channel.register(selector, request.op, - request.stream); - request.stream.setKey(key); - } catch (ClosedChannelException e) { - log.warn("Unable to admit new message stream", e); - } - } - } - - @Override - protected void loop() throws IOException { - notifyReady(); - - // Keep going until told otherwise. - while (isRunning()) { - admitNewStreams(); - - // Process flushes & write selects on all streams - for (MessageStream<M> stream : streams) { - stream.flushIfWriteNotPending(); - } - - // Select keys and process them. - int count = selector.select(selectTimeout); - if (count > 0 && isRunning()) { - Iterator<SelectionKey> it = selector.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey key = it.next(); - it.remove(); - processKeyOperation(key); - } - } - } - } - - /** - * Prunes the registered streams by discarding any stale ones. - * - * @return number of remaining streams - */ - public synchronized int pruneStaleStreams() { - for (MessageStream<M> stream : streams) { - if (stream.isStale()) { - stream.close(); - } - } - return streams.size(); - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java deleted file mode 100644 index c1a339e0..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -/** - * Representation of a message transferred via {@link MessageStream}. - */ -public interface Message { - - /** - * Gets the message length in bytes. - * - * @return number of bytes - */ - int length(); - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java deleted file mode 100644 index a19e8aac..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.onlab.util.Counter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.System.currentTimeMillis; -import static java.nio.ByteBuffer.allocateDirect; - -/** - * Bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - * - * @param <M> message type - */ -public abstract class MessageStream<M extends Message> { - - protected Logger log = LoggerFactory.getLogger(getClass()); - - private final IOLoop<M, ?> loop; - private final ByteChannel channel; - private final int maxIdleMillis; - - private final ByteBuffer inbound; - private ByteBuffer outbound; - private SelectionKey key; - - private volatile boolean closed = false; - private volatile boolean writePending; - private volatile boolean writeOccurred; - - private Exception ioError; - private long lastActiveTime; - - private final Counter bytesIn = new Counter(); - private final Counter messagesIn = new Counter(); - private final Counter bytesOut = new Counter(); - private final Counter messagesOut = new Counter(); - - /** - * Creates a message stream associated with the specified IO loop and - * backed by the given byte channel. - * - * @param loop IO loop - * @param byteChannel backing byte channel - * @param bufferSize size of the backing byte buffers - * @param maxIdleMillis maximum number of millis the stream can be idle - * before it will be closed - */ - protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel, - int bufferSize, int maxIdleMillis) { - this.loop = checkNotNull(loop, "Loop cannot be null"); - this.channel = checkNotNull(byteChannel, "Byte channel cannot be null"); - - checkArgument(maxIdleMillis > 0, "Idle time must be positive"); - this.maxIdleMillis = maxIdleMillis; - - inbound = allocateDirect(bufferSize); - outbound = allocateDirect(bufferSize); - } - - /** - * Gets a single message from the specified byte buffer; this is - * to be done without manipulating the buffer via flip, reset or clear. - * - * @param buffer byte buffer - * @return read message or null if there are not enough bytes to read - * a complete message - */ - protected abstract M read(ByteBuffer buffer); - - /** - * Puts the specified message into the specified byte buffer; this is - * to be done without manipulating the buffer via flip, reset or clear. - * - * @param message message to be write into the buffer - * @param buffer byte buffer - */ - protected abstract void write(M message, ByteBuffer buffer); - - /** - * Closes the message buffer. - */ - public void close() { - synchronized (this) { - if (closed) { - return; - } - closed = true; - } - - bytesIn.freeze(); - bytesOut.freeze(); - messagesIn.freeze(); - messagesOut.freeze(); - - loop.removeStream(this); - if (key != null) { - try { - key.cancel(); - key.channel().close(); - } catch (IOException e) { - log.warn("Unable to close stream", e); - } - } - } - - /** - * Indicates whether this buffer has been closed. - * - * @return true if this stream has been closed - */ - public synchronized boolean isClosed() { - return closed; - } - - /** - * Returns the stream IO selection key. - * - * @return socket channel registration selection key - */ - public SelectionKey key() { - return key; - } - - /** - * Binds the selection key to be used for driving IO operations on the stream. - * - * @param key IO selection key - */ - public void setKey(SelectionKey key) { - this.key = key; - this.lastActiveTime = currentTimeMillis(); - } - - /** - * Returns the IO loop to which this stream is bound. - * - * @return I/O loop used to drive this stream - */ - public IOLoop<M, ?> loop() { - return loop; - } - - /** - * Indicates whether the any prior IO encountered an error. - * - * @return true if a write failed - */ - public boolean hadError() { - return ioError != null; - } - - /** - * Gets the prior IO error, if one occurred. - * - * @return IO error; null if none occurred - */ - public Exception getError() { - return ioError; - } - - /** - * Reads, without blocking, a list of messages from the stream. - * The list will be empty if there were not messages pending. - * - * @return list of messages or null if backing channel has been closed - * @throws IOException if messages could not be read - */ - public List<M> read() throws IOException { - try { - int read = channel.read(inbound); - if (read != -1) { - // Read the messages one-by-one and add them to the list. - List<M> messages = new ArrayList<>(); - M message; - inbound.flip(); - while ((message = read(inbound)) != null) { - messages.add(message); - messagesIn.add(1); - bytesIn.add(message.length()); - } - inbound.compact(); - - // Mark the stream with current time to indicate liveness. - lastActiveTime = currentTimeMillis(); - return messages; - } - return null; - - } catch (Exception e) { - throw new IOException("Unable to read messages", e); - } - } - - /** - * Writes the specified list of messages to the stream. - * - * @param messages list of messages to write - * @throws IOException if error occurred while writing the data - */ - public void write(List<M> messages) throws IOException { - synchronized (this) { - // First write all messages. - for (M m : messages) { - append(m); - } - flushUnlessAlreadyPlanningTo(); - } - } - - /** - * Writes the given message to the stream. - * - * @param message message to write - * @throws IOException if error occurred while writing the data - */ - public void write(M message) throws IOException { - synchronized (this) { - append(message); - flushUnlessAlreadyPlanningTo(); - } - } - - // Appends the specified message into the internal buffer, growing the - // buffer if required. - private void append(M message) { - // If the buffer does not have sufficient length double it. - while (outbound.remaining() < message.length()) { - doubleSize(); - } - write(message, outbound); - messagesOut.add(1); - bytesOut.add(message.length()); - } - - // Forces a flush, unless one is planned already. - private void flushUnlessAlreadyPlanningTo() throws IOException { - if (!writeOccurred && !writePending) { - flush(); - } - } - - /** - * Flushes any pending writes. - * - * @throws IOException if flush failed - */ - public void flush() throws IOException { - synchronized (this) { - if (!writeOccurred && !writePending) { - outbound.flip(); - try { - channel.write(outbound); - } catch (IOException e) { - if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { - log.warn("Unable to write data", e); - ioError = e; - } - } - lastActiveTime = currentTimeMillis(); - writeOccurred = true; - writePending = outbound.hasRemaining(); - outbound.compact(); - } - } - } - - /** - * Indicates whether the stream has bytes to be written to the channel. - * - * @return true if there are bytes to be written - */ - boolean isWritePending() { - synchronized (this) { - return writePending; - } - } - - - /** - * Indicates whether data has been written but not flushed yet. - * - * @return true if flush is required - */ - boolean isFlushRequired() { - synchronized (this) { - return outbound.position() > 0; - } - } - - /** - * Attempts to flush data, internal stream state and channel availability - * permitting. Invoked by the driver I/O loop during handling of writable - * selection key. - * <p> - * Resets the internal state flags {@code writeOccurred} and - * {@code writePending}. - * </p> - * @throws IOException if implicit flush failed - */ - void flushIfPossible() throws IOException { - synchronized (this) { - writePending = false; - writeOccurred = false; - if (outbound.position() > 0) { - flush(); - } - } - key.interestOps(SelectionKey.OP_READ); - } - - /** - * Attempts to flush data, internal stream state and channel availability - * permitting and if other writes are not pending. Invoked by the driver - * I/O loop prior to entering select wait. Resets the internal - * {@code writeOccurred} state flag. - * - * @throws IOException if implicit flush failed - */ - void flushIfWriteNotPending() throws IOException { - synchronized (this) { - writeOccurred = false; - if (!writePending && outbound.position() > 0) { - flush(); - } - } - if (isWritePending()) { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } - } - - /** - * Doubles the size of the outbound buffer. - */ - private void doubleSize() { - ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2); - outbound.flip(); - newBuffer.put(outbound); - outbound = newBuffer; - } - - /** - * Returns the maximum number of milliseconds the stream is allowed - * without any read/write operations. - * - * @return number if millis of permissible idle time - */ - protected int maxIdleMillis() { - return maxIdleMillis; - } - - - /** - * Returns true if the given stream has gone stale. - * - * @return true if the stream is stale - */ - boolean isStale() { - return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null; - } - - /** - * Returns the inbound bytes counter. - * - * @return inbound bytes counter - */ - public Counter bytesIn() { - return bytesIn; - } - - /** - * Returns the outbound bytes counter. - * - * @return outbound bytes counter - */ - public Counter bytesOut() { - return bytesOut; - } - - /** - * Returns the inbound messages counter. - * - * @return inbound messages counter - */ - public Counter messagesIn() { - return messagesIn; - } - - /** - * Returns the outbound messages counter. - * - * @return outbound messages counter - */ - public Counter messagesOut() { - return messagesOut; - } - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java deleted file mode 100644 index 95a9b61e..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.channels.Selector; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.System.currentTimeMillis; - -/** - * Abstraction of an I/O processing loop based on an NIO selector. - */ -public abstract class SelectorLoop implements Runnable { - - protected final Logger log = LoggerFactory.getLogger(getClass()); - - /** - * Selector used by this loop to pace the I/O operations. - */ - protected final Selector selector; - - /** - * Selection operations timeout; specified in millis. - */ - protected long selectTimeout; - - /** - * Retains the error that caused the loop to exit prematurely. - */ - private Throwable error; - - // State indicator - private enum State { STARTING, STARTED, STOPPING, STOPPED }; - private volatile State state = State.STOPPED; - - /** - * Creates a new selector loop with the given selection timeout. - * - * @param selectTimeout selection timeout; specified in millis - * @throws IOException if the backing selector cannot be opened - */ - public SelectorLoop(long selectTimeout) throws IOException { - checkArgument(selectTimeout > 0, "Timeout must be positive"); - this.selectTimeout = selectTimeout; - this.selector = openSelector(); - } - - /** - * Opens a new selector for the use by the loop. - * - * @return newly open selector - * @throws IOException if the backing selector cannot be opened - */ - protected Selector openSelector() throws IOException { - return Selector.open(); - } - - /** - * Indicates that the loop is marked to run. - * @return true if the loop is marked to run - */ - protected boolean isRunning() { - return state == State.STARTED || state == State.STARTING; - } - - /** - * Returns the error, if there was one, that caused the loop to terminate - * prematurely. - * - * @return error or null if there was none - */ - public Throwable getError() { - return error; - } - - /** - * Contains the body of the I/O selector loop. - * - * @throws IOException if an error is encountered while selecting I/O - */ - protected abstract void loop() throws IOException; - - @Override - public void run() { - error = null; - state = State.STARTING; - try { - loop(); - } catch (Exception e) { - error = e; - log.error("Loop aborted", e); - } - notifyDone(); - } - - /** - * Notifies observers waiting for loop to become ready. - */ - protected synchronized void notifyReady() { - state = State.STARTED; - notifyAll(); - } - - /** - * Triggers loop shutdown. - */ - public void shutdown() { - // Mark the loop as no longer running and wake up the selector. - state = State.STOPPING; - selector.wakeup(); - } - - /** - * Notifies observers waiting for loop to fully stop. - */ - private synchronized void notifyDone() { - state = State.STOPPED; - notifyAll(); - } - - /** - * Waits for the loop execution to start. - * - * @param timeout number of milliseconds to wait - * @return true if loop started in time - */ - public final synchronized boolean awaitStart(long timeout) { - long max = currentTimeMillis() + timeout; - while (state != State.STARTED && (currentTimeMillis() < max)) { - try { - wait(timeout); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } - return state == State.STARTED; - } - - /** - * Waits for the loop execution to stop. - * - * @param timeout number of milliseconds to wait - * @return true if loop finished in time - */ - public final synchronized boolean awaitStop(long timeout) { - long max = currentTimeMillis() + timeout; - while (state != State.STOPPED && (currentTimeMillis() < max)) { - try { - wait(timeout); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } - return state == State.STOPPED; - } - - -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java deleted file mode 100644 index 0d58b568..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2014 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Mechanism to transfer messages over network using IO loop and - * message stream, backed by NIO byte buffers. - */ -package org.onlab.nio; diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java deleted file mode 100644 index 0d977929..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.util.List; -import java.util.function.Consumer; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; - -/** - * IOLoop for transporting DefaultMessages. - */ -public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> { - - public static final int SELECT_TIMEOUT_MILLIS = 500; - private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000; - private static final int BUFFER_SIZE = 1024 * 1024; - private final Consumer<DefaultMessage> consumer; - - public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException { - this(SELECT_TIMEOUT_MILLIS, consumer); - } - - public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException { - super(timeout); - this.consumer = consumer; - } - - @Override - protected DefaultMessageStream createStream(ByteChannel byteChannel) { - return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS); - } - - @Override - protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) { - messages.forEach(consumer); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - DefaultMessageStream stream = (DefaultMessageStream) key.attachment(); - try { - super.connect(key); - stream.connected(); - } catch (Exception e) { - stream.connectFailed(e); - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java deleted file mode 100644 index 591d49b3..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.onlab.nio.AbstractMessage; -import org.onlab.packet.IpAddress; -import org.onlab.util.ByteArraySizeHashPrinter; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; -import com.google.common.base.MoreObjects; - -/** - * Default message. - */ -public class DefaultMessage extends AbstractMessage { - - private long id; - private Endpoint sender; - private String type; - private byte[] payload; - - /** - * Creates a new message with the specified data. - * - * @param id message id - * @param type message type - * @param sender sender endpoint - * @param payload message payload - */ - DefaultMessage(long id, Endpoint sender, String type, byte[] payload) { - this.id = id; - this.type = checkNotNull(type, "Type cannot be null"); - this.sender = checkNotNull(sender, "Sender cannot be null"); - this.payload = checkNotNull(payload, "Payload cannot be null"); - - byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - - length = 25 + ipOctets.length + messageTypeBytes.length + payload.length; - } - - /** - * Returns message id. - * - * @return message id - */ - public long id() { - return id; - } - - /** - * Returns message sender. - * - * @return message sender - */ - public Endpoint sender() { - return sender; - } - - /** - * Returns message type. - * - * @return message type - */ - public String type() { - return type; - } - - /** - * Returns message payload. - * - * @return payload - */ - public byte[] payload() { - return payload; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .add("type", type) - .add("sender", sender) - .add("payload", ByteArraySizeHashPrinter.of(payload)) - .toString(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java deleted file mode 100644 index a7dd3c04..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; -import org.onlab.packet.IpAddress; -import org.onlab.packet.IpAddress.Version; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; - -/** - * Default bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - */ -public class DefaultMessageStream extends MessageStream<DefaultMessage> { - - private final CompletableFuture<Void> connectFuture = new CompletableFuture<>(); - - public DefaultMessageStream( - IOLoop<DefaultMessage, ?> loop, - ByteChannel byteChannel, - int bufferSize, - int maxIdleMillis) { - super(loop, byteChannel, bufferSize, maxIdleMillis); - } - - public CompletableFuture<DefaultMessageStream> connectedFuture() { - return connectFuture.thenApply(v -> this); - } - - private final AtomicInteger messageLength = new AtomicInteger(-1); - - @Override - protected DefaultMessage read(ByteBuffer buffer) { - if (messageLength.get() == -1) { - // check if we can read the message length. - if (buffer.remaining() < Integer.BYTES) { - return null; - } else { - messageLength.set(buffer.getInt()); - } - } - - if (buffer.remaining() < messageLength.get()) { - return null; - } - - long id = buffer.getLong(); - Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6; - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; - buffer.get(octects); - IpAddress senderIp = IpAddress.valueOf(ipVersion, octects); - int senderPort = buffer.getInt(); - int messageTypeByteLength = buffer.getInt(); - byte[] messageTypeBytes = new byte[messageTypeByteLength]; - buffer.get(messageTypeBytes); - String messageType = new String(messageTypeBytes, Charsets.UTF_8); - int payloadLength = buffer.getInt(); - byte[] payloadBytes = new byte[payloadLength]; - buffer.get(payloadBytes); - - // reset for next message - messageLength.set(-1); - - return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes); - } - - @Override - protected void write(DefaultMessage message, ByteBuffer buffer) { - Endpoint sender = message.sender(); - byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - byte[] payload = message.payload(); - - int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length; - - buffer.putInt(messageLength); - - buffer.putLong(message.id()); - - if (senderIp.version() == Version.INET) { - buffer.put((byte) 0x0); - } else { - buffer.put((byte) 0x1); - } - buffer.put(ipOctets); - - // write sender port - buffer.putInt(sender.port()); - - // write length of message type - buffer.putInt(messageTypeBytes.length); - - // write message type bytes - buffer.put(messageTypeBytes); - - // write payload length - buffer.putInt(payload.length); - - // write payload. - buffer.put(payload); - } - - /** - * Callback invoked when the stream is successfully connected. - */ - public void connected() { - connectFuture.complete(null); - } - - /** - * Callback invoked when the stream fails to connect. - * @param cause failure cause - */ - public void connectFailed(Throwable cause) { - connectFuture.completeExceptionally(cause); - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java deleted file mode 100644 index c195d160..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static org.onlab.util.Tools.groupedThreads; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.apache.commons.pool.KeyedPoolableObjectFactory; -import org.apache.commons.pool.impl.GenericKeyedObjectPool; -import org.onlab.nio.AcceptorLoop; -import org.onlab.nio.SelectorLoop; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessagingService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; - -/** - * MessagingService implementation based on IOLoop. - */ -public class IOLoopMessaging implements MessagingService { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY"; - - static final long TIMEOUT = 1000; - - static final boolean SO_NO_DELAY = false; - static final int SO_SEND_BUFFER_SIZE = 128 * 1024; - static final int SO_RCV_BUFFER_SIZE = 128 * 1024; - - private static final int NUM_WORKERS = 8; - - private AcceptorLoop acceptorLoop; - private final ExecutorService acceptorThreadPool = - Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor")); - private final ExecutorService ioThreadPool = - Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d")); - - private final List<DefaultIOLoop> ioLoops = Lists.newArrayList(); - - private int lastWorker = -1; - - private final AtomicBoolean started = new AtomicBoolean(false); - private Endpoint localEp; - - private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams = - new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); - - private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>(); - private final AtomicLong messageIdGenerator = new AtomicLong(0); - private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() - .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() { - @Override - public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) { - if (entry.wasEvicted()) { - entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); - } - } - }) - .build(); - - /** - * Activates IO Loops. - * - * @param localEp local end-point - * @throws IOException is activation fails - */ - public void start(Endpoint localEp) throws IOException { - if (started.get()) { - log.warn("IOMessaging is already running at {}", localEp); - return; - } - this.localEp = localEp; - streams.setLifo(false); - this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port())); - - for (int i = 0; i < NUM_WORKERS; i++) { - ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); - } - - ioLoops.forEach(ioThreadPool::execute); - acceptorThreadPool.execute(acceptorLoop); - ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); - acceptorLoop.awaitStart(TIMEOUT); - started.set(true); - } - - /** - * Shuts down IO loops. - */ - public void stop() { - if (started.get()) { - ioLoops.forEach(SelectorLoop::shutdown); - acceptorLoop.shutdown(); - ioThreadPool.shutdown(); - acceptorThreadPool.shutdown(); - started.set(false); - } - } - - - @Override - public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) { - DefaultMessage message = new DefaultMessage( - messageIdGenerator.incrementAndGet(), - localEp, - type, - payload); - return sendAsync(ep, message); - } - - protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) { - CompletableFuture<Void> future = new CompletableFuture<>(); - if (ep.equals(localEp)) { - dispatchLocally(message); - future.complete(null); - return future; - } - - DefaultMessageStream stream = null; - try { - stream = streams.borrowObject(ep); - stream.write(message); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } finally { - try { - streams.returnObject(ep, stream); - } catch (Exception e) { - log.warn("Failed to return stream to pool"); - } - } - return future; - } - - @Override - public CompletableFuture<byte[]> sendAndReceive( - Endpoint ep, - String type, - byte[] payload) { - CompletableFuture<byte[]> response = new CompletableFuture<>(); - Long messageId = messageIdGenerator.incrementAndGet(); - responseFutures.put(messageId, response); - DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload); - try { - sendAsync(ep, message); - } catch (Exception e) { - responseFutures.invalidate(messageId); - response.completeExceptionally(e); - } - return response; - } - - @Override - public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); - } - - @Override - public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> { - byte[] responsePayload = handler.apply(message.payload()); - if (responsePayload != null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - responsePayload); - sendAsync(message.sender(), response).whenComplete((result, error) -> { - log.debug("Failed to respond", error); - }); - } - })); - } - - @Override - public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) { - handlers.put(type, message -> { - handler.apply(message.payload()).whenComplete((result, error) -> { - if (error == null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - result); - sendAsync(message.sender(), response).whenComplete((r, e) -> { - if (e != null) { - log.debug("Failed to respond", e); - } - }); - } - }); - }); - } - - @Override - public void unregisterHandler(String type) { - handlers.remove(type); - } - - protected void dispatchLocally(DefaultMessage message) { - String type = message.type(); - if (REPLY_MESSAGE_TYPE.equals(type)) { - try { - CompletableFuture<byte[]> futureResponse = - responseFutures.getIfPresent(message.id()); - if (futureResponse != null) { - futureResponse.complete(message.payload()); - } else { - log.warn("Received a reply for message id:[{}]. " - + " from {}. But was unable to locate the" - + " request handle", message.id(), message.sender()); - } - } finally { - responseFutures.invalidate(message.id()); - } - return; - } - Consumer<DefaultMessage> handler = handlers.get(type); - if (handler != null) { - handler.accept(message); - } else { - log.debug("No handler registered for {}", type); - } - } - - // Get the next worker to which a client should be assigned - private synchronized DefaultIOLoop nextWorker() { - lastWorker = (lastWorker + 1) % NUM_WORKERS; - return ioLoops.get(lastWorker); - } - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port()); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - DefaultMessageStream stream = loop.connectStream(ch); - ch.connect(sa); - return stream; - } - - // Loop for accepting client connections - private class DefaultAcceptorLoop extends AcceptorLoop { - - public DefaultAcceptorLoop(SocketAddress address) throws IOException { - super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address); - } - - @Override - protected void acceptConnection(ServerSocketChannel channel) throws IOException { - SocketChannel sc = channel.accept(); - sc.configureBlocking(false); - - Socket so = sc.socket(); - so.setTcpNoDelay(SO_NO_DELAY); - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); - - nextWorker().acceptStream(sc); - } - } - - private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> { - - @Override - public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception { - } - - @Override - public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - stream.close(); - } - - @Override - public DefaultMessageStream makeObject(Endpoint ep) throws Exception { - DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get(); - log.info("Established a new connection to {}", ep); - return stream; - } - - @Override - public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - } - - @Override - public boolean validateObject(Endpoint ep, DefaultMessageStream stream) { - return stream.isClosed(); - } - } -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java deleted file mode 100644 index 399a2b95..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Assembly for sending and receiving messages using the I/O loop mechanism. - */ -package org.onlab.nio.service;
\ No newline at end of file |