aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/netty
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/netty')
-rw-r--r--framework/src/onos/utils/netty/pom.xml83
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java31
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java66
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java115
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java98
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java454
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java20
7 files changed, 0 insertions, 867 deletions
diff --git a/framework/src/onos/utils/netty/pom.xml b/framework/src/onos/utils/netty/pom.xml
deleted file mode 100644
index 7bae10aa..00000000
--- a/framework/src/onos/utils/netty/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2014 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-utils</artifactId>
- <version>1.4.0-rc1</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>onlab-netty</artifactId>
- <packaging>bundle</packaging>
-
- <description>Network I/O using Netty framework</description>
-
- <dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-testlib</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-misc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport-native-epoll</artifactId>
- <version>${netty4.version}</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
deleted file mode 100644
index c4393018..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-/**
- * State transitions a decoder goes through as it is decoding an incoming message.
- */
-public enum DecoderState {
- READ_MESSAGE_PREAMBLE,
- READ_MESSAGE_ID,
- READ_SENDER_IP_VERSION,
- READ_SENDER_IP,
- READ_SENDER_PORT,
- READ_MESSAGE_TYPE_LENGTH,
- READ_MESSAGE_TYPE,
- READ_CONTENT_LENGTH,
- READ_CONTENT
-}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
deleted file mode 100644
index 102e2a22..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Internal message representation with additional attributes
- * for supporting, synchronous request/reply behavior.
- */
-public final class InternalMessage {
-
- private final long id;
- private final Endpoint sender;
- private final String type;
- private final byte[] payload;
-
- public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
- this.id = id;
- this.sender = sender;
- this.type = type;
- this.payload = payload;
- }
-
- public long id() {
- return id;
- }
-
- public String type() {
- return type;
- }
-
- public Endpoint sender() {
- return sender;
- }
-
- public byte[] payload() {
- return payload;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("id", id)
- .add("type", type)
- .add("sender", sender)
- .add("payload", ByteArraySizeHashPrinter.of(payload))
- .toString();
- }
-}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
deleted file mode 100644
index af52a41c..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ReplayingDecoder;
-
-import java.util.List;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-/**
- * Decoder for inbound messages.
- */
-public class MessageDecoder extends ReplayingDecoder<DecoderState> {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final int correctPreamble;
- private long messageId;
- private int preamble;
- private Version ipVersion;
- private IpAddress senderIp;
- private int senderPort;
- private int messageTypeLength;
- private String messageType;
- private int contentLength;
-
- public MessageDecoder(int correctPreamble) {
- super(DecoderState.READ_MESSAGE_PREAMBLE);
- this.correctPreamble = correctPreamble;
- }
-
- @Override
- protected void decode(
- ChannelHandlerContext context,
- ByteBuf buffer,
- List<Object> out) throws Exception {
-
- switch (state()) {
- case READ_MESSAGE_PREAMBLE:
- preamble = buffer.readInt();
- if (preamble != correctPreamble) {
- throw new IllegalStateException("This message had an incorrect preamble.");
- }
- checkpoint(DecoderState.READ_MESSAGE_ID);
- case READ_MESSAGE_ID:
- messageId = buffer.readLong();
- checkpoint(DecoderState.READ_SENDER_IP_VERSION);
- case READ_SENDER_IP_VERSION:
- ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
- checkpoint(DecoderState.READ_SENDER_IP);
- case READ_SENDER_IP:
- byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
- buffer.readBytes(octets);
- senderIp = IpAddress.valueOf(ipVersion, octets);
- checkpoint(DecoderState.READ_SENDER_PORT);
- case READ_SENDER_PORT:
- senderPort = buffer.readInt();
- checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
- case READ_MESSAGE_TYPE_LENGTH:
- messageTypeLength = buffer.readInt();
- checkpoint(DecoderState.READ_MESSAGE_TYPE);
- case READ_MESSAGE_TYPE:
- byte[] messageTypeBytes = new byte[messageTypeLength];
- buffer.readBytes(messageTypeBytes);
- messageType = new String(messageTypeBytes, Charsets.UTF_8);
- checkpoint(DecoderState.READ_CONTENT_LENGTH);
- case READ_CONTENT_LENGTH:
- contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_CONTENT);
- case READ_CONTENT:
- //TODO Perform a sanity check on the size before allocating
- byte[] payload = new byte[contentLength];
- buffer.readBytes(payload);
- InternalMessage message = new InternalMessage(messageId,
- new Endpoint(senderIp, senderPort),
- messageType,
- payload);
- out.add(message);
- checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
- break;
- default:
- checkState(false, "Must not be here");
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
- context.close();
- }
-}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
deleted file mode 100644
index c74c1de9..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-
-import java.io.IOException;
-
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpAddress.Version;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-/**
- * Encode InternalMessage out into a byte buffer.
- */
-@Sharable
-public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
-
- private final int preamble;
-
- public MessageEncoder(int preamble) {
- super();
- this.preamble = preamble;
- }
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- protected void encode(
- ChannelHandlerContext context,
- InternalMessage message,
- ByteBuf out) throws Exception {
-
- out.writeInt(this.preamble);
-
- // write message id
- out.writeLong(message.id());
-
- Endpoint sender = message.sender();
-
- IpAddress senderIp = sender.host();
- if (senderIp.version() == Version.INET) {
- out.writeByte(0);
- } else {
- out.writeByte(1);
- }
- out.writeBytes(senderIp.toOctets());
-
- // write sender port
- out.writeInt(sender.port());
-
- byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
-
- // write length of message type
- out.writeInt(messageTypeBytes.length);
-
- // write message type bytes
- out.writeBytes(messageTypeBytes);
-
- byte[] payload = message.payload();
-
- // write payload length
- out.writeInt(payload.length);
-
- // write payload.
- out.writeBytes(payload);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- if (cause instanceof IOException) {
- log.debug("IOException inside channel handling pipeline.", cause);
- } else {
- log.error("non-IOException inside channel handling pipeline.", cause);
- }
- context.close();
- }
-}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
deleted file mode 100644
index 2dda747d..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyStore;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
- */
-public class NettyMessaging implements MessagingService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
-
- private Endpoint localEp;
- private int preamble;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
- private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
- .expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
- @Override
- public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
- if (entry.wasEvicted()) {
- entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
- }
- }
- })
- .build();
-
- private final GenericKeyedObjectPool<Endpoint, Channel> channels
- = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
-
- private EventLoopGroup serverGroup;
- private EventLoopGroup clientGroup;
- private Class<? extends ServerChannel> serverChannelClass;
- private Class<? extends Channel> clientChannelClass;
-
- protected static final boolean TLS_DISABLED = false;
- protected boolean enableNettyTls = TLS_DISABLED;
-
- protected String ksLocation;
- protected String tsLocation;
- protected char[] ksPwd;
- protected char[] tsPwd;
-
- private void initEventLoopGroup() {
- // try Epoll first and if that does work, use nio.
- try {
- clientGroup = new EpollEventLoopGroup();
- serverGroup = new EpollEventLoopGroup();
- serverChannelClass = EpollServerSocketChannel.class;
- clientChannelClass = EpollSocketChannel.class;
- return;
- } catch (Throwable e) {
- log.debug("Failed to initialize native (epoll) transport. "
- + "Reason: {}. Proceeding with nio.", e.getMessage());
- }
- clientGroup = new NioEventLoopGroup();
- serverGroup = new NioEventLoopGroup();
- serverChannelClass = NioServerSocketChannel.class;
- clientChannelClass = NioSocketChannel.class;
- }
-
- public void start(int preamble, Endpoint localEp) throws Exception {
- if (started.get()) {
- log.warn("Already running at local endpoint: {}", localEp);
- return;
- }
- this.preamble = preamble;
- this.localEp = localEp;
- channels.setLifo(true);
- channels.setTestOnBorrow(true);
- channels.setTestOnReturn(true);
- channels.setMinEvictableIdleTimeMillis(60_000L);
- channels.setTimeBetweenEvictionRunsMillis(30_000L);
- initEventLoopGroup();
- startAcceptingConnections();
- started.set(true);
- }
-
- public void stop() throws Exception {
- if (started.get()) {
- channels.close();
- serverGroup.shutdownGracefully();
- clientGroup.shutdownGracefully();
- started.set(false);
- }
- }
-
- @Override
- public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
- InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
- localEp,
- type,
- payload);
- return sendAsync(ep, message);
- }
-
- protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- try {
- if (ep.equals(localEp)) {
- dispatchLocally(message);
- future.complete(null);
- } else {
- Channel channel = null;
- try {
- channel = channels.borrowObject(ep);
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
- } finally {
- channels.returnObject(ep, channel);
- }
- }
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
- CompletableFuture<byte[]> response = new CompletableFuture<>();
- Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, response);
- InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
- try {
- sendAsync(ep, message);
- } catch (Exception e) {
- responseFutures.invalidate(messageId);
- response.completeExceptionally(e);
- }
- return response;
- }
-
- @Override
- public void registerHandler(String type, Consumer<byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> handler.accept(message.payload())));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor) {
- handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.payload());
- if (responsePayload != null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to respond", error);
- }
- });
- }
- }));
- }
-
- @Override
- public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
- handlers.put(type, message -> {
- handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
- });
- });
- }
-
- @Override
- public void unregisterHandler(String type) {
- handlers.remove(type);
- }
-
- private void startAcceptingConnections() throws InterruptedException {
- ServerBootstrap b = new ServerBootstrap();
- b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
- b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
- b.option(ChannelOption.SO_RCVBUF, 1048576);
- b.option(ChannelOption.TCP_NODELAY, true);
- b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- b.group(serverGroup, clientGroup);
- b.channel(serverChannelClass);
- if (enableNettyTls) {
- b.childHandler(new SslServerCommunicationChannelInitializer());
- } else {
- b.childHandler(new OnosCommunicationChannelInitializer());
- }
- b.option(ChannelOption.SO_BACKLOG, 128);
- b.childOption(ChannelOption.SO_KEEPALIVE, true);
-
- // Bind and start to accept incoming connections.
- b.bind(localEp.port()).sync().addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
- } else {
- log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
- }
- });
- }
-
- private class OnosCommunicationChannelFactory
- implements KeyedPoolableObjectFactory<Endpoint, Channel> {
-
- @Override
- public void activateObject(Endpoint endpoint, Channel channel)
- throws Exception {
- }
-
- @Override
- public void destroyObject(Endpoint ep, Channel channel) throws Exception {
- log.debug("Closing connection to {}", ep);
- channel.close();
- }
-
- @Override
- public Channel makeObject(Endpoint ep) throws Exception {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
- bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
- bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
- bootstrap.group(clientGroup);
- // TODO: Make this faster:
- // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
- bootstrap.channel(clientChannelClass);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- if (enableNettyTls) {
- bootstrap.handler(new SslClientCommunicationChannelInitializer());
- } else {
- bootstrap.handler(new OnosCommunicationChannelInitializer());
- }
- // Start the client.
- ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
- log.debug("Established a new connection to {}", ep);
- return f.channel();
- }
-
- @Override
- public void passivateObject(Endpoint ep, Channel channel)
- throws Exception {
- }
-
- @Override
- public boolean validateObject(Endpoint ep, Channel channel) {
- return channel.isOpen();
- }
- }
-
- private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmFactory.init(ts);
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
-
- SSLContext serverContext = SSLContext.getInstance("TLS");
- serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
- SSLEngine serverSslEngine = serverContext.createSSLEngine();
-
- serverSslEngine.setNeedClientAuth(true);
- serverSslEngine.setUseClientMode(false);
- serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
- serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
- serverSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
-
- }
-
- private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance("JKS");
- ts.load(new FileInputStream(tsLocation), tsPwd);
- tmFactory.init(ts);
-
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(ksLocation), ksPwd);
- kmf.init(ks, ksPwd);
-
- SSLContext clientContext = SSLContext.getInstance("TLS");
- clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
-
- SSLEngine clientSslEngine = clientContext.createSSLEngine();
-
- clientSslEngine.setUseClientMode(true);
- clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
- clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
- clientSslEngine.setEnableSessionCreation(true);
-
- channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
-
- }
-
- private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
- .addLast("handler", dispatcher);
- }
- }
-
- @ChannelHandler.Sharable
- private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
- try {
- dispatchLocally(message);
- } catch (RejectedExecutionException e) {
- log.warn("Unable to dispatch message due to {}", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
- log.error("Exception inside channel handling pipeline.", cause);
- context.close();
- }
- }
- private void dispatchLocally(InternalMessage message) throws IOException {
- String type = message.type();
- if (REPLY_MESSAGE_TYPE.equals(type)) {
- try {
- CompletableFuture<byte[]> futureResponse =
- responseFutures.getIfPresent(message.id());
- if (futureResponse != null) {
- futureResponse.complete(message.payload());
- } else {
- log.warn("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
- }
- } finally {
- responseFutures.invalidate(message.id());
- }
- return;
- }
- Consumer<InternalMessage> handler = handlers.get(type);
- if (handler != null) {
- handler.accept(message);
- } else {
- log.debug("No handler registered for {}", type);
- }
- }
-}
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java
deleted file mode 100644
index 2d6257ff..00000000
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Asynchronous messaging APIs implemented using the Netty framework.
- */
-package org.onlab.netty;