From e63291850fd0795c5700e25e67e5dee89ba54c5f Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Tue, 1 Dec 2015 05:49:27 -0800 Subject: onos commit hash c2999f30c69e50df905a9d175ef80b3f23a98514 Change-Id: I2bb8562c4942b6d6a6d60b663db2e17540477b81 Signed-off-by: Ashlee Young --- framework/src/onos/utils/catalyst/pom.xml | 8 ++- .../java/org/onlab/catalyst/OnlabCatalyst.java | 7 +++ .../main/java/org/onlab/catalyst/package-info.java | 4 ++ .../src/main/java/org/onlab/util/HexString.java | 60 +++++++++++++++++----- .../main/java/org/onlab/netty/DecoderState.java | 1 + .../main/java/org/onlab/netty/MessageDecoder.java | 26 +++++++--- .../main/java/org/onlab/netty/MessageEncoder.java | 9 ++++ .../main/java/org/onlab/netty/NettyMessaging.java | 17 +++--- 8 files changed, 102 insertions(+), 30 deletions(-) create mode 100644 framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/OnlabCatalyst.java create mode 100644 framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/package-info.java (limited to 'framework/src/onos/utils') diff --git a/framework/src/onos/utils/catalyst/pom.xml b/framework/src/onos/utils/catalyst/pom.xml index bd1d52dd..a4fc7aa3 100644 --- a/framework/src/onos/utils/catalyst/pom.xml +++ b/framework/src/onos/utils/catalyst/pom.xml @@ -15,6 +15,11 @@ ONLab catalyst dependency + + io.atomix.catalyst + catalyst-netty + 1.0.0-rc5 + @@ -24,10 +29,9 @@ org.apache.maven.plugins maven-shade-plugin - true - io.atomix.catalyst.* + io/atomix/catalyst/** diff --git a/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/OnlabCatalyst.java b/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/OnlabCatalyst.java new file mode 100644 index 00000000..c3ecc860 --- /dev/null +++ b/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/OnlabCatalyst.java @@ -0,0 +1,7 @@ +package org.onlab.catalyst; + +/** + * Created by admin on 11/24/15. + */ +public class OnlabCatalyst { +} diff --git a/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/package-info.java b/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/package-info.java new file mode 100644 index 00000000..1fdb9ea2 --- /dev/null +++ b/framework/src/onos/utils/catalyst/src/main/java/org/onlab/catalyst/package-info.java @@ -0,0 +1,4 @@ +/** + * Created by admin on 11/24/15. + */ +package org.onlab.catalyst; \ No newline at end of file diff --git a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/HexString.java b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/HexString.java index a1aba93b..962e1119 100644 --- a/framework/src/onos/utils/misc/src/main/java/org/onlab/util/HexString.java +++ b/framework/src/onos/utils/misc/src/main/java/org/onlab/util/HexString.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 Open Networking Laboratory + * Copyright 2014-2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,25 +18,39 @@ package org.onlab.util; public final class HexString { private HexString() { - } /** - * Convert a string of bytes to a ':' separated hex string. + * Convert a byte array to a colon-separated hex string. * - * @param bytes string of bytes to convert - * @return "0f:ca:fe:de:ad:be:ef" + * @param bytes byte array to be converted + * @return converted colon-separated hex string, e.g. "0f:ca:fe:de:ad:be:ef", + * or "(null)" if given byte array is null */ public static String toHexString(final byte[] bytes) { + return toHexString(bytes, ":"); + } + + /** + * Convert a byte array to a hex string separated by given separator. + * + * @param bytes byte array to be converted + * @param separator the string use to separate each byte + * @return converted hex string, or "(null)" if given byte array is null + */ + public static String toHexString(final byte[] bytes, String separator) { if (bytes == null) { return "(null)"; } + if (separator == null) { + separator = ""; + } int i; StringBuilder ret = new StringBuilder(bytes.length * 3 - 1); String tmp; for (i = 0; i < bytes.length; i++) { if (i > 0) { - ret.append(':'); + ret.append(separator); } tmp = Integer.toHexString((bytes[i] & 0xff)); if (tmp.length() == 1) { @@ -47,6 +61,14 @@ public final class HexString { return ret.toString(); } + /** + * Convert a long number to colon-separated hex string. + * Prepend zero padding until given length. + * + * @param val long number to be converted + * @param padTo prepend zeros until this length + * @return converted colon-separated hex string, e.g. "0f:ca:fe:de:ad:be:ef" + */ public static String toHexString(final long val, final int padTo) { char[] arr = Long.toHexString(val).toCharArray(); StringBuilder ret = new StringBuilder(padTo * 3 - 1); @@ -67,18 +89,24 @@ public final class HexString { return ret.toString(); } + /** + * Convert a long number to colon-separated hex string. + * Prepend zero padding until 8 bytes. + * + * @param val long number to be converted + * @return converted colon-separated hex string, e.g. "0f:ca:fe:de:ad:be:ef" + */ public static String toHexString(final long val) { return toHexString(val, 8); } /** - * Convert a string of hex values into a string of bytes. + * Convert a colon-separated hex string to byte array. * - * @param values - * "0f:ca:fe:de:ad:be:ef" - * @return [15, 5 ,2, 5, 17] - * @throws NumberFormatException - * If the string can not be parsed + * @param values colon-separated hex string to be converted, + * e.g. "0f:ca:fe:de:ad:be:ef" + * @return converted byte array + * @throws NumberFormatException if input hex string cannot be parsed */ public static byte[] fromHexString(final String values) { String[] octets = values.split(":"); @@ -93,6 +121,14 @@ public final class HexString { return ret; } + /** + * Convert a colon-separated hex string to long. + * + * @param value colon-separated hex string to be converted, + * e.g. "00:0f:ca:fe:de:ad:be:ef" + * @return converted long number + * @throws NumberFormatException if input hex string cannot be parsed + */ public static long toLong(String value) { String[] octets = value.split(":"); if (octets.length > 8) { diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java index c9fc725b..c4393018 100644 --- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/DecoderState.java @@ -19,6 +19,7 @@ package org.onlab.netty; * State transitions a decoder goes through as it is decoding an incoming message. */ public enum DecoderState { + READ_MESSAGE_PREAMBLE, READ_MESSAGE_ID, READ_SENDER_IP_VERSION, READ_SENDER_IP, diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java index c34d3cca..af52a41c 100644 --- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java @@ -16,6 +16,7 @@ package org.onlab.netty; import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; @@ -37,7 +38,9 @@ public class MessageDecoder extends ReplayingDecoder { private final Logger log = LoggerFactory.getLogger(getClass()); + private final int correctPreamble; private long messageId; + private int preamble; private Version ipVersion; private IpAddress senderIp; private int senderPort; @@ -45,8 +48,9 @@ public class MessageDecoder extends ReplayingDecoder { private String messageType; private int contentLength; - public MessageDecoder() { - super(DecoderState.READ_MESSAGE_ID); + public MessageDecoder(int correctPreamble) { + super(DecoderState.READ_MESSAGE_PREAMBLE); + this.correctPreamble = correctPreamble; } @Override @@ -56,6 +60,12 @@ public class MessageDecoder extends ReplayingDecoder { List out) throws Exception { switch (state()) { + case READ_MESSAGE_PREAMBLE: + preamble = buffer.readInt(); + if (preamble != correctPreamble) { + throw new IllegalStateException("This message had an incorrect preamble."); + } + checkpoint(DecoderState.READ_MESSAGE_ID); case READ_MESSAGE_ID: messageId = buffer.readLong(); checkpoint(DecoderState.READ_SENDER_IP_VERSION); @@ -63,9 +73,9 @@ public class MessageDecoder extends ReplayingDecoder { ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6; checkpoint(DecoderState.READ_SENDER_IP); case READ_SENDER_IP: - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; - buffer.readBytes(octects); - senderIp = IpAddress.valueOf(ipVersion, octects); + byte[] octets = new byte[IpAddress.byteLength(ipVersion)]; + buffer.readBytes(octets); + senderIp = IpAddress.valueOf(ipVersion, octets); checkpoint(DecoderState.READ_SENDER_PORT); case READ_SENDER_PORT: senderPort = buffer.readInt(); @@ -82,15 +92,15 @@ public class MessageDecoder extends ReplayingDecoder { contentLength = buffer.readInt(); checkpoint(DecoderState.READ_CONTENT); case READ_CONTENT: + //TODO Perform a sanity check on the size before allocating byte[] payload = new byte[contentLength]; buffer.readBytes(payload); - InternalMessage message = new InternalMessage( - messageId, + InternalMessage message = new InternalMessage(messageId, new Endpoint(senderIp, senderPort), messageType, payload); out.add(message); - checkpoint(DecoderState.READ_MESSAGE_ID); + checkpoint(DecoderState.READ_MESSAGE_PREAMBLE); break; default: checkState(false, "Must not be here"); diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java index 2b7784f8..c74c1de9 100644 --- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java @@ -36,6 +36,13 @@ import com.google.common.base.Charsets; @Sharable public class MessageEncoder extends MessageToByteEncoder { + private final int preamble; + + public MessageEncoder(int preamble) { + super(); + this.preamble = preamble; + } + private final Logger log = LoggerFactory.getLogger(getClass()); @Override @@ -44,6 +51,8 @@ public class MessageEncoder extends MessageToByteEncoder { InternalMessage message, ByteBuf out) throws Exception { + out.writeInt(this.preamble); + // write message id out.writeLong(message.id()); diff --git a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java index 1cd7ca7b..2dda747d 100644 --- a/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java +++ b/framework/src/onos/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java @@ -74,6 +74,7 @@ public class NettyMessaging implements MessagingService { private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY"; private Endpoint localEp; + private int preamble; private final AtomicBoolean started = new AtomicBoolean(false); private final Map> handlers = new ConcurrentHashMap<>(); private final AtomicLong messageIdGenerator = new AtomicLong(0); @@ -123,11 +124,12 @@ public class NettyMessaging implements MessagingService { clientChannelClass = NioSocketChannel.class; } - public void start(Endpoint localEp) throws Exception { + public void start(int preamble, Endpoint localEp) throws Exception { if (started.get()) { log.warn("Already running at local endpoint: {}", localEp); return; } + this.preamble = preamble; this.localEp = localEp; channels.setLifo(true); channels.setTestOnBorrow(true); @@ -324,7 +326,7 @@ public class NettyMessaging implements MessagingService { private class SslServerCommunicationChannelInitializer extends ChannelInitializer { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { @@ -351,7 +353,7 @@ public class NettyMessaging implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } @@ -360,7 +362,7 @@ public class NettyMessaging implements MessagingService { private class SslClientCommunicationChannelInitializer extends ChannelInitializer { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { @@ -386,7 +388,7 @@ public class NettyMessaging implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } @@ -395,13 +397,13 @@ public class NettyMessaging implements MessagingService { private class OnosCommunicationChannelInitializer extends ChannelInitializer { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(); + private final ChannelHandler encoder = new MessageEncoder(preamble); @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder()) + .addLast("decoder", new MessageDecoder(preamble)) .addLast("handler", dispatcher); } } @@ -424,7 +426,6 @@ public class NettyMessaging implements MessagingService { context.close(); } } - private void dispatchLocally(InternalMessage message) throws IOException { String type = message.type(); if (REPLY_MESSAGE_TYPE.equals(type)) { -- cgit 1.2.3-korg