diff options
author | 2015-09-09 22:15:21 -0700 | |
---|---|---|
committer | 2015-09-09 22:15:21 -0700 | |
commit | 13d05bc8458758ee39cb829098241e89616717ee (patch) | |
tree | 22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java | |
parent | 6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff) |
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java | 605 |
1 files changed, 605 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java new file mode 100644 index 00000000..1882b1b5 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java @@ -0,0 +1,605 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.cluster.ClusterEvent; +import org.onosproject.cluster.ClusterEvent.Type; +import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.Leadership; +import org.onosproject.cluster.LeadershipEvent; +import org.onosproject.cluster.LeadershipEventListener; +import org.onosproject.cluster.LeadershipService; +import org.onosproject.cluster.NodeId; +import org.onosproject.event.ListenerRegistry; +import org.onosproject.event.EventDeliveryService; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMapException; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.onlab.util.Tools.groupedThreads; +import static org.slf4j.LoggerFactory.getLogger; +import static org.onosproject.cluster.ControllerNode.State.ACTIVE; +import static org.onosproject.cluster.ControllerNode.State.INACTIVE; + +/** + * Distributed Lock Manager implemented on top of ConsistentMap. + * <p> + * This implementation makes use of ClusterService's failure + * detection capabilities to detect and purge stale locks. + * TODO: Ensure lock safety and liveness. + */ +@Component(immediate = true, enabled = true) +@Service +public class DistributedLeadershipManager implements LeadershipService { + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected EventDeliveryService eventDispatcher; + + private final Logger log = getLogger(getClass()); + private ScheduledExecutorService electionRunner; + private ScheduledExecutorService lockExecutor; + private ScheduledExecutorService staleLeadershipPurgeExecutor; + private ScheduledExecutorService leadershipRefresher; + + private ConsistentMap<String, NodeId> leaderMap; + private ConsistentMap<String, List<NodeId>> candidateMap; + + private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry; + private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap(); + private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap(); + private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); + + private NodeId localNodeId; + private Set<String> activeTopics = Sets.newConcurrentHashSet(); + private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap(); + + // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS) + private static final int WAIT_BEFORE_RETRY_MILLIS = 150; + private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; + private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2; + private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; + + private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); + + private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API); + + @Activate + public void activate() { + leaderMap = storageService.<String, NodeId>consistentMapBuilder() + .withName("onos-topic-leaders") + .withSerializer(SERIALIZER) + .withPartitionsDisabled().build(); + candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder() + .withName("onos-topic-candidates") + .withSerializer(SERIALIZER) + .withPartitionsDisabled().build(); + + leaderMap.addListener(event -> { + log.debug("Received {}", event); + LeadershipEvent.Type leadershipEventType = null; + if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) { + leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED; + } else if (event.type() == MapEvent.Type.REMOVE) { + leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED; + } + onLeadershipEvent(new LeadershipEvent( + leadershipEventType, + new Leadership(event.key(), + event.value().value(), + event.value().version(), + event.value().creationTime()))); + }); + + candidateMap.addListener(event -> { + log.debug("Received {}", event); + if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) { + log.error("Entries must not be removed from candidate map"); + return; + } + onLeadershipEvent(new LeadershipEvent( + LeadershipEvent.Type.CANDIDATES_CHANGED, + new Leadership(event.key(), + event.value().value(), + event.value().version(), + event.value().creationTime()))); + }); + + localNodeId = clusterService.getLocalNode().id(); + + electionRunner = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/store/leadership", "election-runner")); + lockExecutor = Executors.newScheduledThreadPool( + 4, groupedThreads("onos/store/leadership", "election-thread-%d")); + staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/store/leadership", "stale-leadership-evictor")); + leadershipRefresher = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/store/leadership", "refresh-thread")); + + clusterService.addListener(clusterEventListener); + + electionRunner.scheduleWithFixedDelay( + this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); + + leadershipRefresher.scheduleWithFixedDelay( + this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS); + + listenerRegistry = new ListenerRegistry<>(); + eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + if (clusterService.getNodes().size() > 1) { + // FIXME: Determine why this takes ~50 seconds to shutdown on a single node! + leaderBoard.forEach((topic, leadership) -> { + if (localNodeId.equals(leadership.leader())) { + withdraw(topic); + } + }); + } + + clusterService.removeListener(clusterEventListener); + eventDispatcher.removeSink(LeadershipEvent.class); + + electionRunner.shutdown(); + lockExecutor.shutdown(); + staleLeadershipPurgeExecutor.shutdown(); + leadershipRefresher.shutdown(); + + log.info("Stopped"); + } + + @Override + public Map<String, Leadership> getLeaderBoard() { + return ImmutableMap.copyOf(leaderBoard); + } + + @Override + public Map<String, List<NodeId>> getCandidates() { + return Maps.toMap(candidateBoard.keySet(), this::getCandidates); + } + + @Override + public List<NodeId> getCandidates(String path) { + Leadership current = candidateBoard.get(path); + return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates()); + } + + @Override + public NodeId getLeader(String path) { + Leadership leadership = leaderBoard.get(path); + return leadership != null ? leadership.leader() : null; + } + + @Override + public Leadership getLeadership(String path) { + checkArgument(path != null); + return leaderBoard.get(path); + } + + @Override + public Set<String> ownedTopics(NodeId nodeId) { + checkArgument(nodeId != null); + return leaderBoard.entrySet() + .stream() + .filter(entry -> nodeId.equals(entry.getValue().leader())) + .map(Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public CompletableFuture<Leadership> runForLeadership(String path) { + log.debug("Running for leadership for topic: {}", path); + CompletableFuture<Leadership> resultFuture = new CompletableFuture<>(); + doRunForLeadership(path, resultFuture); + return resultFuture; + } + + private void doRunForLeadership(String path, CompletableFuture<Leadership> future) { + try { + Versioned<List<NodeId>> candidates = candidateMap.computeIf(path, + currentList -> currentList == null || !currentList.contains(localNodeId), + (topic, currentList) -> { + if (currentList == null) { + return ImmutableList.of(localNodeId); + } else { + List<NodeId> newList = Lists.newLinkedList(); + newList.addAll(currentList); + newList.add(localNodeId); + return newList; + } + }); + log.debug("In the leadership race for topic {} with candidates {}", path, candidates); + activeTopics.add(path); + Leadership leadership = electLeader(path, candidates.value()); + if (leadership == null) { + pendingFutures.put(path, future); + } else { + future.complete(leadership); + } + } catch (ConsistentMapException e) { + log.debug("Failed to enter topic leader race for {}. Retrying.", path, e); + rerunForLeadership(path, future); + } + } + + @Override + public CompletableFuture<Void> withdraw(String path) { + activeTopics.remove(path); + CompletableFuture<Void> resultFuture = new CompletableFuture<>(); + doWithdraw(path, resultFuture); + return resultFuture; + } + + + private void doWithdraw(String path, CompletableFuture<Void> future) { + if (activeTopics.contains(path)) { + future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path))); + } + try { + leaderMap.computeIf(path, + localNodeId::equals, + (topic, leader) -> null); + candidateMap.computeIf(path, + candidates -> candidates != null && candidates.contains(localNodeId), + (topic, candidates) -> candidates.stream() + .filter(nodeId -> !localNodeId.equals(nodeId)) + .collect(Collectors.toList())); + future.complete(null); + } catch (Exception e) { + log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e); + retryWithdraw(path, future); + } + } + + @Override + public boolean stepdown(String path) { + if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) { + return false; + } + + try { + return leaderMap.computeIf(path, + localNodeId::equals, + (topic, leader) -> null) == null; + } catch (Exception e) { + log.warn("Error executing stepdown for {}", path, e); + } + return false; + } + + @Override + public void addListener(LeadershipEventListener listener) { + listenerRegistry.addListener(listener); + } + + @Override + public void removeListener(LeadershipEventListener listener) { + listenerRegistry.removeListener(listener); + } + + @Override + public boolean makeTopCandidate(String path, NodeId nodeId) { + Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path, + candidates -> candidates != null && + candidates.contains(nodeId) && + !nodeId.equals(Iterables.getFirst(candidates, null)), + (topic, candidates) -> { + List<NodeId> updatedCandidates = new ArrayList<>(candidates.size()); + updatedCandidates.add(nodeId); + candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add); + return updatedCandidates; + }); + List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList(); + return candidates.size() > 0 && nodeId.equals(candidates.get(0)); + } + + private Leadership electLeader(String path, List<NodeId> candidates) { + Leadership currentLeadership = getLeadership(path); + if (currentLeadership != null) { + return currentLeadership; + } else { + NodeId topCandidate = candidates + .stream() + .filter(n -> clusterService.getState(n) == ACTIVE) + .findFirst() + .orElse(null); + try { + Versioned<NodeId> leader = localNodeId.equals(topCandidate) + ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path); + if (leader != null) { + Leadership newLeadership = new Leadership(path, + leader.value(), + leader.version(), + leader.creationTime()); + // Since reads only go through the local copy of leader board, we ought to update it + // first before returning from this method. + // This is to ensure a subsequent read will not read a stale value. + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership)); + return newLeadership; + } + } catch (Exception e) { + log.debug("Failed to elect leader for {}", path, e); + } + } + return null; + } + + private void electLeaders() { + try { + candidateMap.entrySet().forEach(entry -> { + String path = entry.getKey(); + Versioned<List<NodeId>> candidates = entry.getValue(); + // for active topics, check if this node can become a leader (if it isn't already) + if (activeTopics.contains(path)) { + lockExecutor.submit(() -> { + Leadership leadership = electLeader(path, candidates.value()); + if (leadership != null) { + CompletableFuture<Leadership> future = pendingFutures.remove(path); + if (future != null) { + future.complete(leadership); + } + } + }); + } + // Raise a CANDIDATES_CHANGED event to force refresh local candidate board + // and also to update local listeners. + // Don't worry about duplicate events as they will be suppressed. + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, + new Leadership(path, + candidates.value(), + candidates.version(), + candidates.creationTime()))); + }); + } catch (Exception e) { + log.debug("Failure electing leaders", e); + } + } + + private void onLeadershipEvent(LeadershipEvent leadershipEvent) { + log.trace("Leadership Event: time = {} type = {} event = {}", + leadershipEvent.time(), leadershipEvent.type(), + leadershipEvent); + + Leadership leadershipUpdate = leadershipEvent.subject(); + LeadershipEvent.Type eventType = leadershipEvent.type(); + String topic = leadershipUpdate.topic(); + + AtomicBoolean updateAccepted = new AtomicBoolean(false); + if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) { + leaderBoard.compute(topic, (k, currentLeadership) -> { + if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) { + updateAccepted.set(true); + return leadershipUpdate; + } + return currentLeadership; + }); + } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { + leaderBoard.compute(topic, (k, currentLeadership) -> { + if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { + updateAccepted.set(true); + // FIXME: Removing entries from leaderboard is not safe and should be visited. + return null; + } + return currentLeadership; + }); + } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) { + candidateBoard.compute(topic, (k, currentInfo) -> { + if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) { + updateAccepted.set(true); + return leadershipUpdate; + } + return currentInfo; + }); + } else { + throw new IllegalStateException("Unknown event type."); + } + + if (updateAccepted.get()) { + eventDispatcher.post(leadershipEvent); + } + } + + private void rerunForLeadership(String path, CompletableFuture<Leadership> future) { + lockExecutor.schedule( + () -> doRunForLeadership(path, future), + RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), + TimeUnit.MILLISECONDS); + } + + private void retryWithdraw(String path, CompletableFuture<Void> future) { + lockExecutor.schedule( + () -> doWithdraw(path, future), + RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), + TimeUnit.MILLISECONDS); + } + + private void scheduleStaleLeadershipPurge(int afterDelaySec) { + if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) { + staleLeadershipPurgeExecutor.schedule( + this::purgeStaleLeadership, + afterDelaySec, + TimeUnit.SECONDS); + } + } + + /** + * Purges locks held by inactive nodes and evicts inactive nodes from candidacy. + */ + private void purgeStaleLeadership() { + AtomicBoolean rerunPurge = new AtomicBoolean(false); + try { + staleLeadershipPurgeScheduled.set(false); + leaderMap.entrySet() + .stream() + .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE) + .forEach(entry -> { + String path = entry.getKey(); + NodeId nodeId = entry.getValue().value(); + try { + leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null); + } catch (Exception e) { + log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e); + rerunPurge.set(true); + } + }); + + candidateMap.entrySet() + .forEach(entry -> { + String path = entry.getKey(); + Versioned<List<NodeId>> candidates = entry.getValue(); + List<NodeId> candidatesList = candidates != null + ? candidates.value() : Collections.emptyList(); + List<NodeId> activeCandidatesList = + candidatesList.stream() + .filter(n -> clusterService.getState(n) == ACTIVE) + .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path)) + .collect(Collectors.toList()); + if (activeCandidatesList.size() < candidatesList.size()) { + Set<NodeId> removedCandidates = + Sets.difference(Sets.newHashSet(candidatesList), + Sets.newHashSet(activeCandidatesList)); + try { + candidateMap.computeIf(path, + c -> c.stream() + .filter(n -> clusterService.getState(n) == INACTIVE) + .count() > 0, + (topic, c) -> c.stream() + .filter(n -> clusterService.getState(n) == ACTIVE) + .filter(n -> !localNodeId.equals(n) || + activeTopics.contains(path)) + .collect(Collectors.toList())); + } catch (Exception e) { + log.debug("Failed to evict inactive candidates {} from " + + "candidate list for {}", removedCandidates, path, e); + rerunPurge.set(true); + } + } + }); + } catch (Exception e) { + log.debug("Failure purging state leadership.", e); + rerunPurge.set(true); + } + + if (rerunPurge.get()) { + log.debug("Rescheduling stale leadership purge due to errors encountered in previous run"); + scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC); + } + } + + private void refreshLeaderBoard() { + try { + Map<String, Leadership> newLeaderBoard = Maps.newHashMap(); + leaderMap.entrySet().forEach(entry -> { + String path = entry.getKey(); + Versioned<NodeId> leader = entry.getValue(); + Leadership leadership = new Leadership(path, + leader.value(), + leader.version(), + leader.creationTime()); + newLeaderBoard.put(path, leadership); + }); + + // first take snapshot of current leader board. + Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard); + + MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard); + + // evict stale leaders + diff.entriesOnlyOnLeft().forEach((path, leadership) -> { + log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership)); + }); + + // add missing leaders + diff.entriesOnlyOnRight().forEach((path, leadership) -> { + log.debug("Adding {} to leaderboard. It is now the active leader.", leadership); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership)); + }); + + // add updated leaders + diff.entriesDiffering().forEach((path, difference) -> { + Leadership current = difference.leftValue(); + Leadership updated = difference.rightValue(); + if (current.epoch() < updated.epoch()) { + log.debug("Updated {} in leaderboard.", updated); + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated)); + } + }); + } catch (Exception e) { + log.debug("Failed to refresh leader board", e); + } + } + + private class InternalClusterEventListener implements ClusterEventListener { + + @Override + public void event(ClusterEvent event) { + if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) { + scheduleStaleLeadershipPurge(0); + } + } + } +} |