aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java277
1 files changed, 0 insertions, 277 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
deleted file mode 100644
index 3bb6a708..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.joda.time.DateTime;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.ControllerNode.State;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-@Component(immediate = true)
-@Service
-/**
- * Distributed cluster nodes store that employs an accrual failure
- * detector to identify cluster member up/down status.
- */
-public class DistributedClusterStore
- extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
-
- private static final Logger log = getLogger(DistributedClusterStore.class);
-
- public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
- // TODO: make these configurable.
- private static final int HEARTBEAT_INTERVAL_MS = 100;
- private static final int PHI_FAILURE_THRESHOLD = 10;
-
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(HeartbeatMessage.class)
- .build()
- .populate(1);
- }
- };
-
- private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-
- private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
- private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
- private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
- private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-sender"));
- private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
-
- private PhiAccrualFailureDetector failureDetector;
-
- private ControllerNode localNode;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- @Activate
- public void activate() {
- localNode = clusterMetadataService.getLocalNode();
-
- messagingService.registerHandler(HEARTBEAT_MESSAGE,
- new HeartbeatMessageHandler(), heartBeatMessageHandler);
-
- failureDetector = new PhiAccrualFailureDetector();
-
- heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
- HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
- heartBeatSender.shutdownNow();
- heartBeatMessageHandler.shutdownNow();
-
- log.info("Stopped");
- }
-
- @Override
- public void setDelegate(ClusterStoreDelegate delegate) {
- checkNotNull(delegate, "Delegate cannot be null");
- this.delegate = delegate;
- }
-
- @Override
- public void unsetDelegate(ClusterStoreDelegate delegate) {
- this.delegate = null;
- }
-
- @Override
- public boolean hasDelegate() {
- return this.delegate != null;
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return localNode;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- return ImmutableSet.copyOf(allNodes.values());
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return allNodes.get(nodeId);
- }
-
- @Override
- public State getState(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return nodeStates.get(nodeId);
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- addNode(node);
- return node;
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- ControllerNode node = allNodes.remove(nodeId);
- if (node != null) {
- nodeStates.remove(nodeId);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
- }
- }
-
- private void addNode(ControllerNode node) {
- allNodes.put(node.id(), node);
- updateState(node.id(), node.equals(localNode) ? State.ACTIVE : State.INACTIVE);
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
- }
-
- private void updateState(NodeId nodeId, State newState) {
- nodeStates.put(nodeId, newState);
- nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
- }
-
- private void heartbeat() {
- try {
- Set<ControllerNode> peers = allNodes.values()
- .stream()
- .filter(node -> !(node.id().equals(localNode.id())))
- .collect(Collectors.toSet());
- byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
- peers.forEach((node) -> {
- heartbeatToPeer(hbMessagePayload, node);
- State currentState = nodeStates.get(node.id());
- double phi = failureDetector.phi(node.id());
- if (phi >= PHI_FAILURE_THRESHOLD) {
- if (currentState == State.ACTIVE) {
- updateState(node.id(), State.INACTIVE);
- notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
- }
- } else {
- if (currentState == State.INACTIVE) {
- updateState(node.id(), State.ACTIVE);
- notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failed to send heartbeat", e);
- }
- }
-
- private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
- ControllerNode node = allNodes.get(nodeId);
- if (newState == State.ACTIVE) {
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
- } else {
- notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
- }
- }
-
- private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
- Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
- messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
- if (error != null) {
- log.trace("Sending heartbeat to {} failed", remoteEp, error);
- }
- });
- }
-
- private class HeartbeatMessageHandler implements Consumer<byte[]> {
- @Override
- public void accept(byte[] message) {
- HeartbeatMessage hb = SERIALIZER.decode(message);
- failureDetector.report(hb.source().id());
- hb.knownPeers().forEach(node -> {
- allNodes.put(node.id(), node);
- });
- }
- }
-
- private static class HeartbeatMessage {
- private ControllerNode source;
- private Set<ControllerNode> knownPeers;
-
- public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
- this.source = source;
- this.knownPeers = ImmutableSet.copyOf(members);
- }
-
- public ControllerNode source() {
- return source;
- }
-
- public Set<ControllerNode> knownPeers() {
- return knownPeers;
- }
- }
-
- @Override
- public DateTime getLastUpdated(NodeId nodeId) {
- return nodeStateLastUpdatedTimes.get(nodeId);
- }
-
-}