aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java')
-rw-r--r--framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java105
1 files changed, 105 insertions, 0 deletions
diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
new file mode 100644
index 00000000..c34d3cca
--- /dev/null
+++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.netty;
+
+import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+import java.util.List;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Decoder for inbound messages.
+ */
+public class MessageDecoder extends ReplayingDecoder<DecoderState> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private long messageId;
+ private Version ipVersion;
+ private IpAddress senderIp;
+ private int senderPort;
+ private int messageTypeLength;
+ private String messageType;
+ private int contentLength;
+
+ public MessageDecoder() {
+ super(DecoderState.READ_MESSAGE_ID);
+ }
+
+ @Override
+ protected void decode(
+ ChannelHandlerContext context,
+ ByteBuf buffer,
+ List<Object> out) throws Exception {
+
+ switch (state()) {
+ case READ_MESSAGE_ID:
+ messageId = buffer.readLong();
+ checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+ case READ_SENDER_IP_VERSION:
+ ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+ checkpoint(DecoderState.READ_SENDER_IP);
+ case READ_SENDER_IP:
+ byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octects);
+ senderIp = IpAddress.valueOf(ipVersion, octects);
+ checkpoint(DecoderState.READ_SENDER_PORT);
+ case READ_SENDER_PORT:
+ senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
+ case READ_MESSAGE_TYPE_LENGTH:
+ messageTypeLength = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE);
+ case READ_MESSAGE_TYPE:
+ byte[] messageTypeBytes = new byte[messageTypeLength];
+ buffer.readBytes(messageTypeBytes);
+ messageType = new String(messageTypeBytes, Charsets.UTF_8);
+ checkpoint(DecoderState.READ_CONTENT_LENGTH);
+ case READ_CONTENT_LENGTH:
+ contentLength = buffer.readInt();
+ checkpoint(DecoderState.READ_CONTENT);
+ case READ_CONTENT:
+ byte[] payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ InternalMessage message = new InternalMessage(
+ messageId,
+ new Endpoint(senderIp, senderPort),
+ messageType,
+ payload);
+ out.add(message);
+ checkpoint(DecoderState.READ_MESSAGE_ID);
+ break;
+ default:
+ checkState(false, "Must not be here");
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ log.error("Exception inside channel handling pipeline.", cause);
+ context.close();
+ }
+}