aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/netty
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/netty')
-rw-r--r--framework/src/onos/utils/netty/pom.xml83
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java30
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java66
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java105
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java89
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java457
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java20
7 files changed, 850 insertions, 0 deletions
diff --git a/framework/src/onos/utils/netty/pom.xml b/framework/src/onos/utils/netty/pom.xml
new file mode 100644
index 00000000..fa320d52
--- /dev/null
+++ b/framework/src/onos/utils/netty/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2014 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-utils</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>onlab-netty</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>Network I/O using Netty framework</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-misc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <version>${netty4.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
new file mode 100644
index 00000000..c9fc725b
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+/**
+ * State transitions a decoder goes through as it is decoding an incoming message.
+ */
+public enum DecoderState {
+ READ_MESSAGE_ID,
+ READ_SENDER_IP_VERSION,
+ READ_SENDER_IP,
+ READ_SENDER_PORT,
+ READ_MESSAGE_TYPE_LENGTH,
+ READ_MESSAGE_TYPE,
+ READ_CONTENT_LENGTH,
+ READ_CONTENT
+}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
new file mode 100644
index 00000000..102e2a22
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Internal message representation with additional attributes
+ * for supporting, synchronous request/reply behavior.
+ */
+public final class InternalMessage {
+
+ private final long id;
+ private final Endpoint sender;
+ private final String type;
+ private final byte[] payload;
+
+ public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+ this.id = id;
+ this.sender = sender;
+ this.type = type;
+ this.payload = payload;
+ }
+
+ public long id() {
+ return id;
+ }
+
+ public String type() {
+ return type;
+ }
+
+ public Endpoint sender() {
+ return sender;
+ }
+
+ public byte[] payload() {
+ return payload;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("type", type)
+ .add("sender", sender)
+ .add("payload", ByteArraySizeHashPrinter.of(payload))
+ .toString();
+ }
+}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
new file mode 100644
index 00000000..c34d3cca
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+import java.util.List;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Decoder for inbound messages.
+ */
+public class MessageDecoder extends ReplayingDecoder<DecoderState> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private long messageId;
+ private Version ipVersion;
+ private IpAddress senderIp;
+ private int senderPort;
+ private int messageTypeLength;
+ private String messageType;
+ private int contentLength;
+
+ public MessageDecoder() {
+ super(DecoderState.READ_MESSAGE_ID);
+ }
+
+ @Override
+ protected void decode(
+ ChannelHandlerContext context,
+ ByteBuf buffer,
+ List<Object> out) throws Exception {
+
+ switch (state()) {
+ case READ_MESSAGE_ID:
+ messageId = buffer.readLong();
+ checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+ case READ_SENDER_IP_VERSION:
+ ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+ checkpoint(DecoderState.READ_SENDER_IP);
+ case READ_SENDER_IP:
+ byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octects);
+ senderIp = IpAddress.valueOf(ipVersion, octects);
+ checkpoint(DecoderState.READ_SENDER_PORT);
+ case READ_SENDER_PORT:
+ senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
+ case READ_MESSAGE_TYPE_LENGTH:
+ messageTypeLength = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE);
+ case READ_MESSAGE_TYPE:
+ byte[] messageTypeBytes = new byte[messageTypeLength];
+ buffer.readBytes(messageTypeBytes);
+ messageType = new String(messageTypeBytes, Charsets.UTF_8);
+ checkpoint(DecoderState.READ_CONTENT_LENGTH);
+ case READ_CONTENT_LENGTH:
+ contentLength = buffer.readInt();
+ checkpoint(DecoderState.READ_CONTENT);
+ case READ_CONTENT:
+ byte[] payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ InternalMessage message = new InternalMessage(
+ messageId,
+ new Endpoint(senderIp, senderPort),
+ messageType,
+ payload);
+ out.add(message);
+ checkpoint(DecoderState.READ_MESSAGE_ID);
+ break;
+ default:
+ checkState(false, "Must not be here");
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
+}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
new file mode 100644
index 00000000..2b7784f8
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.io.IOException;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Encode InternalMessage out into a byte buffer.
+ */
+@Sharable
+public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Override
+ protected void encode(
+ ChannelHandlerContext context,
+ InternalMessage message,
+ ByteBuf out) throws Exception {
+
+ // write message id
+ out.writeLong(message.id());
+
+ Endpoint sender = message.sender();
+
+ IpAddress senderIp = sender.host();
+ if (senderIp.version() == Version.INET) {
+ out.writeByte(0);
+ } else {
+ out.writeByte(1);
+ }
+ out.writeBytes(senderIp.toOctets());
+
+ // write sender port
+ out.writeInt(sender.port());
+
+ byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
+
+ // write length of message type
+ out.writeInt(messageTypeBytes.length);
+
+ // write message type bytes
+ out.writeBytes(messageTypeBytes);
+
+ byte[] payload = message.payload();
+
+ // write payload length
+ out.writeInt(payload.length);
+
+ // write payload.
+ out.writeBytes(payload);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ if (cause instanceof IOException) {
+ log.debug("IOException inside channel handling pipeline.", cause);
+ } else {
+ log.error("non-IOException inside channel handling pipeline.", cause);
+ }
+ context.close();
+ }
+}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
new file mode 100644
index 00000000..8c759d14
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -0,0 +1,457 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyStore;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
+ */
+public class NettyMessaging implements MessagingService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+
+ private Endpoint localEp;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+ private final AtomicLong messageIdGenerator = new AtomicLong(0);
+ private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
+ @Override
+ public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
+ if (entry.wasEvicted()) {
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
+ }
+ }
+ })
+ .build();
+
+ private final GenericKeyedObjectPool<Endpoint, Channel> channels
+ = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+
+ private EventLoopGroup serverGroup;
+ private EventLoopGroup clientGroup;
+ private Class<? extends ServerChannel> serverChannelClass;
+ private Class<? extends Channel> clientChannelClass;
+
+ protected static final boolean TLS_DISABLED = false;
+ protected boolean enableNettyTLS = TLS_DISABLED;
+
+ protected String ksLocation;
+ protected String tsLocation;
+ protected char[] ksPwd;
+ protected char[] tsPwd;
+
+ private void initEventLoopGroup() {
+ // try Epoll first and if that does work, use nio.
+ try {
+ clientGroup = new EpollEventLoopGroup();
+ serverGroup = new EpollEventLoopGroup();
+ serverChannelClass = EpollServerSocketChannel.class;
+ clientChannelClass = EpollSocketChannel.class;
+ return;
+ } catch (Throwable e) {
+ log.debug("Failed to initialize native (epoll) transport. "
+ + "Reason: {}. Proceeding with nio.", e.getMessage());
+ }
+ clientGroup = new NioEventLoopGroup();
+ serverGroup = new NioEventLoopGroup();
+ serverChannelClass = NioServerSocketChannel.class;
+ clientChannelClass = NioSocketChannel.class;
+ }
+
+ public void start(Endpoint localEp) throws Exception {
+ if (started.get()) {
+ log.warn("Already running at local endpoint: {}", localEp);
+ return;
+ }
+ this.localEp = localEp;
+ channels.setLifo(true);
+ channels.setTestOnBorrow(true);
+ channels.setTestOnReturn(true);
+ channels.setMinEvictableIdleTimeMillis(60_000L);
+ channels.setTimeBetweenEvictionRunsMillis(30_000L);
+ initEventLoopGroup();
+ startAcceptingConnections();
+ started.set(true);
+ }
+
+ public void stop() throws Exception {
+ if (started.get()) {
+ channels.close();
+ serverGroup.shutdownGracefully();
+ clientGroup.shutdownGracefully();
+ started.set(false);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
+ InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ localEp,
+ type,
+ payload);
+ return sendAsync(ep, message);
+ }
+
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ if (ep.equals(localEp)) {
+ dispatchLocally(message);
+ future.complete(null);
+ } else {
+ Channel channel = null;
+ try {
+ channel = channels.borrowObject(ep);
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ } finally {
+ channels.returnObject(ep, channel);
+ }
+ }
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
+ Long messageId = messageIdGenerator.incrementAndGet();
+ responseFutures.put(messageId, response);
+ InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+ try {
+ sendAsync(ep, message);
+ } catch (Exception e) {
+ responseFutures.invalidate(messageId);
+ response.completeExceptionally(e);
+ }
+ return response;
+ }
+
+ @Override
+ public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
+ }
+
+ @Override
+ public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
+ handlers.put(type, message -> executor.execute(() -> {
+ byte[] responsePayload = handler.apply(message.payload());
+ if (responsePayload != null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload);
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to respond", error);
+ }
+ });
+ }
+ }));
+ }
+
+ @Override
+ public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
+ handlers.put(type, message -> {
+ handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
+ }
+ });
+ });
+ }
+
+ @Override
+ public void unregisterHandler(String type) {
+ handlers.remove(type);
+ }
+
+ private void startAcceptingConnections() throws InterruptedException {
+ ServerBootstrap b = new ServerBootstrap();
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
+ b.option(ChannelOption.SO_RCVBUF, 1048576);
+ b.option(ChannelOption.TCP_NODELAY, true);
+ b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ b.group(serverGroup, clientGroup);
+ b.channel(serverChannelClass);
+ if (enableNettyTLS) {
+ b.childHandler(new SSLServerCommunicationChannelInitializer());
+ } else {
+ b.childHandler(new OnosCommunicationChannelInitializer());
+ }
+ b.option(ChannelOption.SO_BACKLOG, 128);
+ b.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ // Bind and start to accept incoming connections.
+ b.bind(localEp.port()).sync().addListener(future -> {
+ if (future.isSuccess()) {
+ log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+ } else {
+ log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+ }
+ });
+ }
+
+ private class OnosCommunicationChannelFactory
+ implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+
+ @Override
+ public void activateObject(Endpoint endpoint, Channel channel)
+ throws Exception {
+ }
+
+ @Override
+ public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+ log.debug("Closing connection to {}", ep);
+ channel.close();
+ }
+
+ @Override
+ public Channel makeObject(Endpoint ep) throws Exception {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
+ bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
+ bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
+ bootstrap.group(clientGroup);
+ // TODO: Make this faster:
+ // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+ bootstrap.channel(clientChannelClass);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ if (enableNettyTLS) {
+ bootstrap.handler(new SSLClientCommunicationChannelInitializer());
+ } else {
+ bootstrap.handler(new OnosCommunicationChannelInitializer());
+ }
+ // Start the client.
+ ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+ log.debug("Established a new connection to {}", ep);
+ return f.channel();
+ }
+
+ @Override
+ public void passivateObject(Endpoint ep, Channel channel)
+ throws Exception {
+ }
+
+ @Override
+ public boolean validateObject(Endpoint ep, Channel channel) {
+ return channel.isOpen();
+ }
+ }
+
+ private class SSLServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder();
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ts = KeyStore.getInstance("JKS");
+ ts.load(new FileInputStream(tsLocation), tsPwd);
+ tmFactory.init(ts);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(new FileInputStream(ksLocation), ksPwd);
+ kmf.init(ks, ksPwd);
+
+ SSLContext serverContext = SSLContext.getInstance("TLS");
+ serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+ SSLEngine serverSSLEngine = serverContext.createSSLEngine();
+
+ serverSSLEngine.setNeedClientAuth(true);
+ serverSSLEngine.setUseClientMode(false);
+ serverSSLEngine.setEnabledProtocols(serverSSLEngine.getSupportedProtocols());
+ serverSSLEngine.setEnabledCipherSuites(serverSSLEngine.getSupportedCipherSuites());
+ serverSSLEngine.setEnableSessionCreation(true);
+
+ channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSSLEngine))
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder())
+ .addLast("handler", dispatcher);
+ }
+
+ }
+
+ private class SSLClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder();
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ts = KeyStore.getInstance("JKS");
+ ts.load(new FileInputStream(tsLocation), tsPwd);
+ tmFactory.init(ts);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(new FileInputStream(ksLocation), ksPwd);
+ kmf.init(ks, ksPwd);
+
+ SSLContext clientContext = SSLContext.getInstance("TLS");
+ clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
+
+ SSLEngine clientSSLEngine = clientContext.createSSLEngine();
+
+ clientSSLEngine.setUseClientMode(true);
+ clientSSLEngine.setEnabledProtocols(clientSSLEngine.getSupportedProtocols());
+ clientSSLEngine.setEnabledCipherSuites(clientSSLEngine.getSupportedCipherSuites());
+ clientSSLEngine.setEnableSessionCreation(true);
+
+ channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSSLEngine))
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder())
+ .addLast("handler", dispatcher);
+ }
+
+ }
+
+ private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private final ChannelHandler dispatcher = new InboundMessageDispatcher();
+ private final ChannelHandler encoder = new MessageEncoder();
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ channel.pipeline()
+ .addLast("encoder", encoder)
+ .addLast("decoder", new MessageDecoder())
+ .addLast("handler", dispatcher);
+ }
+ }
+
+ @ChannelHandler.Sharable
+ private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+ try {
+ dispatchLocally(message);
+ } catch (RejectedExecutionException e) {
+ log.warn("Unable to dispatch message due to {}", e.getMessage());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
+ }
+
+ private void dispatchLocally(InternalMessage message) throws IOException {
+ String type = message.type();
+ if (REPLY_MESSAGE_TYPE.equals(type)) {
+ try {
+ CompletableFuture<byte[]> futureResponse =
+ responseFutures.getIfPresent(message.id());
+ if (futureResponse != null) {
+ futureResponse.complete(message.payload());
+ } else {
+ log.warn("Received a reply for message id:[{}]. "
+ + " from {}. But was unable to locate the"
+ + " request handle", message.id(), message.sender());
+ }
+ } finally {
+ responseFutures.invalidate(message.id());
+ }
+ return;
+ }
+ Consumer<InternalMessage> handler = handlers.get(type);
+ if (handler != null) {
+ handler.accept(message);
+ } else {
+ log.debug("No handler registered for {}", type);
+ }
+ }
+}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java
new file mode 100644
index 00000000..2d6257ff
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Asynchronous messaging APIs implemented using the Netty framework.
+ */
+package org.onlab.netty;