aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java280
1 files changed, 280 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
new file mode 100644
index 00000000..859efebf
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2014-2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterStore;
+import org.onosproject.cluster.ClusterStoreDelegate;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true)
+@Service
+/**
+ * Distributed cluster nodes store that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
+public class DistributedClusterStore
+ extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
+ implements ClusterStore {
+
+ private static final Logger log = getLogger(DistributedClusterStore.class);
+
+ public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
+ // TODO: make these configurable.
+ private static final int HEARTBEAT_INTERVAL_MS = 100;
+ private static final int PHI_FAILURE_THRESHOLD = 10;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(HeartbeatMessage.class)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+ private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+ private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+ private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
+ private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+ private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+ private PhiAccrualFailureDetector failureDetector;
+
+ private ControllerNode localNode;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterDefinitionService clusterDefinitionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
+ @Activate
+ public void activate() {
+ localNode = clusterDefinitionService.localNode();
+
+ messagingService.registerHandler(HEARTBEAT_MESSAGE,
+ new HeartbeatMessageHandler(), heartBeatMessageHandler);
+
+ failureDetector = new PhiAccrualFailureDetector();
+
+ heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+ HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ addNode(localNode);
+ updateState(localNode.id(), State.ACTIVE);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
+ heartBeatSender.shutdownNow();
+ heartBeatMessageHandler.shutdownNow();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void setDelegate(ClusterStoreDelegate delegate) {
+ checkNotNull(delegate, "Delegate cannot be null");
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void unsetDelegate(ClusterStoreDelegate delegate) {
+ this.delegate = null;
+ }
+
+ @Override
+ public boolean hasDelegate() {
+ return this.delegate != null;
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return localNode;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return ImmutableSet.copyOf(allNodes.values());
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return allNodes.get(nodeId);
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return nodeStates.get(nodeId);
+ }
+
+ @Override
+ public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+ addNode(node);
+ return node;
+ }
+
+ @Override
+ public void removeNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ ControllerNode node = allNodes.remove(nodeId);
+ if (node != null) {
+ nodeStates.remove(nodeId);
+ notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
+ }
+ }
+
+ private void addNode(ControllerNode node) {
+ allNodes.put(node.id(), node);
+ updateState(node.id(), State.INACTIVE);
+ notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+ }
+
+ private void updateState(NodeId nodeId, State newState) {
+ nodeStates.put(nodeId, newState);
+ nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
+ }
+
+ private void heartbeat() {
+ try {
+ Set<ControllerNode> peers = allNodes.values()
+ .stream()
+ .filter(node -> !(node.id().equals(localNode.id())))
+ .collect(Collectors.toSet());
+ byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+ peers.forEach((node) -> {
+ heartbeatToPeer(hbMessagePayload, node);
+ State currentState = nodeStates.get(node.id());
+ double phi = failureDetector.phi(node.id());
+ if (phi >= PHI_FAILURE_THRESHOLD) {
+ if (currentState == State.ACTIVE) {
+ updateState(node.id(), State.INACTIVE);
+ notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+ }
+ } else {
+ if (currentState == State.INACTIVE) {
+ updateState(node.id(), State.ACTIVE);
+ notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+ }
+ }
+ });
+ } catch (Exception e) {
+ log.debug("Failed to send heartbeat", e);
+ }
+ }
+
+ private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+ ControllerNode node = allNodes.get(nodeId);
+ if (newState == State.ACTIVE) {
+ notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
+ } else {
+ notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
+ }
+ }
+
+ private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+ Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
+ messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
+ if (error != null) {
+ log.trace("Sending heartbeat to {} failed", remoteEp, error);
+ }
+ });
+ }
+
+ private class HeartbeatMessageHandler implements Consumer<byte[]> {
+ @Override
+ public void accept(byte[] message) {
+ HeartbeatMessage hb = SERIALIZER.decode(message);
+ failureDetector.report(hb.source().id());
+ hb.knownPeers().forEach(node -> {
+ allNodes.put(node.id(), node);
+ });
+ }
+ }
+
+ private static class HeartbeatMessage {
+ private ControllerNode source;
+ private Set<ControllerNode> knownPeers;
+
+ public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+ this.source = source;
+ this.knownPeers = ImmutableSet.copyOf(members);
+ }
+
+ public ControllerNode source() {
+ return source;
+ }
+
+ public Set<ControllerNode> knownPeers() {
+ return knownPeers;
+ }
+ }
+
+ @Override
+ public DateTime getLastUpdated(NodeId nodeId) {
+ return nodeStateLastUpdatedTimes.get(nodeId);
+ }
+
+}