diff options
Diffstat (limited to 'framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java')
-rw-r--r-- | framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java index 1cd7ca7b..2dda747d 100644 --- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java @@ -74,6 +74,7 @@ public class NettyMessaging implements MessagingService { private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY"; private Endpoint localEp; + private int preamble; private final AtomicBoolean started = new AtomicBoolean(false); private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>(); private final AtomicLong messageIdGenerator = new AtomicLong(0); @@ -123,11 +124,12 @@ public class NettyMessaging implements MessagingService { clientChannelClass = NioSocketChannel.class; } - public void start(Endpoint localEp) throws Exception { + public void start(int preamble, Endpoint localEp) throws Exception { if (started.get()) { log.warn("Already running at local endpoint: {}", localEp); return; } + this.preamble = preamble; this.localEp = localEp; channels.setLifo(true); channels.setTestOnBorrow(true); @@ -324,7 +326,7 @@ public class NettyMessaging implements MessagingService { private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { @@ -351,7 +353,7 @@ public class NettyMessaging implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } @@ -360,7 +362,7 @@ public class NettyMessaging implements MessagingService { private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { @@ -386,7 +388,7 @@ public class NettyMessaging implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } @@ -395,13 +397,13 @@ public class NettyMessaging implements MessagingService { private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } } @@ -424,7 +426,6 @@ public class NettyMessaging implements MessagingService { context.close(); } } - private void dispatchLocally(InternalMessage message) throws IOException { String type = message.type(); if (REPLY_MESSAGE_TYPE.equals(type)) { |