diff options
author | CNlucius <lukai1@huawei.com> | 2016-09-13 11:40:12 +0800 |
---|---|---|
committer | CNlucius <lukai1@huawei.com> | 2016-09-13 11:41:53 +0800 |
commit | b731e2f1dd0972409b136aebc7b463dd72c9cfad (patch) | |
tree | 5107d7d80c19ad8076c2c97c2b5ef8d1cf3ab903 /framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service | |
parent | ee93993458266114c29271a481ef9ce7ce621b2a (diff) |
ONOSFW-171
O/S-SFC-ONOS scenario documentation
Change-Id: I51ae1cf736ea24ab6680f8edca1b2bf5dd598365
Signed-off-by: CNlucius <lukai1@huawei.com>
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service')
5 files changed, 0 insertions, 663 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java deleted file mode 100644 index 0d977929..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultIOLoop.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.util.List; -import java.util.function.Consumer; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; - -/** - * IOLoop for transporting DefaultMessages. - */ -public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> { - - public static final int SELECT_TIMEOUT_MILLIS = 500; - private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000; - private static final int BUFFER_SIZE = 1024 * 1024; - private final Consumer<DefaultMessage> consumer; - - public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException { - this(SELECT_TIMEOUT_MILLIS, consumer); - } - - public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException { - super(timeout); - this.consumer = consumer; - } - - @Override - protected DefaultMessageStream createStream(ByteChannel byteChannel) { - return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS); - } - - @Override - protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) { - messages.forEach(consumer); - } - - @Override - protected void connect(SelectionKey key) throws IOException { - DefaultMessageStream stream = (DefaultMessageStream) key.attachment(); - try { - super.connect(key); - stream.connected(); - } catch (Exception e) { - stream.connectFailed(e); - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java deleted file mode 100644 index 591d49b3..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessage.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.onlab.nio.AbstractMessage; -import org.onlab.packet.IpAddress; -import org.onlab.util.ByteArraySizeHashPrinter; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; -import com.google.common.base.MoreObjects; - -/** - * Default message. - */ -public class DefaultMessage extends AbstractMessage { - - private long id; - private Endpoint sender; - private String type; - private byte[] payload; - - /** - * Creates a new message with the specified data. - * - * @param id message id - * @param type message type - * @param sender sender endpoint - * @param payload message payload - */ - DefaultMessage(long id, Endpoint sender, String type, byte[] payload) { - this.id = id; - this.type = checkNotNull(type, "Type cannot be null"); - this.sender = checkNotNull(sender, "Sender cannot be null"); - this.payload = checkNotNull(payload, "Payload cannot be null"); - - byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - - length = 25 + ipOctets.length + messageTypeBytes.length + payload.length; - } - - /** - * Returns message id. - * - * @return message id - */ - public long id() { - return id; - } - - /** - * Returns message sender. - * - * @return message sender - */ - public Endpoint sender() { - return sender; - } - - /** - * Returns message type. - * - * @return message type - */ - public String type() { - return type; - } - - /** - * Returns message payload. - * - * @return payload - */ - public byte[] payload() { - return payload; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("id", id) - .add("type", type) - .add("sender", sender) - .add("payload", ByteArraySizeHashPrinter.of(payload)) - .toString(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java deleted file mode 100644 index a7dd3c04..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; -import org.onlab.packet.IpAddress; -import org.onlab.packet.IpAddress.Version; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; - -/** - * Default bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - */ -public class DefaultMessageStream extends MessageStream<DefaultMessage> { - - private final CompletableFuture<Void> connectFuture = new CompletableFuture<>(); - - public DefaultMessageStream( - IOLoop<DefaultMessage, ?> loop, - ByteChannel byteChannel, - int bufferSize, - int maxIdleMillis) { - super(loop, byteChannel, bufferSize, maxIdleMillis); - } - - public CompletableFuture<DefaultMessageStream> connectedFuture() { - return connectFuture.thenApply(v -> this); - } - - private final AtomicInteger messageLength = new AtomicInteger(-1); - - @Override - protected DefaultMessage read(ByteBuffer buffer) { - if (messageLength.get() == -1) { - // check if we can read the message length. - if (buffer.remaining() < Integer.BYTES) { - return null; - } else { - messageLength.set(buffer.getInt()); - } - } - - if (buffer.remaining() < messageLength.get()) { - return null; - } - - long id = buffer.getLong(); - Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6; - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; - buffer.get(octects); - IpAddress senderIp = IpAddress.valueOf(ipVersion, octects); - int senderPort = buffer.getInt(); - int messageTypeByteLength = buffer.getInt(); - byte[] messageTypeBytes = new byte[messageTypeByteLength]; - buffer.get(messageTypeBytes); - String messageType = new String(messageTypeBytes, Charsets.UTF_8); - int payloadLength = buffer.getInt(); - byte[] payloadBytes = new byte[payloadLength]; - buffer.get(payloadBytes); - - // reset for next message - messageLength.set(-1); - - return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes); - } - - @Override - protected void write(DefaultMessage message, ByteBuffer buffer) { - Endpoint sender = message.sender(); - byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - byte[] payload = message.payload(); - - int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length; - - buffer.putInt(messageLength); - - buffer.putLong(message.id()); - - if (senderIp.version() == Version.INET) { - buffer.put((byte) 0x0); - } else { - buffer.put((byte) 0x1); - } - buffer.put(ipOctets); - - // write sender port - buffer.putInt(sender.port()); - - // write length of message type - buffer.putInt(messageTypeBytes.length); - - // write message type bytes - buffer.put(messageTypeBytes); - - // write payload length - buffer.putInt(payload.length); - - // write payload. - buffer.put(payload); - } - - /** - * Callback invoked when the stream is successfully connected. - */ - public void connected() { - connectFuture.complete(null); - } - - /** - * Callback invoked when the stream fails to connect. - * @param cause failure cause - */ - public void connectFailed(Throwable cause) { - connectFuture.completeExceptionally(cause); - } -}
\ No newline at end of file diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java deleted file mode 100644 index c195d160..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import static org.onlab.util.Tools.groupedThreads; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.apache.commons.pool.KeyedPoolableObjectFactory; -import org.apache.commons.pool.impl.GenericKeyedObjectPool; -import org.onlab.nio.AcceptorLoop; -import org.onlab.nio.SelectorLoop; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessagingService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; - -/** - * MessagingService implementation based on IOLoop. - */ -public class IOLoopMessaging implements MessagingService { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY"; - - static final long TIMEOUT = 1000; - - static final boolean SO_NO_DELAY = false; - static final int SO_SEND_BUFFER_SIZE = 128 * 1024; - static final int SO_RCV_BUFFER_SIZE = 128 * 1024; - - private static final int NUM_WORKERS = 8; - - private AcceptorLoop acceptorLoop; - private final ExecutorService acceptorThreadPool = - Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor")); - private final ExecutorService ioThreadPool = - Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d")); - - private final List<DefaultIOLoop> ioLoops = Lists.newArrayList(); - - private int lastWorker = -1; - - private final AtomicBoolean started = new AtomicBoolean(false); - private Endpoint localEp; - - private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams = - new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); - - private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>(); - private final AtomicLong messageIdGenerator = new AtomicLong(0); - private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() - .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() { - @Override - public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) { - if (entry.wasEvicted()) { - entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); - } - } - }) - .build(); - - /** - * Activates IO Loops. - * - * @param localEp local end-point - * @throws IOException is activation fails - */ - public void start(Endpoint localEp) throws IOException { - if (started.get()) { - log.warn("IOMessaging is already running at {}", localEp); - return; - } - this.localEp = localEp; - streams.setLifo(false); - this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port())); - - for (int i = 0; i < NUM_WORKERS; i++) { - ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); - } - - ioLoops.forEach(ioThreadPool::execute); - acceptorThreadPool.execute(acceptorLoop); - ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); - acceptorLoop.awaitStart(TIMEOUT); - started.set(true); - } - - /** - * Shuts down IO loops. - */ - public void stop() { - if (started.get()) { - ioLoops.forEach(SelectorLoop::shutdown); - acceptorLoop.shutdown(); - ioThreadPool.shutdown(); - acceptorThreadPool.shutdown(); - started.set(false); - } - } - - - @Override - public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) { - DefaultMessage message = new DefaultMessage( - messageIdGenerator.incrementAndGet(), - localEp, - type, - payload); - return sendAsync(ep, message); - } - - protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) { - CompletableFuture<Void> future = new CompletableFuture<>(); - if (ep.equals(localEp)) { - dispatchLocally(message); - future.complete(null); - return future; - } - - DefaultMessageStream stream = null; - try { - stream = streams.borrowObject(ep); - stream.write(message); - future.complete(null); - } catch (Exception e) { - future.completeExceptionally(e); - } finally { - try { - streams.returnObject(ep, stream); - } catch (Exception e) { - log.warn("Failed to return stream to pool"); - } - } - return future; - } - - @Override - public CompletableFuture<byte[]> sendAndReceive( - Endpoint ep, - String type, - byte[] payload) { - CompletableFuture<byte[]> response = new CompletableFuture<>(); - Long messageId = messageIdGenerator.incrementAndGet(); - responseFutures.put(messageId, response); - DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload); - try { - sendAsync(ep, message); - } catch (Exception e) { - responseFutures.invalidate(messageId); - response.completeExceptionally(e); - } - return response; - } - - @Override - public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); - } - - @Override - public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) { - handlers.put(type, message -> executor.execute(() -> { - byte[] responsePayload = handler.apply(message.payload()); - if (responsePayload != null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - responsePayload); - sendAsync(message.sender(), response).whenComplete((result, error) -> { - log.debug("Failed to respond", error); - }); - } - })); - } - - @Override - public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) { - handlers.put(type, message -> { - handler.apply(message.payload()).whenComplete((result, error) -> { - if (error == null) { - DefaultMessage response = new DefaultMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - result); - sendAsync(message.sender(), response).whenComplete((r, e) -> { - if (e != null) { - log.debug("Failed to respond", e); - } - }); - } - }); - }); - } - - @Override - public void unregisterHandler(String type) { - handlers.remove(type); - } - - protected void dispatchLocally(DefaultMessage message) { - String type = message.type(); - if (REPLY_MESSAGE_TYPE.equals(type)) { - try { - CompletableFuture<byte[]> futureResponse = - responseFutures.getIfPresent(message.id()); - if (futureResponse != null) { - futureResponse.complete(message.payload()); - } else { - log.warn("Received a reply for message id:[{}]. " - + " from {}. But was unable to locate the" - + " request handle", message.id(), message.sender()); - } - } finally { - responseFutures.invalidate(message.id()); - } - return; - } - Consumer<DefaultMessage> handler = handlers.get(type); - if (handler != null) { - handler.accept(message); - } else { - log.debug("No handler registered for {}", type); - } - } - - // Get the next worker to which a client should be assigned - private synchronized DefaultIOLoop nextWorker() { - lastWorker = (lastWorker + 1) % NUM_WORKERS; - return ioLoops.get(lastWorker); - } - - /** - * Initiates open connection request and registers the pending socket - * channel with the given IO loop. - * - * @param loop loop with which the channel should be registered - * @throws java.io.IOException if the socket could not be open or connected - */ - private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException { - SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port()); - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - DefaultMessageStream stream = loop.connectStream(ch); - ch.connect(sa); - return stream; - } - - // Loop for accepting client connections - private class DefaultAcceptorLoop extends AcceptorLoop { - - public DefaultAcceptorLoop(SocketAddress address) throws IOException { - super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address); - } - - @Override - protected void acceptConnection(ServerSocketChannel channel) throws IOException { - SocketChannel sc = channel.accept(); - sc.configureBlocking(false); - - Socket so = sc.socket(); - so.setTcpNoDelay(SO_NO_DELAY); - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); - - nextWorker().acceptStream(sc); - } - } - - private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> { - - @Override - public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception { - } - - @Override - public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - stream.close(); - } - - @Override - public DefaultMessageStream makeObject(Endpoint ep) throws Exception { - DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get(); - log.info("Established a new connection to {}", ep); - return stream; - } - - @Override - public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception { - } - - @Override - public boolean validateObject(Endpoint ep, DefaultMessageStream stream) { - return stream.isClosed(); - } - } -} diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java deleted file mode 100644 index 399a2b95..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Assembly for sending and receiving messages using the I/O loop mechanism. - */ -package org.onlab.nio.service;
\ No newline at end of file |