summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/nio/src/main/java/org/onlab/nio
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio')
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java30
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java123
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java302
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java30
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java424
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java175
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java21
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java66
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java104
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java139
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java334
-rw-r--r--framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java20
12 files changed, 0 insertions, 1768 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
deleted file mode 100644
index 16bb9359..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-/**
- * Base {@link Message} implementation.
- */
-public abstract class AbstractMessage implements Message {
-
- protected int length;
-
- @Override
- public int length() {
- return length;
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
deleted file mode 100644
index e416f3be..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.util.Iterator;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Selector loop derivative tailored to acceptConnection inbound connections.
- */
-public abstract class AcceptorLoop extends SelectorLoop {
-
- private SocketAddress listenAddress;
- private ServerSocketChannel socketChannel;
-
- /**
- * Creates an acceptor loop with the specified selection timeout and
- * accepting connections on the the given address.
- *
- * @param selectTimeout selection timeout; specified in millis
- * @param listenAddress socket address where to listen for connections
- * @throws IOException if the backing selector cannot be opened
- */
- public AcceptorLoop(long selectTimeout, SocketAddress listenAddress)
- throws IOException {
- super(selectTimeout);
- this.listenAddress = checkNotNull(listenAddress, "Address cannot be null");
- }
-
- /**
- * Hook to accept an inbound connection on the specified socket channel.
- *
- * @param channel socketChannel where an accept operation awaits
- * @throws IOException if the accept operation cannot be processed
- */
- protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException;
-
- /**
- * Opens a new server socket channel configured in non-blocking mode and
- * bound to the loop's listen address.
- *
- * @throws IOException if unable to open or configure the socket channel
- */
- protected synchronized void openChannel() throws IOException {
- socketChannel = ServerSocketChannel.open();
- socketChannel.configureBlocking(false);
- socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- socketChannel.register(selector, SelectionKey.OP_ACCEPT);
- socketChannel.bind(listenAddress);
- }
-
- /**
- * Closes the server socket channel.
- *
- * @throws IOException if unable to close the socketChannel
- */
- protected synchronized void closechannel() throws IOException {
- if (socketChannel != null) {
- socketChannel.close();
- socketChannel = null;
- }
- }
-
- @Override
- public void shutdown() {
- try {
- closechannel();
- } catch (IOException e) {
- log.warn("Unable to close the socketChannel", e);
- }
- super.shutdown();
- }
-
- @Override
- protected void loop() throws IOException {
- openChannel();
- notifyReady();
-
- // Keep looping until told otherwise.
- while (isRunning()) {
- // Attempt a selection; if no operations selected or if signalled
- // to shutdown, spin through.
- int count = selector.select(selectTimeout);
- if (count == 0 || !isRunning()) {
- continue;
- }
-
- // Iterate over all keys selected for an operation and process them.
- Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
- while (keys.hasNext()) {
- // Fetch the key and remove it from the pending list.
- SelectionKey key = keys.next();
- keys.remove();
-
- // If the key has a pending acceptConnection operation, process it.
- if (key.isAcceptable()) {
- acceptConnection((ServerSocketChannel) key.channel());
- }
- }
- }
- }
-
-}
-
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
deleted file mode 100644
index 106df7b2..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import java.io.IOException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * I/O loop for driving inbound &amp; outbound {@link Message} transfer via
- * {@link MessageStream}.
- *
- * @param <M> message type
- * @param <S> message stream type
- */
-public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
- extends SelectorLoop {
-
- // Queue of requests for new message streams to enter the IO loop processing.
- private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
-
- // Carries information required for admitting a new message stream.
- private class NewStreamRequest {
- private final S stream;
- private final SelectableChannel channel;
- private final int op;
-
- public NewStreamRequest(S stream, SelectableChannel channel, int op) {
- this.stream = stream;
- this.channel = channel;
- this.op = op;
- }
- }
-
- // Set of message streams currently admitted into the IO loop.
- private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
-
- /**
- * Creates an IO loop with the given selection timeout.
- *
- * @param timeout selection timeout in milliseconds
- * @throws IOException if the backing selector cannot be opened
- */
- public IOLoop(long timeout) throws IOException {
- super(timeout);
- }
-
- /**
- * Returns the number of message stream in custody of the loop.
- *
- * @return number of message streams
- */
- public int streamCount() {
- return streams.size();
- }
-
- /**
- * Creates a new message stream backed by the specified socket channel.
- *
- * @param byteChannel backing byte channel
- * @return newly created message stream
- */
- protected abstract S createStream(ByteChannel byteChannel);
-
- /**
- * Removes the specified message stream from the IO loop.
- *
- * @param stream message stream to remove
- */
- protected void removeStream(MessageStream<M> stream) {
- streams.remove(stream);
- }
-
- /**
- * Processes the list of messages extracted from the specified message
- * stream.
- *
- * @param messages non-empty list of received messages
- * @param stream message stream from which the messages were extracted
- */
- protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
-
- /**
- * Completes connection request pending on the given selection key.
- *
- * @param key selection key holding the pending connect operation.
- * @throws IOException when I/O exception of some sort has occurred
- */
- protected void connect(SelectionKey key) throws IOException {
- SocketChannel ch = (SocketChannel) key.channel();
- ch.finishConnect();
- if (key.isValid()) {
- key.interestOps(SelectionKey.OP_READ);
- }
- }
-
- /**
- * Processes an IO operation pending on the specified key.
- *
- * @param key selection key holding the pending I/O operation.
- */
- protected void processKeyOperation(SelectionKey key) {
- @SuppressWarnings("unchecked")
- S stream = (S) key.attachment();
-
- try {
- // If the key is not valid, bail out.
- if (!key.isValid()) {
- stream.close();
- return;
- }
-
- // If there is a pending connect operation, complete it.
- if (key.isConnectable()) {
- try {
- connect(key);
- } catch (IOException | IllegalStateException e) {
- log.warn("Unable to complete connection", e);
- }
- }
-
- // If there is a read operation, slurp as much data as possible.
- if (key.isReadable()) {
- List<M> messages = stream.read();
-
- // No messages or failed flush imply disconnect; bail.
- if (messages == null || stream.hadError()) {
- stream.close();
- return;
- }
-
- // If there were any messages read, process them.
- if (!messages.isEmpty()) {
- try {
- processMessages(messages, stream);
- } catch (RuntimeException e) {
- onError(stream, e);
- }
- }
- }
-
- // If there are pending writes, flush them
- if (key.isWritable()) {
- stream.flushIfPossible();
- }
-
- // If there were any issued flushing, close the stream.
- if (stream.hadError()) {
- stream.close();
- }
-
- } catch (CancelledKeyException e) {
- // Key was cancelled, so silently close the stream
- stream.close();
- } catch (IOException e) {
- if (!stream.isClosed() && !isResetByPeer(e)) {
- log.warn("Unable to process IO", e);
- }
- stream.close();
- }
- }
-
- // Indicates whether or not this exception is caused by 'reset by peer'.
- private boolean isResetByPeer(IOException e) {
- Throwable cause = e.getCause();
- return cause != null && cause instanceof IOException &&
- cause.getMessage().contains("reset by peer");
- }
-
- /**
- * Hook to allow intercept of any errors caused during message processing.
- * Default behaviour is to rethrow the error.
- *
- * @param stream message stream involved in the error
- * @param error the runtime exception
- */
- protected void onError(S stream, RuntimeException error) {
- throw error;
- }
-
- /**
- * Admits a new message stream backed by the specified socket channel
- * with a pending accept operation.
- *
- * @param channel backing socket channel
- * @return newly accepted message stream
- */
- public S acceptStream(SocketChannel channel) {
- return createAndAdmit(channel, SelectionKey.OP_READ);
- }
-
-
- /**
- * Admits a new message stream backed by the specified socket channel
- * with a pending connect operation.
- *
- * @param channel backing socket channel
- * @return newly connected message stream
- */
- public S connectStream(SocketChannel channel) {
- return createAndAdmit(channel, SelectionKey.OP_CONNECT);
- }
-
- /**
- * Creates a new message stream backed by the specified socket channel
- * and admits it into the IO loop.
- *
- * @param channel socket channel
- * @param op pending operations mask to be applied to the selection
- * key as a set of initial interestedOps
- * @return newly created message stream
- */
- private synchronized S createAndAdmit(SocketChannel channel, int op) {
- S stream = createStream(channel);
- streams.add(stream);
- newStreamRequests.add(new NewStreamRequest(stream, channel, op));
- selector.wakeup();
- return stream;
- }
-
- /**
- * Safely admits new streams into the IO loop.
- */
- private void admitNewStreams() {
- Iterator<NewStreamRequest> it = newStreamRequests.iterator();
- while (isRunning() && it.hasNext()) {
- try {
- NewStreamRequest request = it.next();
- it.remove();
- SelectionKey key = request.channel.register(selector, request.op,
- request.stream);
- request.stream.setKey(key);
- } catch (ClosedChannelException e) {
- log.warn("Unable to admit new message stream", e);
- }
- }
- }
-
- @Override
- protected void loop() throws IOException {
- notifyReady();
-
- // Keep going until told otherwise.
- while (isRunning()) {
- admitNewStreams();
-
- // Process flushes & write selects on all streams
- for (MessageStream<M> stream : streams) {
- stream.flushIfWriteNotPending();
- }
-
- // Select keys and process them.
- int count = selector.select(selectTimeout);
- if (count > 0 && isRunning()) {
- Iterator<SelectionKey> it = selector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- it.remove();
- processKeyOperation(key);
- }
- }
- }
- }
-
- /**
- * Prunes the registered streams by discarding any stale ones.
- *
- * @return number of remaining streams
- */
- public synchronized int pruneStaleStreams() {
- for (MessageStream<M> stream : streams) {
- if (stream.isStale()) {
- stream.close();
- }
- }
- return streams.size();
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java
deleted file mode 100644
index c1a339e0..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/Message.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-/**
- * Representation of a message transferred via {@link MessageStream}.
- */
-public interface Message {
-
- /**
- * Gets the message length in bytes.
- *
- * @return number of bytes
- */
- int length();
-
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
deleted file mode 100644
index a19e8aac..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.onlab.util.Counter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.System.currentTimeMillis;
-import static java.nio.ByteBuffer.allocateDirect;
-
-/**
- * Bi-directional message stream for transferring messages to &amp; from the
- * network via two byte buffers.
- *
- * @param <M> message type
- */
-public abstract class MessageStream<M extends Message> {
-
- protected Logger log = LoggerFactory.getLogger(getClass());
-
- private final IOLoop<M, ?> loop;
- private final ByteChannel channel;
- private final int maxIdleMillis;
-
- private final ByteBuffer inbound;
- private ByteBuffer outbound;
- private SelectionKey key;
-
- private volatile boolean closed = false;
- private volatile boolean writePending;
- private volatile boolean writeOccurred;
-
- private Exception ioError;
- private long lastActiveTime;
-
- private final Counter bytesIn = new Counter();
- private final Counter messagesIn = new Counter();
- private final Counter bytesOut = new Counter();
- private final Counter messagesOut = new Counter();
-
- /**
- * Creates a message stream associated with the specified IO loop and
- * backed by the given byte channel.
- *
- * @param loop IO loop
- * @param byteChannel backing byte channel
- * @param bufferSize size of the backing byte buffers
- * @param maxIdleMillis maximum number of millis the stream can be idle
- * before it will be closed
- */
- protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
- int bufferSize, int maxIdleMillis) {
- this.loop = checkNotNull(loop, "Loop cannot be null");
- this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
-
- checkArgument(maxIdleMillis > 0, "Idle time must be positive");
- this.maxIdleMillis = maxIdleMillis;
-
- inbound = allocateDirect(bufferSize);
- outbound = allocateDirect(bufferSize);
- }
-
- /**
- * Gets a single message from the specified byte buffer; this is
- * to be done without manipulating the buffer via flip, reset or clear.
- *
- * @param buffer byte buffer
- * @return read message or null if there are not enough bytes to read
- * a complete message
- */
- protected abstract M read(ByteBuffer buffer);
-
- /**
- * Puts the specified message into the specified byte buffer; this is
- * to be done without manipulating the buffer via flip, reset or clear.
- *
- * @param message message to be write into the buffer
- * @param buffer byte buffer
- */
- protected abstract void write(M message, ByteBuffer buffer);
-
- /**
- * Closes the message buffer.
- */
- public void close() {
- synchronized (this) {
- if (closed) {
- return;
- }
- closed = true;
- }
-
- bytesIn.freeze();
- bytesOut.freeze();
- messagesIn.freeze();
- messagesOut.freeze();
-
- loop.removeStream(this);
- if (key != null) {
- try {
- key.cancel();
- key.channel().close();
- } catch (IOException e) {
- log.warn("Unable to close stream", e);
- }
- }
- }
-
- /**
- * Indicates whether this buffer has been closed.
- *
- * @return true if this stream has been closed
- */
- public synchronized boolean isClosed() {
- return closed;
- }
-
- /**
- * Returns the stream IO selection key.
- *
- * @return socket channel registration selection key
- */
- public SelectionKey key() {
- return key;
- }
-
- /**
- * Binds the selection key to be used for driving IO operations on the stream.
- *
- * @param key IO selection key
- */
- public void setKey(SelectionKey key) {
- this.key = key;
- this.lastActiveTime = currentTimeMillis();
- }
-
- /**
- * Returns the IO loop to which this stream is bound.
- *
- * @return I/O loop used to drive this stream
- */
- public IOLoop<M, ?> loop() {
- return loop;
- }
-
- /**
- * Indicates whether the any prior IO encountered an error.
- *
- * @return true if a write failed
- */
- public boolean hadError() {
- return ioError != null;
- }
-
- /**
- * Gets the prior IO error, if one occurred.
- *
- * @return IO error; null if none occurred
- */
- public Exception getError() {
- return ioError;
- }
-
- /**
- * Reads, without blocking, a list of messages from the stream.
- * The list will be empty if there were not messages pending.
- *
- * @return list of messages or null if backing channel has been closed
- * @throws IOException if messages could not be read
- */
- public List<M> read() throws IOException {
- try {
- int read = channel.read(inbound);
- if (read != -1) {
- // Read the messages one-by-one and add them to the list.
- List<M> messages = new ArrayList<>();
- M message;
- inbound.flip();
- while ((message = read(inbound)) != null) {
- messages.add(message);
- messagesIn.add(1);
- bytesIn.add(message.length());
- }
- inbound.compact();
-
- // Mark the stream with current time to indicate liveness.
- lastActiveTime = currentTimeMillis();
- return messages;
- }
- return null;
-
- } catch (Exception e) {
- throw new IOException("Unable to read messages", e);
- }
- }
-
- /**
- * Writes the specified list of messages to the stream.
- *
- * @param messages list of messages to write
- * @throws IOException if error occurred while writing the data
- */
- public void write(List<M> messages) throws IOException {
- synchronized (this) {
- // First write all messages.
- for (M m : messages) {
- append(m);
- }
- flushUnlessAlreadyPlanningTo();
- }
- }
-
- /**
- * Writes the given message to the stream.
- *
- * @param message message to write
- * @throws IOException if error occurred while writing the data
- */
- public void write(M message) throws IOException {
- synchronized (this) {
- append(message);
- flushUnlessAlreadyPlanningTo();
- }
- }
-
- // Appends the specified message into the internal buffer, growing the
- // buffer if required.
- private void append(M message) {
- // If the buffer does not have sufficient length double it.
- while (outbound.remaining() < message.length()) {
- doubleSize();
- }
- write(message, outbound);
- messagesOut.add(1);
- bytesOut.add(message.length());
- }
-
- // Forces a flush, unless one is planned already.
- private void flushUnlessAlreadyPlanningTo() throws IOException {
- if (!writeOccurred && !writePending) {
- flush();
- }
- }
-
- /**
- * Flushes any pending writes.
- *
- * @throws IOException if flush failed
- */
- public void flush() throws IOException {
- synchronized (this) {
- if (!writeOccurred && !writePending) {
- outbound.flip();
- try {
- channel.write(outbound);
- } catch (IOException e) {
- if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
- log.warn("Unable to write data", e);
- ioError = e;
- }
- }
- lastActiveTime = currentTimeMillis();
- writeOccurred = true;
- writePending = outbound.hasRemaining();
- outbound.compact();
- }
- }
- }
-
- /**
- * Indicates whether the stream has bytes to be written to the channel.
- *
- * @return true if there are bytes to be written
- */
- boolean isWritePending() {
- synchronized (this) {
- return writePending;
- }
- }
-
-
- /**
- * Indicates whether data has been written but not flushed yet.
- *
- * @return true if flush is required
- */
- boolean isFlushRequired() {
- synchronized (this) {
- return outbound.position() > 0;
- }
- }
-
- /**
- * Attempts to flush data, internal stream state and channel availability
- * permitting. Invoked by the driver I/O loop during handling of writable
- * selection key.
- * <p>
- * Resets the internal state flags {@code writeOccurred} and
- * {@code writePending}.
- * </p>
- * @throws IOException if implicit flush failed
- */
- void flushIfPossible() throws IOException {
- synchronized (this) {
- writePending = false;
- writeOccurred = false;
- if (outbound.position() > 0) {
- flush();
- }
- }
- key.interestOps(SelectionKey.OP_READ);
- }
-
- /**
- * Attempts to flush data, internal stream state and channel availability
- * permitting and if other writes are not pending. Invoked by the driver
- * I/O loop prior to entering select wait. Resets the internal
- * {@code writeOccurred} state flag.
- *
- * @throws IOException if implicit flush failed
- */
- void flushIfWriteNotPending() throws IOException {
- synchronized (this) {
- writeOccurred = false;
- if (!writePending && outbound.position() > 0) {
- flush();
- }
- }
- if (isWritePending()) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- }
- }
-
- /**
- * Doubles the size of the outbound buffer.
- */
- private void doubleSize() {
- ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
- outbound.flip();
- newBuffer.put(outbound);
- outbound = newBuffer;
- }
-
- /**
- * Returns the maximum number of milliseconds the stream is allowed
- * without any read/write operations.
- *
- * @return number if millis of permissible idle time
- */
- protected int maxIdleMillis() {
- return maxIdleMillis;
- }
-
-
- /**
- * Returns true if the given stream has gone stale.
- *
- * @return true if the stream is stale
- */
- boolean isStale() {
- return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
- }
-
- /**
- * Returns the inbound bytes counter.
- *
- * @return inbound bytes counter
- */
- public Counter bytesIn() {
- return bytesIn;
- }
-
- /**
- * Returns the outbound bytes counter.
- *
- * @return outbound bytes counter
- */
- public Counter bytesOut() {
- return bytesOut;
- }
-
- /**
- * Returns the inbound messages counter.
- *
- * @return inbound messages counter
- */
- public Counter messagesIn() {
- return messagesIn;
- }
-
- /**
- * Returns the outbound messages counter.
- *
- * @return outbound messages counter
- */
- public Counter messagesOut() {
- return messagesOut;
- }
-
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
deleted file mode 100644
index 95a9b61e..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.Selector;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.lang.System.currentTimeMillis;
-
-/**
- * Abstraction of an I/O processing loop based on an NIO selector.
- */
-public abstract class SelectorLoop implements Runnable {
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
- /**
- * Selector used by this loop to pace the I/O operations.
- */
- protected final Selector selector;
-
- /**
- * Selection operations timeout; specified in millis.
- */
- protected long selectTimeout;
-
- /**
- * Retains the error that caused the loop to exit prematurely.
- */
- private Throwable error;
-
- // State indicator
- private enum State { STARTING, STARTED, STOPPING, STOPPED };
- private volatile State state = State.STOPPED;
-
- /**
- * Creates a new selector loop with the given selection timeout.
- *
- * @param selectTimeout selection timeout; specified in millis
- * @throws IOException if the backing selector cannot be opened
- */
- public SelectorLoop(long selectTimeout) throws IOException {
- checkArgument(selectTimeout > 0, "Timeout must be positive");
- this.selectTimeout = selectTimeout;
- this.selector = openSelector();
- }
-
- /**
- * Opens a new selector for the use by the loop.
- *
- * @return newly open selector
- * @throws IOException if the backing selector cannot be opened
- */
- protected Selector openSelector() throws IOException {
- return Selector.open();
- }
-
- /**
- * Indicates that the loop is marked to run.
- * @return true if the loop is marked to run
- */
- protected boolean isRunning() {
- return state == State.STARTED || state == State.STARTING;
- }
-
- /**
- * Returns the error, if there was one, that caused the loop to terminate
- * prematurely.
- *
- * @return error or null if there was none
- */
- public Throwable getError() {
- return error;
- }
-
- /**
- * Contains the body of the I/O selector loop.
- *
- * @throws IOException if an error is encountered while selecting I/O
- */
- protected abstract void loop() throws IOException;
-
- @Override
- public void run() {
- error = null;
- state = State.STARTING;
- try {
- loop();
- } catch (Exception e) {
- error = e;
- log.error("Loop aborted", e);
- }
- notifyDone();
- }
-
- /**
- * Notifies observers waiting for loop to become ready.
- */
- protected synchronized void notifyReady() {
- state = State.STARTED;
- notifyAll();
- }
-
- /**
- * Triggers loop shutdown.
- */
- public void shutdown() {
- // Mark the loop as no longer running and wake up the selector.
- state = State.STOPPING;
- selector.wakeup();
- }
-
- /**
- * Notifies observers waiting for loop to fully stop.
- */
- private synchronized void notifyDone() {
- state = State.STOPPED;
- notifyAll();
- }
-
- /**
- * Waits for the loop execution to start.
- *
- * @param timeout number of milliseconds to wait
- * @return true if loop started in time
- */
- public final synchronized boolean awaitStart(long timeout) {
- long max = currentTimeMillis() + timeout;
- while (state != State.STARTED && (currentTimeMillis() < max)) {
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted", e);
- }
- }
- return state == State.STARTED;
- }
-
- /**
- * Waits for the loop execution to stop.
- *
- * @param timeout number of milliseconds to wait
- * @return true if loop finished in time
- */
- public final synchronized boolean awaitStop(long timeout) {
- long max = currentTimeMillis() + timeout;
- while (state != State.STOPPED && (currentTimeMillis() < max)) {
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted", e);
- }
- }
- return state == State.STOPPED;
- }
-
-
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java
deleted file mode 100644
index 0d58b568..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Mechanism to transfer messages over network using IO loop and
- * message stream, backed by NIO byte buffers.
- */
-package org.onlab.nio;
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
deleted file mode 100644
index 0d977929..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio.service;
-
-import java.io.IOException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.util.List;
-import java.util.function.Consumer;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-
-/**
- * IOLoop for transporting DefaultMessages.
- */
-public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> {
-
- public static final int SELECT_TIMEOUT_MILLIS = 500;
- private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000;
- private static final int BUFFER_SIZE = 1024 * 1024;
- private final Consumer<DefaultMessage> consumer;
-
- public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException {
- this(SELECT_TIMEOUT_MILLIS, consumer);
- }
-
- public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException {
- super(timeout);
- this.consumer = consumer;
- }
-
- @Override
- protected DefaultMessageStream createStream(ByteChannel byteChannel) {
- return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS);
- }
-
- @Override
- protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) {
- messages.forEach(consumer);
- }
-
- @Override
- protected void connect(SelectionKey key) throws IOException {
- DefaultMessageStream stream = (DefaultMessageStream) key.attachment();
- try {
- super.connect(key);
- stream.connected();
- } catch (Exception e) {
- stream.connectFailed(e);
- }
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
deleted file mode 100644
index 591d49b3..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio.service;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.onlab.nio.AbstractMessage;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.MoreObjects;
-
-/**
- * Default message.
- */
-public class DefaultMessage extends AbstractMessage {
-
- private long id;
- private Endpoint sender;
- private String type;
- private byte[] payload;
-
- /**
- * Creates a new message with the specified data.
- *
- * @param id message id
- * @param type message type
- * @param sender sender endpoint
- * @param payload message payload
- */
- DefaultMessage(long id, Endpoint sender, String type, byte[] payload) {
- this.id = id;
- this.type = checkNotNull(type, "Type cannot be null");
- this.sender = checkNotNull(sender, "Sender cannot be null");
- this.payload = checkNotNull(payload, "Payload cannot be null");
-
- byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8);
- IpAddress senderIp = sender.host();
- byte[] ipOctets = senderIp.toOctets();
-
- length = 25 + ipOctets.length + messageTypeBytes.length + payload.length;
- }
-
- /**
- * Returns message id.
- *
- * @return message id
- */
- public long id() {
- return id;
- }
-
- /**
- * Returns message sender.
- *
- * @return message sender
- */
- public Endpoint sender() {
- return sender;
- }
-
- /**
- * Returns message type.
- *
- * @return message type
- */
- public String type() {
- return type;
- }
-
- /**
- * Returns message payload.
- *
- * @return payload
- */
- public byte[] payload() {
- return payload;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("id", id)
- .add("type", type)
- .add("sender", sender)
- .add("payload", ByteArraySizeHashPrinter.of(payload))
- .toString();
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
deleted file mode 100644
index a7dd3c04..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio.service;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.base.Charsets;
-
-/**
- * Default bi-directional message stream for transferring messages to &amp; from the
- * network via two byte buffers.
- */
-public class DefaultMessageStream extends MessageStream<DefaultMessage> {
-
- private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
-
- public DefaultMessageStream(
- IOLoop<DefaultMessage, ?> loop,
- ByteChannel byteChannel,
- int bufferSize,
- int maxIdleMillis) {
- super(loop, byteChannel, bufferSize, maxIdleMillis);
- }
-
- public CompletableFuture<DefaultMessageStream> connectedFuture() {
- return connectFuture.thenApply(v -> this);
- }
-
- private final AtomicInteger messageLength = new AtomicInteger(-1);
-
- @Override
- protected DefaultMessage read(ByteBuffer buffer) {
- if (messageLength.get() == -1) {
- // check if we can read the message length.
- if (buffer.remaining() < Integer.BYTES) {
- return null;
- } else {
- messageLength.set(buffer.getInt());
- }
- }
-
- if (buffer.remaining() < messageLength.get()) {
- return null;
- }
-
- long id = buffer.getLong();
- Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
- byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
- buffer.get(octects);
- IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
- int senderPort = buffer.getInt();
- int messageTypeByteLength = buffer.getInt();
- byte[] messageTypeBytes = new byte[messageTypeByteLength];
- buffer.get(messageTypeBytes);
- String messageType = new String(messageTypeBytes, Charsets.UTF_8);
- int payloadLength = buffer.getInt();
- byte[] payloadBytes = new byte[payloadLength];
- buffer.get(payloadBytes);
-
- // reset for next message
- messageLength.set(-1);
-
- return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
- }
-
- @Override
- protected void write(DefaultMessage message, ByteBuffer buffer) {
- Endpoint sender = message.sender();
- byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
- IpAddress senderIp = sender.host();
- byte[] ipOctets = senderIp.toOctets();
- byte[] payload = message.payload();
-
- int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
-
- buffer.putInt(messageLength);
-
- buffer.putLong(message.id());
-
- if (senderIp.version() == Version.INET) {
- buffer.put((byte) 0x0);
- } else {
- buffer.put((byte) 0x1);
- }
- buffer.put(ipOctets);
-
- // write sender port
- buffer.putInt(sender.port());
-
- // write length of message type
- buffer.putInt(messageTypeBytes.length);
-
- // write message type bytes
- buffer.put(messageTypeBytes);
-
- // write payload length
- buffer.putInt(payload.length);
-
- // write payload.
- buffer.put(payload);
- }
-
- /**
- * Callback invoked when the stream is successfully connected.
- */
- public void connected() {
- connectFuture.complete(null);
- }
-
- /**
- * Callback invoked when the stream fails to connect.
- * @param cause failure cause
- */
- public void connectFailed(Throwable cause) {
- connectFuture.completeExceptionally(cause);
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
deleted file mode 100644
index c195d160..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.nio.service;
-
-import static org.onlab.util.Tools.groupedThreads;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.nio.AcceptorLoop;
-import org.onlab.nio.SelectorLoop;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-
-/**
- * MessagingService implementation based on IOLoop.
- */
-public class IOLoopMessaging implements MessagingService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
-
- static final long TIMEOUT = 1000;
-
- static final boolean SO_NO_DELAY = false;
- static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
- static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
-
- private static final int NUM_WORKERS = 8;
-
- private AcceptorLoop acceptorLoop;
- private final ExecutorService acceptorThreadPool =
- Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
- private final ExecutorService ioThreadPool =
- Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
-
- private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
-
- private int lastWorker = -1;
-
- private final AtomicBoolean started = new AtomicBoolean(false);
- private Endpoint localEp;
-
- private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
- new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
-
- private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
- .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
- @Override
- public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
- if (entry.wasEvicted()) {
- entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
- }
- }
- })
- .build();
-
- /**
- * Activates IO Loops.
- *
- * @param localEp local end-point
- * @throws IOException is activation fails
- */
- public void start(Endpoint localEp) throws IOException {
- if (started.get()) {
- log.warn("IOMessaging is already running at {}", localEp);
- return;
- }
- this.localEp = localEp;
- streams.setLifo(false);
- this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
-
- for (int i = 0; i < NUM_WORKERS; i++) {
- ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
- }
-
- ioLoops.forEach(ioThreadPool::execute);
- acceptorThreadPool.execute(acceptorLoop);
- ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
- acceptorLoop.awaitStart(TIMEOUT);
- started.set(true);
- }
-
- /**
- * Shuts down IO loops.
- */
- public void stop() {
- if (started.get()) {
- ioLoops.forEach(SelectorLoop::shutdown);
- acceptorLoop.shutdown();
- ioThreadPool.shutdown();
- acceptorThreadPool.shutdown();
- started.set(false);
- }
- }
-
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- DefaultMessage message = new DefaultMessage(
- messageIdGenerator.incrementAndGet(),
- localEp,
- type,
- payload);
- return sendAsync(ep, message);
- }
-
- protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- if (ep.equals(localEp)) {
- dispatchLocally(message);
- future.complete(null);
- return future;
- }
-
- DefaultMessageStream stream = null;
- try {
- stream = streams.borrowObject(ep);
- stream.write(message);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- } finally {
- try {
- streams.returnObject(ep, stream);
- } catch (Exception e) {
- log.warn("Failed to return stream to pool");
- }
- }
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(
- Endpoint ep,
- String type,
- byte[] payload) {
- CompletableFuture<byte[]> response = new CompletableFuture<>();
- Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, response);
- DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
- try {
- sendAsync(ep, message);
- } catch (Exception e) {
- responseFutures.invalidate(messageId);
- response.completeExceptionally(e);
- }
- return response;
- }
-
- @Override
- public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.payload());
- if (responsePayload != null) {
- DefaultMessage response = new DefaultMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- log.debug("Failed to respond", error);
- });
- }
- }));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
- handlers.put(type, message -> {
- handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- DefaultMessage response = new DefaultMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- handlers.remove(type);
- }
-
- protected void dispatchLocally(DefaultMessage message) {
- String type = message.type();
- if (REPLY_MESSAGE_TYPE.equals(type)) {
- try {
- CompletableFuture<byte[]> futureResponse =
- responseFutures.getIfPresent(message.id());
- if (futureResponse != null) {
- futureResponse.complete(message.payload());
- } else {
- log.warn("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- responseFutures.invalidate(message.id());
- }
- return;
- }
- Consumer<DefaultMessage> handler = handlers.get(type);
- if (handler != null) {
- handler.accept(message);
- } else {
- log.debug("No handler registered for {}", type);
- }
- }
-
- // Get the next worker to which a client should be assigned
- private synchronized DefaultIOLoop nextWorker() {
- lastWorker = (lastWorker + 1) % NUM_WORKERS;
- return ioLoops.get(lastWorker);
- }
-
- /**
- * Initiates open connection request and registers the pending socket
- * channel with the given IO loop.
- *
- * @param loop loop with which the channel should be registered
- * @throws java.io.IOException if the socket could not be open or connected
- */
- private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
- SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
- SocketChannel ch = SocketChannel.open();
- ch.configureBlocking(false);
- DefaultMessageStream stream = loop.connectStream(ch);
- ch.connect(sa);
- return stream;
- }
-
- // Loop for accepting client connections
- private class DefaultAcceptorLoop extends AcceptorLoop {
-
- public DefaultAcceptorLoop(SocketAddress address) throws IOException {
- super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
- }
-
- @Override
- protected void acceptConnection(ServerSocketChannel channel) throws IOException {
- SocketChannel sc = channel.accept();
- sc.configureBlocking(false);
-
- Socket so = sc.socket();
- so.setTcpNoDelay(SO_NO_DELAY);
- so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
- so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
-
- nextWorker().acceptStream(sc);
- }
- }
-
- private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
-
- @Override
- public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
- }
-
- @Override
- public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
- stream.close();
- }
-
- @Override
- public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
- DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
- log.info("Established a new connection to {}", ep);
- return stream;
- }
-
- @Override
- public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
- }
-
- @Override
- public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
- return stream.isClosed();
- }
- }
-}
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java
deleted file mode 100644
index 399a2b95..00000000
--- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Assembly for sending and receiving messages using the I/O loop mechanism.
- */
-package org.onlab.nio.service; \ No newline at end of file