diff options
Diffstat (limited to 'framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java')
-rw-r--r-- | framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java | 139 |
1 files changed, 0 insertions, 139 deletions
diff --git a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java b/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java deleted file mode 100644 index a7dd3c04..00000000 --- a/framework/src/onos/utils/nio/src/main/java/org/onlab/nio/service/DefaultMessageStream.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.nio.service; - -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -import org.onlab.nio.IOLoop; -import org.onlab.nio.MessageStream; -import org.onlab.packet.IpAddress; -import org.onlab.packet.IpAddress.Version; -import org.onosproject.store.cluster.messaging.Endpoint; - -import com.google.common.base.Charsets; - -/** - * Default bi-directional message stream for transferring messages to & from the - * network via two byte buffers. - */ -public class DefaultMessageStream extends MessageStream<DefaultMessage> { - - private final CompletableFuture<Void> connectFuture = new CompletableFuture<>(); - - public DefaultMessageStream( - IOLoop<DefaultMessage, ?> loop, - ByteChannel byteChannel, - int bufferSize, - int maxIdleMillis) { - super(loop, byteChannel, bufferSize, maxIdleMillis); - } - - public CompletableFuture<DefaultMessageStream> connectedFuture() { - return connectFuture.thenApply(v -> this); - } - - private final AtomicInteger messageLength = new AtomicInteger(-1); - - @Override - protected DefaultMessage read(ByteBuffer buffer) { - if (messageLength.get() == -1) { - // check if we can read the message length. - if (buffer.remaining() < Integer.BYTES) { - return null; - } else { - messageLength.set(buffer.getInt()); - } - } - - if (buffer.remaining() < messageLength.get()) { - return null; - } - - long id = buffer.getLong(); - Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6; - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; - buffer.get(octects); - IpAddress senderIp = IpAddress.valueOf(ipVersion, octects); - int senderPort = buffer.getInt(); - int messageTypeByteLength = buffer.getInt(); - byte[] messageTypeBytes = new byte[messageTypeByteLength]; - buffer.get(messageTypeBytes); - String messageType = new String(messageTypeBytes, Charsets.UTF_8); - int payloadLength = buffer.getInt(); - byte[] payloadBytes = new byte[payloadLength]; - buffer.get(payloadBytes); - - // reset for next message - messageLength.set(-1); - - return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes); - } - - @Override - protected void write(DefaultMessage message, ByteBuffer buffer) { - Endpoint sender = message.sender(); - byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8); - IpAddress senderIp = sender.host(); - byte[] ipOctets = senderIp.toOctets(); - byte[] payload = message.payload(); - - int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length; - - buffer.putInt(messageLength); - - buffer.putLong(message.id()); - - if (senderIp.version() == Version.INET) { - buffer.put((byte) 0x0); - } else { - buffer.put((byte) 0x1); - } - buffer.put(ipOctets); - - // write sender port - buffer.putInt(sender.port()); - - // write length of message type - buffer.putInt(messageTypeBytes.length); - - // write message type bytes - buffer.put(messageTypeBytes); - - // write payload length - buffer.putInt(payload.length); - - // write payload. - buffer.put(payload); - } - - /** - * Callback invoked when the stream is successfully connected. - */ - public void connected() { - connectFuture.complete(null); - } - - /** - * Callback invoked when the stream fails to connect. - * @param cause failure cause - */ - public void connectFailed(Throwable cause) { - connectFuture.completeExceptionally(cause); - } -}
\ No newline at end of file |