-package org.onosproject.store.consistent.impl;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEvent.Type;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
-import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
- * Distributed Lock Manager implemented on top of ConsistentMap.
- * <p>
- * This implementation makes use of ClusterService's failure
- * detection capabilities to detect and purge stale locks.
- * TODO: Ensure lock safety and liveness.
- */
-@Component(immediate = true, enabled = true)
-public class DistributedLeadershipManager implements LeadershipService {
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
- private final Logger log = getLogger(getClass());
- private ScheduledExecutorService electionRunner;
- private ScheduledExecutorService lockExecutor;
- private ScheduledExecutorService staleLeadershipPurgeExecutor;
- private ScheduledExecutorService leadershipRefresher;
- private ConsistentMap<String, NodeId> leaderMap;
- private ConsistentMap<String, List<NodeId>> candidateMap;
- private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
- private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
- private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
- private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
- private NodeId localNodeId;
- private Set<String> activeTopics = Sets.newConcurrentHashSet();
- private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
- // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
- private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
- private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
- private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
- private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
- private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
- @Activate
- public void activate() {
- leaderMap = storageService.<String, NodeId>consistentMapBuilder()
- .withName("onos-topic-leaders")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
- candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
- .withName("onos-topic-candidates")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
- leaderMap.addListener(event -> {
- log.debug("Received {}", event);
- LeadershipEvent.Type leadershipEventType = null;
- if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
- } else if (event.type() == MapEvent.Type.REMOVE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
- }
- onLeadershipEvent(new LeadershipEvent(
- leadershipEventType,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
- candidateMap.addListener(event -> {
- log.debug("Received {}", event);
- if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
- log.error("Entries must not be removed from candidate map");
- return;
- }
- onLeadershipEvent(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
- localNodeId = clusterService.getLocalNode().id();
- electionRunner = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "election-runner"));
- lockExecutor = Executors.newScheduledThreadPool(
- 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
- staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
- leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "refresh-thread"));
- clusterService.addListener(clusterEventListener);
- electionRunner.scheduleWithFixedDelay(
- leadershipRefresher.scheduleWithFixedDelay(
- this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
- listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
- log.info("Started");
- }
- @Deactivate
- public void deactivate() {
- if (clusterService.getNodes().size() > 1) {
- // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
- leaderBoard.forEach((topic, leadership) -> {
- if (localNodeId.equals(leadership.leader())) {
- withdraw(topic);
- }
- });
- }
- clusterService.removeListener(clusterEventListener);
- eventDispatcher.removeSink(LeadershipEvent.class);
- electionRunner.shutdown();
- lockExecutor.shutdown();
- staleLeadershipPurgeExecutor.shutdown();
- leadershipRefresher.shutdown();
- log.info("Stopped");
- }
- @Override
- public Map<String, Leadership> getLeaderBoard() {
- return ImmutableMap.copyOf(leaderBoard);
- }
- @Override
- public Map<String, List<NodeId>> getCandidates() {
- return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
- }
- @Override
- public List<NodeId> getCandidates(String path) {
- Leadership current = candidateBoard.get(path);
- return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
- }
- @Override
- public NodeId getLeader(String path) {
- Leadership leadership = leaderBoard.get(path);
- return leadership != null ? leadership.leader() : null;
- }
- @Override
- public Leadership getLeadership(String path) {
- checkArgument(path != null);
- return leaderBoard.get(path);
- }
- @Override
- public Set<String> ownedTopics(NodeId nodeId) {
- checkArgument(nodeId != null);
- return leaderBoard.entrySet()
- .stream()
- .filter(entry -> nodeId.equals(entry.getValue().leader()))
- .map(Entry::getKey)
- .collect(Collectors.toSet());
- }
- @Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
- log.debug("Running for leadership for topic: {}", path);
- CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
- doRunForLeadership(path, resultFuture);
- return resultFuture;
- }
- private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
- try {
- Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
- currentList -> currentList == null || !currentList.contains(localNodeId),
- (topic, currentList) -> {
- if (currentList == null) {
- return ImmutableList.of(localNodeId);
- } else {
- List<NodeId> newList = Lists.newLinkedList();
- newList.addAll(currentList);
- newList.add(localNodeId);
- return newList;
- }
- });
- log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
- activeTopics.add(path);
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership == null) {
- pendingFutures.put(path, future);
- } else {
- future.complete(leadership);
- }
- } catch (ConsistentMapException e) {
- log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
- rerunForLeadership(path, future);
- }
- }
- @Override
- public CompletableFuture<Void> withdraw(String path) {
- activeTopics.remove(path);
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- doWithdraw(path, resultFuture);
- return resultFuture;
- }
- private void doWithdraw(String path, CompletableFuture<Void> future) {
- if (activeTopics.contains(path)) {
- future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
- }
- try {
- leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null);
- candidateMap.computeIf(path,
- candidates -> candidates != null && candidates.contains(localNodeId),
- (topic, candidates) -> candidates.stream()
- .filter(nodeId -> !localNodeId.equals(nodeId))
- .collect(Collectors.toList()));
- future.complete(null);
- } catch (Exception e) {
- log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
- retryWithdraw(path, future);
- }
- }
- @Override
- public boolean stepdown(String path) {
- if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
- return false;
- }
- try {
- return leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null) == null;
- } catch (Exception e) {
- log.warn("Error executing stepdown for {}", path, e);
- }
- return false;
- }
- @Override
- public void addListener(LeadershipEventListener listener) {
- listenerRegistry.addListener(listener);
- }
- @Override
- public void removeListener(LeadershipEventListener listener) {
- listenerRegistry.removeListener(listener);
- }
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
- candidates -> candidates != null &&
- candidates.contains(nodeId) &&
- !nodeId.equals(Iterables.getFirst(candidates, null)),
- (topic, candidates) -> {
- List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
- updatedCandidates.add(nodeId);
- candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
- return updatedCandidates;
- });
- List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
- return candidates.size() > 0 && nodeId.equals(candidates.get(0));
- }
- private Leadership electLeader(String path, List<NodeId> candidates) {
- Leadership currentLeadership = getLeadership(path);
- if (currentLeadership != null) {
- return currentLeadership;
- } else {
- NodeId topCandidate = candidates
- .stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .findFirst()
- .orElse(null);
- try {
- Versioned<NodeId> leader = localNodeId.equals(topCandidate)
- ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
- if (leader != null) {
- Leadership newLeadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- // Since reads only go through the local copy of leader board, we ought to update it
- // first before returning from this method.
- // This is to ensure a subsequent read will not read a stale value.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
- return newLeadership;
- }
- } catch (Exception e) {
- log.debug("Failed to elect leader for {}", path, e);
- }
- }
- return null;
- }
- private void electLeaders() {
- try {
- candidateMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- // for active topics, check if this node can become a leader (if it isn't already)
- if (activeTopics.contains(path)) {
- lockExecutor.submit(() -> {
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership != null) {
- CompletableFuture<Leadership> future = pendingFutures.remove(path);
- if (future != null) {
- future.complete(leadership);
- }
- }
- });
- }
- // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
- // and also to update local listeners.
- // Don't worry about duplicate events as they will be suppressed.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- candidates.value(),
- candidates.version(),
- candidates.creationTime())));
- });
- } catch (Exception e) {
- log.debug("Failure electing leaders", e);
- }
- }
- private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
- Leadership leadershipUpdate = leadershipEvent.subject();
- LeadershipEvent.Type eventType = leadershipEvent.type();
- String topic = leadershipUpdate.topic();
- AtomicBoolean updateAccepted = new AtomicBoolean(false);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- // FIXME: Removing entries from leaderboard is not safe and should be visited.
- return null;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- candidateBoard.compute(topic, (k, currentInfo) -> {
- if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentInfo;
- });
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
- if (updateAccepted.get()) {
- eventDispatcher.post(leadershipEvent);
- }
- }
- private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
- lockExecutor.schedule(
- () -> doRunForLeadership(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- }
- private void retryWithdraw(String path, CompletableFuture<Void> future) {
- lockExecutor.schedule(
- () -> doWithdraw(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- }
- private void scheduleStaleLeadershipPurge(int afterDelaySec) {
- if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
- staleLeadershipPurgeExecutor.schedule(
- this::purgeStaleLeadership,
- afterDelaySec,
- TimeUnit.SECONDS);
- }
- }
- /**
- * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
- */
- private void purgeStaleLeadership() {
- AtomicBoolean rerunPurge = new AtomicBoolean(false);
- try {
- staleLeadershipPurgeScheduled.set(false);
- leaderMap.entrySet()
- .stream()
- .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
- .forEach(entry -> {
- String path = entry.getKey();
- NodeId nodeId = entry.getValue().value();
- try {
- leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
- } catch (Exception e) {
- log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
- rerunPurge.set(true);
- }
- });
- candidateMap.entrySet()
- .forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- List<NodeId> candidatesList = candidates != null
- ? candidates.value() : Collections.emptyList();
- List<NodeId> activeCandidatesList =
- candidatesList.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
- .collect(Collectors.toList());
- if (activeCandidatesList.size() < candidatesList.size()) {
- Set<NodeId> removedCandidates =
- Sets.difference(Sets.newHashSet(candidatesList),
- Sets.newHashSet(activeCandidatesList));
- try {
- candidateMap.computeIf(path,
- c -> c.stream()
- .filter(n -> clusterService.getState(n) == INACTIVE)
- .count() > 0,
- (topic, c) -> c.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) ||
- activeTopics.contains(path))
- .collect(Collectors.toList()));
- } catch (Exception e) {
- log.debug("Failed to evict inactive candidates {} from "
- + "candidate list for {}", removedCandidates, path, e);
- rerunPurge.set(true);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failure purging state leadership.", e);
- rerunPurge.set(true);
- }
- if (rerunPurge.get()) {
- log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
- }
- }
- private void refreshLeaderBoard() {
- try {
- Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
- leaderMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<NodeId> leader = entry.getValue();
- Leadership leadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- newLeaderBoard.put(path, leadership);
- });
- // first take snapshot of current leader board.
- Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
- MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
- // evict stale leaders
- diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
- log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
- });
- // add missing leaders
- diff.entriesOnlyOnRight().forEach((path, leadership) -> {
- log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
- });
- // add updated leaders
- diff.entriesDiffering().forEach((path, difference) -> {
- Leadership current = difference.leftValue();
- Leadership updated = difference.rightValue();
- if (current.epoch() < updated.epoch()) {
- log.debug("Updated {} in leaderboard.", updated);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
- }
- });
- } catch (Exception e) {
- log.debug("Failed to refresh leader board", e);
- }
- }
- private class InternalClusterEventListener implements ClusterEventListener {
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
- scheduleStaleLeadershipPurge(0);
- }
- }
- }