From e63291850fd0795c5700e25e67e5dee89ba54c5f Mon Sep 17 00:00:00 2001
From: Ashlee Young <ashlee@wildernessvoice.com>
Date: Tue, 1 Dec 2015 05:49:27 -0800
Subject: onos commit hash c2999f30c69e50df905a9d175ef80b3f23a98514

Change-Id: I2bb8562c4942b6d6a6d60b663db2e17540477b81
Signed-off-by: Ashlee Young <ashlee@wildernessvoice.com>
---
 .../main/java/org/onlab/netty/DecoderState.java    |  1 +
 .../main/java/org/onlab/netty/MessageDecoder.java  | 26 +++++++++++++++-------
 .../main/java/org/onlab/netty/MessageEncoder.java  |  9 ++++++++
 .../main/java/org/onlab/netty/NettyMessaging.java  | 17 +++++++-------
 4 files changed, 37 insertions(+), 16 deletions(-)

(limited to 'framework/src/onos/utils/netty')

diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
index c9fc725b..c4393018 100644
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -19,6 +19,7 @@ package org.onlab.netty;
  * State transitions a decoder goes through as it is decoding an incoming message.
  */
 public enum DecoderState {
+    READ_MESSAGE_PREAMBLE,
     READ_MESSAGE_ID,
     READ_SENDER_IP_VERSION,
     READ_SENDER_IP,
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index c34d3cca..af52a41c 100644
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -16,6 +16,7 @@
 package org.onlab.netty;
 
 import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
@@ -37,7 +38,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final int correctPreamble;
     private long messageId;
+    private int preamble;
     private Version ipVersion;
     private IpAddress senderIp;
     private int senderPort;
@@ -45,8 +48,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
     private String messageType;
     private int contentLength;
 
-    public MessageDecoder() {
-        super(DecoderState.READ_MESSAGE_ID);
+    public MessageDecoder(int correctPreamble) {
+        super(DecoderState.READ_MESSAGE_PREAMBLE);
+        this.correctPreamble = correctPreamble;
     }
 
     @Override
@@ -56,6 +60,12 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
             List<Object> out) throws Exception {
 
         switch (state()) {
+        case READ_MESSAGE_PREAMBLE:
+            preamble = buffer.readInt();
+            if (preamble != correctPreamble) {
+                throw new IllegalStateException("This message had an incorrect preamble.");
+            }
+            checkpoint(DecoderState.READ_MESSAGE_ID);
         case READ_MESSAGE_ID:
             messageId = buffer.readLong();
             checkpoint(DecoderState.READ_SENDER_IP_VERSION);
@@ -63,9 +73,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
             ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
             checkpoint(DecoderState.READ_SENDER_IP);
         case READ_SENDER_IP:
-            byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
-            buffer.readBytes(octects);
-            senderIp = IpAddress.valueOf(ipVersion, octects);
+            byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
+            buffer.readBytes(octets);
+            senderIp = IpAddress.valueOf(ipVersion, octets);
             checkpoint(DecoderState.READ_SENDER_PORT);
         case READ_SENDER_PORT:
             senderPort = buffer.readInt();
@@ -82,15 +92,15 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
             contentLength = buffer.readInt();
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
+            //TODO Perform a sanity check on the size before allocating
             byte[] payload = new byte[contentLength];
             buffer.readBytes(payload);
-            InternalMessage message = new InternalMessage(
-                    messageId,
+            InternalMessage message = new InternalMessage(messageId,
                     new Endpoint(senderIp, senderPort),
                     messageType,
                     payload);
             out.add(message);
-            checkpoint(DecoderState.READ_MESSAGE_ID);
+            checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
             break;
          default:
             checkState(false, "Must not be here");
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 2b7784f8..c74c1de9 100644
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -36,6 +36,13 @@ import com.google.common.base.Charsets;
 @Sharable
 public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
 
+    private final int preamble;
+
+    public MessageEncoder(int preamble) {
+        super();
+        this.preamble = preamble;
+    }
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
@@ -44,6 +51,8 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
             InternalMessage message,
             ByteBuf out) throws Exception {
 
+        out.writeInt(this.preamble);
+
         // write message id
         out.writeLong(message.id());
 
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 1cd7ca7b..2dda747d 100644
--- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -74,6 +74,7 @@ public class NettyMessaging implements MessagingService {
     private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
 
     private Endpoint localEp;
+    private int preamble;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -123,11 +124,12 @@ public class NettyMessaging implements MessagingService {
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public void start(Endpoint localEp) throws Exception {
+    public void start(int preamble, Endpoint localEp) throws Exception {
         if (started.get()) {
             log.warn("Already running at local endpoint: {}", localEp);
             return;
         }
+        this.preamble = preamble;
         this.localEp = localEp;
         channels.setLifo(true);
         channels.setTestOnBorrow(true);
@@ -324,7 +326,7 @@ public class NettyMessaging implements MessagingService {
     private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
 
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -351,7 +353,7 @@ public class NettyMessaging implements MessagingService {
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder())
+                    .addLast("decoder", new MessageDecoder(preamble))
                     .addLast("handler", dispatcher);
         }
 
@@ -360,7 +362,7 @@ public class NettyMessaging implements MessagingService {
     private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
 
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -386,7 +388,7 @@ public class NettyMessaging implements MessagingService {
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder())
+                    .addLast("decoder", new MessageDecoder(preamble))
                     .addLast("handler", dispatcher);
         }
 
@@ -395,13 +397,13 @@ public class NettyMessaging implements MessagingService {
     private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
 
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder();
+        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
                 channel.pipeline()
                         .addLast("encoder", encoder)
-                        .addLast("decoder", new MessageDecoder())
+                        .addLast("decoder", new MessageDecoder(preamble))
                         .addLast("handler", dispatcher);
         }
     }
@@ -424,7 +426,6 @@ public class NettyMessaging implements MessagingService {
             context.close();
         }
     }
-
     private void dispatchLocally(InternalMessage message) throws IOException {
         String type = message.type();
         if (REPLY_MESSAGE_TYPE.equals(type)) {
-- 
cgit