diff options
Diffstat (limited to 'framework/src/onos/utils/netty')
7 files changed, 850 insertions, 0 deletions
diff --git a/framework/src/onos/utils/netty/pom.xml b/framework/src/onos/utils/netty/pom.xml new file mode 100644 index 00000000..fa320d52 --- /dev/null +++ b/framework/src/onos/utils/netty/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright 2014 Open Networking Laboratory + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onosproject</groupId> + <artifactId>onlab-utils</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>onlab-netty</artifactId> + <packaging>bundle</packaging> + + <description>Network I/O using Netty framework</description> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onos-api</artifactId> + </dependency> + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onlab-misc</artifactId> + </dependency> + <dependency> + <groupId>org.onosproject</groupId> + <artifactId>onlab-junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <version>${netty4.version}</version> + </dependency> + </dependencies> +</project> diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java new file mode 100644 index 00000000..c9fc725b --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java @@ -0,0 +1,30 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.netty; + +/** + * State transitions a decoder goes through as it is decoding an incoming message. + */ +public enum DecoderState { + READ_MESSAGE_ID, + READ_SENDER_IP_VERSION, + READ_SENDER_IP, + READ_SENDER_PORT, + READ_MESSAGE_TYPE_LENGTH, + READ_MESSAGE_TYPE, + READ_CONTENT_LENGTH, + READ_CONTENT +} diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java new file mode 100644 index 00000000..102e2a22 --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java @@ -0,0 +1,66 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.netty; + +import org.onlab.util.ByteArraySizeHashPrinter; +import org.onosproject.store.cluster.messaging.Endpoint; + +import com.google.common.base.MoreObjects; + +/** + * Internal message representation with additional attributes + * for supporting, synchronous request/reply behavior. + */ +public final class InternalMessage { + + private final long id; + private final Endpoint sender; + private final String type; + private final byte[] payload; + + public InternalMessage(long id, Endpoint sender, String type, byte[] payload) { + this.id = id; + this.sender = sender; + this.type = type; + this.payload = payload; + } + + public long id() { + return id; + } + + public String type() { + return type; + } + + public Endpoint sender() { + return sender; + } + + public byte[] payload() { + return payload; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("type", type) + .add("sender", sender) + .add("payload", ByteArraySizeHashPrinter.of(payload)) + .toString(); + } +} diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java new file mode 100644 index 00000000..c34d3cca --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java @@ -0,0 +1,105 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.netty; + +import static com.google.common.base.Preconditions.checkState; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; + +import java.util.List; + +import org.onlab.packet.IpAddress; +import org.onlab.packet.IpAddress.Version; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; + +/** + * Decoder for inbound messages. + */ +public class MessageDecoder extends ReplayingDecoder<DecoderState> { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private long messageId; + private Version ipVersion; + private IpAddress senderIp; + private int senderPort; + private int messageTypeLength; + private String messageType; + private int contentLength; + + public MessageDecoder() { + super(DecoderState.READ_MESSAGE_ID); + } + + @Override + protected void decode( + ChannelHandlerContext context, + ByteBuf buffer, + List<Object> out) throws Exception { + + switch (state()) { + case READ_MESSAGE_ID: + messageId = buffer.readLong(); + checkpoint(DecoderState.READ_SENDER_IP_VERSION); + case READ_SENDER_IP_VERSION: + ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6; + checkpoint(DecoderState.READ_SENDER_IP); + case READ_SENDER_IP: + byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; + buffer.readBytes(octects); + senderIp = IpAddress.valueOf(ipVersion, octects); + checkpoint(DecoderState.READ_SENDER_PORT); + case READ_SENDER_PORT: + senderPort = buffer.readInt(); + checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH); + case READ_MESSAGE_TYPE_LENGTH: + messageTypeLength = buffer.readInt(); + checkpoint(DecoderState.READ_MESSAGE_TYPE); + case READ_MESSAGE_TYPE: + byte[] messageTypeBytes = new byte[messageTypeLength]; + buffer.readBytes(messageTypeBytes); + messageType = new String(messageTypeBytes, Charsets.UTF_8); + checkpoint(DecoderState.READ_CONTENT_LENGTH); + case READ_CONTENT_LENGTH: + contentLength = buffer.readInt(); + checkpoint(DecoderState.READ_CONTENT); + case READ_CONTENT: + byte[] payload = new byte[contentLength]; + buffer.readBytes(payload); + InternalMessage message = new InternalMessage( + messageId, + new Endpoint(senderIp, senderPort), + messageType, + payload); + out.add(message); + checkpoint(DecoderState.READ_MESSAGE_ID); + break; + default: + checkState(false, "Must not be here"); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + log.error("Exception inside channel handling pipeline.", cause); + context.close(); + } +} diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java new file mode 100644 index 00000000..2b7784f8 --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java @@ -0,0 +1,89 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.io.IOException; + +import org.onlab.packet.IpAddress; +import org.onlab.packet.IpAddress.Version; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; + +/** + * Encode InternalMessage out into a byte buffer. + */ +@Sharable +public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override + protected void encode( + ChannelHandlerContext context, + InternalMessage message, + ByteBuf out) throws Exception { + + // write message id + out.writeLong(message.id()); + + Endpoint sender = message.sender(); + + IpAddress senderIp = sender.host(); + if (senderIp.version() == Version.INET) { + out.writeByte(0); + } else { + out.writeByte(1); + } + out.writeBytes(senderIp.toOctets()); + + // write sender port + out.writeInt(sender.port()); + + byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); + + // write length of message type + out.writeInt(messageTypeBytes.length); + + // write message type bytes + out.writeBytes(messageTypeBytes); + + byte[] payload = message.payload(); + + // write payload length + out.writeInt(payload.length); + + // write payload. + out.writeBytes(payload); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + if (cause instanceof IOException) { + log.debug("IOException inside channel handling pipeline.", cause); + } else { + log.error("non-IOException inside channel handling pipeline.", cause); + } + context.close(); + } +} diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java new file mode 100644 index 00000000..8c759d14 --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java @@ -0,0 +1,457 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.io.FileInputStream; +import java.io.IOException; +import java.security.KeyStore; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; + +/** + * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework. + */ +public class NettyMessaging implements MessagingService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY"; + + private Endpoint localEp; + private final AtomicBoolean started = new AtomicBoolean(false); + private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>(); + private final AtomicLong messageIdGenerator = new AtomicLong(0); + private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.SECONDS) + .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() { + @Override + public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) { + if (entry.wasEvicted()) { + entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply")); + } + } + }) + .build(); + + private final GenericKeyedObjectPool<Endpoint, Channel> channels + = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); + + private EventLoopGroup serverGroup; + private EventLoopGroup clientGroup; + private Class<? extends ServerChannel> serverChannelClass; + private Class<? extends Channel> clientChannelClass; + + protected static final boolean TLS_DISABLED = false; + protected boolean enableNettyTLS = TLS_DISABLED; + + protected String ksLocation; + protected String tsLocation; + protected char[] ksPwd; + protected char[] tsPwd; + + private void initEventLoopGroup() { + // try Epoll first and if that does work, use nio. + try { + clientGroup = new EpollEventLoopGroup(); + serverGroup = new EpollEventLoopGroup(); + serverChannelClass = EpollServerSocketChannel.class; + clientChannelClass = EpollSocketChannel.class; + return; + } catch (Throwable e) { + log.debug("Failed to initialize native (epoll) transport. " + + "Reason: {}. Proceeding with nio.", e.getMessage()); + } + clientGroup = new NioEventLoopGroup(); + serverGroup = new NioEventLoopGroup(); + serverChannelClass = NioServerSocketChannel.class; + clientChannelClass = NioSocketChannel.class; + } + + public void start(Endpoint localEp) throws Exception { + if (started.get()) { + log.warn("Already running at local endpoint: {}", localEp); + return; + } + this.localEp = localEp; + channels.setLifo(true); + channels.setTestOnBorrow(true); + channels.setTestOnReturn(true); + channels.setMinEvictableIdleTimeMillis(60_000L); + channels.setTimeBetweenEvictionRunsMillis(30_000L); + initEventLoopGroup(); + startAcceptingConnections(); + started.set(true); + } + + public void stop() throws Exception { + if (started.get()) { + channels.close(); + serverGroup.shutdownGracefully(); + clientGroup.shutdownGracefully(); + started.set(false); + } + } + + @Override + public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) { + InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(), + localEp, + type, + payload); + return sendAsync(ep, message); + } + + protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) { + CompletableFuture<Void> future = new CompletableFuture<>(); + try { + if (ep.equals(localEp)) { + dispatchLocally(message); + future.complete(null); + } else { + Channel channel = null; + try { + channel = channels.borrowObject(ep); + channel.writeAndFlush(message).addListener(channelFuture -> { + if (!channelFuture.isSuccess()) { + future.completeExceptionally(channelFuture.cause()); + } else { + future.complete(null); + } + }); + } finally { + channels.returnObject(ep, channel); + } + } + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) { + CompletableFuture<byte[]> response = new CompletableFuture<>(); + Long messageId = messageIdGenerator.incrementAndGet(); + responseFutures.put(messageId, response); + InternalMessage message = new InternalMessage(messageId, localEp, type, payload); + try { + sendAsync(ep, message); + } catch (Exception e) { + responseFutures.invalidate(messageId); + response.completeExceptionally(e); + } + return response; + } + + @Override + public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload()))); + } + + @Override + public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) { + handlers.put(type, message -> executor.execute(() -> { + byte[] responsePayload = handler.apply(message.payload()); + if (responsePayload != null) { + InternalMessage response = new InternalMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + responsePayload); + sendAsync(message.sender(), response).whenComplete((result, error) -> { + if (error != null) { + log.debug("Failed to respond", error); + } + }); + } + })); + } + + @Override + public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) { + handlers.put(type, message -> { + handler.apply(message.payload()).whenComplete((result, error) -> { + if (error == null) { + InternalMessage response = new InternalMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + result); + sendAsync(message.sender(), response).whenComplete((r, e) -> { + if (e != null) { + log.debug("Failed to respond", e); + } + }); + } + }); + }); + } + + @Override + public void unregisterHandler(String type) { + handlers.remove(type); + } + + private void startAcceptingConnections() throws InterruptedException { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); + b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); + b.option(ChannelOption.SO_RCVBUF, 1048576); + b.option(ChannelOption.TCP_NODELAY, true); + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + b.group(serverGroup, clientGroup); + b.channel(serverChannelClass); + if (enableNettyTLS) { + b.childHandler(new SSLServerCommunicationChannelInitializer()); + } else { + b.childHandler(new OnosCommunicationChannelInitializer()); + } + b.option(ChannelOption.SO_BACKLOG, 128); + b.childOption(ChannelOption.SO_KEEPALIVE, true); + + // Bind and start to accept incoming connections. + b.bind(localEp.port()).sync().addListener(future -> { + if (future.isSuccess()) { + log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port()); + } else { + log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause()); + } + }); + } + + private class OnosCommunicationChannelFactory + implements KeyedPoolableObjectFactory<Endpoint, Channel> { + + @Override + public void activateObject(Endpoint endpoint, Channel channel) + throws Exception { + } + + @Override + public void destroyObject(Endpoint ep, Channel channel) throws Exception { + log.debug("Closing connection to {}", ep); + channel.close(); + } + + @Override + public Channel makeObject(Endpoint ep) throws Exception { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024); + bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024); + bootstrap.option(ChannelOption.SO_SNDBUF, 1048576); + bootstrap.group(clientGroup); + // TODO: Make this faster: + // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 + bootstrap.channel(clientChannelClass); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + if (enableNettyTLS) { + bootstrap.handler(new SSLClientCommunicationChannelInitializer()); + } else { + bootstrap.handler(new OnosCommunicationChannelInitializer()); + } + // Start the client. + ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync(); + log.debug("Established a new connection to {}", ep); + return f.channel(); + } + + @Override + public void passivateObject(Endpoint ep, Channel channel) + throws Exception { + } + + @Override + public boolean validateObject(Endpoint ep, Channel channel) { + return channel.isOpen(); + } + } + + private class SSLServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { + + private final ChannelHandler dispatcher = new InboundMessageDispatcher(); + private final ChannelHandler encoder = new MessageEncoder(); + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyStore ts = KeyStore.getInstance("JKS"); + ts.load(new FileInputStream(tsLocation), tsPwd); + tmFactory.init(ts); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(new FileInputStream(ksLocation), ksPwd); + kmf.init(ks, ksPwd); + + SSLContext serverContext = SSLContext.getInstance("TLS"); + serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null); + + SSLEngine serverSSLEngine = serverContext.createSSLEngine(); + + serverSSLEngine.setNeedClientAuth(true); + serverSSLEngine.setUseClientMode(false); + serverSSLEngine.setEnabledProtocols(serverSSLEngine.getSupportedProtocols()); + serverSSLEngine.setEnabledCipherSuites(serverSSLEngine.getSupportedCipherSuites()); + serverSSLEngine.setEnableSessionCreation(true); + + channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSSLEngine)) + .addLast("encoder", encoder) + .addLast("decoder", new MessageDecoder()) + .addLast("handler", dispatcher); + } + + } + + private class SSLClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { + + private final ChannelHandler dispatcher = new InboundMessageDispatcher(); + private final ChannelHandler encoder = new MessageEncoder(); + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyStore ts = KeyStore.getInstance("JKS"); + ts.load(new FileInputStream(tsLocation), tsPwd); + tmFactory.init(ts); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(new FileInputStream(ksLocation), ksPwd); + kmf.init(ks, ksPwd); + + SSLContext clientContext = SSLContext.getInstance("TLS"); + clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null); + + SSLEngine clientSSLEngine = clientContext.createSSLEngine(); + + clientSSLEngine.setUseClientMode(true); + clientSSLEngine.setEnabledProtocols(clientSSLEngine.getSupportedProtocols()); + clientSSLEngine.setEnabledCipherSuites(clientSSLEngine.getSupportedCipherSuites()); + clientSSLEngine.setEnableSessionCreation(true); + + channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSSLEngine)) + .addLast("encoder", encoder) + .addLast("decoder", new MessageDecoder()) + .addLast("handler", dispatcher); + } + + } + + private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { + + private final ChannelHandler dispatcher = new InboundMessageDispatcher(); + private final ChannelHandler encoder = new MessageEncoder(); + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast("encoder", encoder) + .addLast("decoder", new MessageDecoder()) + .addLast("handler", dispatcher); + } + } + + @ChannelHandler.Sharable + private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception { + try { + dispatchLocally(message); + } catch (RejectedExecutionException e) { + log.warn("Unable to dispatch message due to {}", e.getMessage()); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + log.error("Exception inside channel handling pipeline.", cause); + context.close(); + } + } + + private void dispatchLocally(InternalMessage message) throws IOException { + String type = message.type(); + if (REPLY_MESSAGE_TYPE.equals(type)) { + try { + CompletableFuture<byte[]> futureResponse = + responseFutures.getIfPresent(message.id()); + if (futureResponse != null) { + futureResponse.complete(message.payload()); + } else { + log.warn("Received a reply for message id:[{}]. " + + " from {}. But was unable to locate the" + + " request handle", message.id(), message.sender()); + } + } finally { + responseFutures.invalidate(message.id()); + } + return; + } + Consumer<InternalMessage> handler = handlers.get(type); + if (handler != null) { + handler.accept(message); + } else { + log.debug("No handler registered for {}", type); + } + } +} diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java new file mode 100644 index 00000000..2d6257ff --- /dev/null +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Asynchronous messaging APIs implemented using the Netty framework. + */ +package org.onlab.netty; |