diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:15:21 -0700 |
commit | 13d05bc8458758ee39cb829098241e89616717ee (patch) | |
tree | 22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster | |
parent | 6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff) |
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster')
7 files changed, 592 insertions, 0 deletions
diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java new file mode 100644 index 00000000..161a8528 --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java @@ -0,0 +1,166 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.onosproject.cluster.NodeId; + +/** + * Service for assisting communications between controller cluster nodes. + */ +public interface ClusterCommunicationService { + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param subscriber message subscriber + * @param executor executor to use for running handler. + * @deprecated in Cardinal Release + */ + @Deprecated + void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor); + + /** + * Broadcasts a message to all controller nodes. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param <M> message type + */ + <M> void broadcast(M message, + MessageSubject subject, + Function<M, byte[]> encoder); + + /** + * Broadcasts a message to all controller nodes including self. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param <M> message type + */ + <M> void broadcastIncludeSelf(M message, + MessageSubject subject, + Function<M, byte[]> encoder); + + /** + * Sends a message to the specified controller node. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param toNodeId destination node identifier + * @param <M> message type + * @return future that is completed when the message is sent + */ + <M> CompletableFuture<Void> unicast(M message, + MessageSubject subject, + Function<M, byte[]> encoder, + NodeId toNodeId); + + /** + * Multicasts a message to a set of controller nodes. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding message to byte[] + * @param nodeIds recipient node identifiers + * @param <M> message type + */ + <M> void multicast(M message, + MessageSubject subject, + Function<M, byte[]> encoder, + Set<NodeId> nodeIds); + + /** + * Sends a message and expects a reply. + * + * @param message message to send + * @param subject message subject + * @param encoder function for encoding request to byte[] + * @param decoder function for decoding response from byte[] + * @param toNodeId recipient node identifier + * @param <M> request type + * @param <R> reply type + * @return reply future + */ + <M, R> CompletableFuture<R> sendAndReceive(M message, + MessageSubject subject, + Function<M, byte[]> encoder, + Function<byte[], R> decoder, + NodeId toNodeId); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder for resurrecting incoming message + * @param handler handler function that processes the incoming message and produces a reply + * @param encoder encoder for serializing reply + * @param executor executor to run this handler on + * @param <M> incoming message type + * @param <R> reply message type + */ + <M, R> void addSubscriber(MessageSubject subject, + Function<byte[], M> decoder, + Function<M, R> handler, + Function<R, byte[]> encoder, + Executor executor); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder for resurrecting incoming message + * @param handler handler function that processes the incoming message and produces a reply + * @param encoder encoder for serializing reply + * @param <M> incoming message type + * @param <R> reply message type + */ + <M, R> void addSubscriber(MessageSubject subject, + Function<byte[], M> decoder, + Function<M, CompletableFuture<R>> handler, + Function<R, byte[]> encoder); + + /** + * Adds a new subscriber for the specified message subject. + * + * @param subject message subject + * @param decoder decoder to resurrecting incoming message + * @param handler handler for handling message + * @param executor executor to run this handler on + * @param <M> incoming message type + */ + <M> void addSubscriber(MessageSubject subject, + Function<byte[], M> decoder, + Consumer<M> handler, + Executor executor); + + /** + * Removes a subscriber for the specified message subject. + * + * @param subject message subject + */ + void removeSubscriber(MessageSubject subject); +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java new file mode 100644 index 00000000..46560e4c --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java @@ -0,0 +1,160 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; + +import org.onlab.util.ByteArraySizeHashPrinter; +import org.onosproject.cluster.NodeId; + +import com.google.common.base.Charsets; +import com.google.common.base.MoreObjects; + +// TODO: Should payload type be ByteBuffer? +/** + * Base message for cluster-wide communications. + */ +public class ClusterMessage { + + private final NodeId sender; + private final MessageSubject subject; + private final byte[] payload; + private transient byte[] response; + + /** + * Creates a cluster message. + * + * @param sender message sender + * @param subject message subject + * @param payload message payload + */ + public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) { + this.sender = sender; + this.subject = subject; + this.payload = payload; + } + + /** + * Returns the id of the controller sending this message. + * + * @return message sender id. + */ + public NodeId sender() { + return sender; + } + + /** + * Returns the message subject indicator. + * + * @return message subject + */ + public MessageSubject subject() { + return subject; + } + + /** + * Returns the message payload. + * + * @return message payload. + */ + public byte[] payload() { + return payload; + } + + /** + * Records the response to be sent to the sender. + * + * @param data response payload + */ + public void respond(byte[] data) { + response = data; + } + + /** + * Returns the response to be sent to the sender. + * + * @return response bytes + */ + public byte[] response() { + return response; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("sender", sender) + .add("subject", subject) + .add("payload", ByteArraySizeHashPrinter.of(payload)) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ClusterMessage)) { + return false; + } + + ClusterMessage that = (ClusterMessage) o; + + return Objects.equals(this.sender, that.sender) && + Objects.equals(this.subject, that.subject) && + Arrays.equals(this.payload, that.payload); + } + + /** + * Serializes this instance. + * @return bytes + */ + public byte[] getBytes() { + byte[] senderBytes = sender.toString().getBytes(Charsets.UTF_8); + byte[] subjectBytes = subject.value().getBytes(Charsets.UTF_8); + int capacity = 12 + senderBytes.length + subjectBytes.length + payload.length; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + buffer.putInt(senderBytes.length); + buffer.put(senderBytes); + buffer.putInt(subjectBytes.length); + buffer.put(subjectBytes); + buffer.putInt(payload.length); + buffer.put(payload); + return buffer.array(); + } + + /** + * Decodes a new ClusterMessage from raw bytes. + * @param bytes raw bytes + * @return cluster message + */ + public static ClusterMessage fromBytes(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + byte[] senderBytes = new byte[buffer.getInt()]; + buffer.get(senderBytes); + byte[] subjectBytes = new byte[buffer.getInt()]; + buffer.get(subjectBytes); + byte[] payloadBytes = new byte[buffer.getInt()]; + buffer.get(payloadBytes); + + return new ClusterMessage(new NodeId(new String(senderBytes, Charsets.UTF_8)), + new MessageSubject(new String(subjectBytes, Charsets.UTF_8)), + payloadBytes); + } + + @Override + public int hashCode() { + return Objects.hash(sender, subject, payload); + } +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessageHandler.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessageHandler.java new file mode 100644 index 00000000..ce770a81 --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessageHandler.java @@ -0,0 +1,28 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +/** + * Interface for handling cluster messages. + */ +public interface ClusterMessageHandler { + + /** + * Handles/Processes the cluster message. + * @param message cluster message. + */ + void handle(ClusterMessage message); +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java new file mode 100644 index 00000000..2ac50dfd --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/Endpoint.java @@ -0,0 +1,75 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Objects; + +import org.onlab.packet.IpAddress; + +import com.google.common.base.MoreObjects; + +/** + * Representation of a TCP/UDP communication end point. + */ +public final class Endpoint { + + private final int port; + private final IpAddress ip; + + public Endpoint(IpAddress host, int port) { + this.ip = checkNotNull(host); + this.port = port; + } + + public IpAddress host() { + return ip; + } + + public int port() { + return port; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("ip", ip) + .add("port", port) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(ip, port); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Endpoint that = (Endpoint) obj; + return Objects.equals(this.port, that.port) && + Objects.equals(this.ip, that.ip); + } +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessageSubject.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessageSubject.java new file mode 100644 index 00000000..8d5e313d --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessageSubject.java @@ -0,0 +1,68 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Objects; + +/** + * Representation of a message subject. + * Cluster messages have associated subjects that dictate how they get handled + * on the receiving side. + */ +public final class MessageSubject { + + private final String value; + + public MessageSubject(String value) { + this.value = checkNotNull(value); + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + MessageSubject that = (MessageSubject) obj; + return Objects.equals(this.value, that.value); + } + + // for serializer + protected MessageSubject() { + this.value = ""; + } +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java new file mode 100644 index 00000000..6ccd4835 --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java @@ -0,0 +1,75 @@ +/* + * Copyright 2014-2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Interface for low level messaging primitives. + */ +public interface MessagingService { + + /** + * Sends a message asynchronously to the specified communication end point. + * The message is specified using the type and payload. + * @param ep end point to send the message to. + * @param type type of message. + * @param payload message payload bytes. + * @return future that is completed when the message is sent + */ + CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload); + + /** + * Sends a message synchronously and waits for a response. + * @param ep end point to send the message to. + * @param type type of message. + * @param payload message payload. + * @return a response future + */ + CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload); + + /** + * Registers a new message handler for message type. + * @param type message type. + * @param handler message handler + * @param executor executor to use for running message handler logic. + */ + void registerHandler(String type, Consumer<byte[]> handler, Executor executor); + + /** + * Registers a new message handler for message type. + * @param type message type. + * @param handler message handler + * @param executor executor to use for running message handler logic. + */ + void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor); + + /** + * Registers a new message handler for message type. + * @param type message type. + * @param handler message handler + */ + void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler); + + /** + * Unregister current handler, if one exists for message type. + * @param type message type + */ + void unregisterHandler(String type); +} diff --git a/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/package-info.java b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/package-info.java new file mode 100644 index 00000000..582c50ed --- /dev/null +++ b/framework/src/onos/core/api/src/main/java/org/onosproject/store/cluster/messaging/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Cluster messaging APIs for the use by the various distributed stores. + */ +package org.onosproject.store.cluster.messaging; |