/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.store.mastership.impl; import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.futureGetOrElse; import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED; import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED; import static org.slf4j.LoggerFactory.getLogger; import static com.google.common.base.Preconditions.checkArgument; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.Leadership; import org.onosproject.cluster.LeadershipEvent; import org.onosproject.cluster.LeadershipEventListener; import org.onosproject.cluster.LeadershipService; import org.onosproject.cluster.NodeId; import org.onosproject.cluster.RoleInfo; import org.onosproject.mastership.MastershipEvent; import org.onosproject.mastership.MastershipStore; import org.onosproject.mastership.MastershipStoreDelegate; import org.onosproject.mastership.MastershipTerm; import org.onosproject.net.DeviceId; import org.onosproject.net.MastershipRole; import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.serializers.StoreSerializer; import org.slf4j.Logger; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** * Implementation of the MastershipStore on top of Leadership Service. */ @Component(immediate = true, enabled = true) @Service public class ConsistentDeviceMastershipStore extends AbstractStore implements MastershipStore { private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected LeadershipService leadershipService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterService clusterService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterCommunicationService clusterCommunicator; private NodeId localNodeId; private final Set connectedDevices = Sets.newHashSet(); private static final MessageSubject ROLE_RELINQUISH_SUBJECT = new MessageSubject("mastership-store-device-role-relinquish"); private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT = new MessageSubject("mastership-store-device-mastership-relinquish"); private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:(.*)"); private ExecutorService messageHandlingExecutor; private ScheduledExecutorService transferExecutor; private final LeadershipEventListener leadershipEventListener = new InternalDeviceMastershipEventListener(); private static final String NODE_ID_NULL = "Node ID cannot be null"; private static final String DEVICE_ID_NULL = "Device ID cannot be null"; private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000; public static final StoreSerializer SERIALIZER = new KryoSerializer() { @Override protected void setupKryoPool() { serializerPool = KryoNamespace.newBuilder() .register(KryoNamespaces.API) .register(MastershipRole.class) .register(MastershipEvent.class) .register(MastershipEvent.Type.class) .build(); } }; @Activate public void activate() { messageHandlingExecutor = Executors.newSingleThreadExecutor( groupedThreads("onos/store/device/mastership", "message-handler")); transferExecutor = Executors.newSingleThreadScheduledExecutor( groupedThreads("onos/store/device/mastership", "mastership-transfer-executor")); clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT, SERIALIZER::decode, this::relinquishLocalRole, SERIALIZER::encode, messageHandlingExecutor); clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT, SERIALIZER::decode, this::transitionFromMasterToStandby, SERIALIZER::encode, messageHandlingExecutor); localNodeId = clusterService.getLocalNode().id(); leadershipService.addListener(leadershipEventListener); log.info("Started"); } @Deactivate public void deactivate() { clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT); clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT); messageHandlingExecutor.shutdown(); transferExecutor.shutdown(); leadershipService.removeListener(leadershipEventListener); log.info("Stopped"); } @Override public CompletableFuture requestRole(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); String leadershipTopic = createDeviceMastershipTopic(deviceId); if (connectedDevices.add(deviceId)) { return leadershipService.runForLeadership(leadershipTopic) .thenApply(leadership -> { return Objects.equal(localNodeId, leadership.leader()) ? MastershipRole.MASTER : MastershipRole.STANDBY; }); } else { NodeId leader = leadershipService.getLeader(leadershipTopic); if (Objects.equal(localNodeId, leader)) { return CompletableFuture.completedFuture(MastershipRole.MASTER); } else { return CompletableFuture.completedFuture(MastershipRole.STANDBY); } } } @Override public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { checkArgument(nodeId != null, NODE_ID_NULL); checkArgument(deviceId != null, DEVICE_ID_NULL); String leadershipTopic = createDeviceMastershipTopic(deviceId); NodeId leader = leadershipService.getLeader(leadershipTopic); if (Objects.equal(nodeId, leader)) { return MastershipRole.MASTER; } return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE; } @Override public NodeId getMaster(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); String leadershipTopic = createDeviceMastershipTopic(deviceId); return leadershipService.getLeader(leadershipTopic); } @Override public RoleInfo getNodes(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); Map roles = Maps.newHashMap(); clusterService .getNodes() .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId))); NodeId master = null; final List standbys = Lists.newLinkedList(); List candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId)); for (Map.Entry entry : roles.entrySet()) { if (entry.getValue() == MastershipRole.MASTER) { master = entry.getKey(); } else if (entry.getValue() == MastershipRole.STANDBY) { standbys.add(entry.getKey()); } } List sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList()); return new RoleInfo(master, sortedStandbyList); } @Override public Set getDevices(NodeId nodeId) { checkArgument(nodeId != null, NODE_ID_NULL); return leadershipService .ownedTopics(nodeId) .stream() .filter(this::isDeviceMastershipTopic) .map(this::extractDeviceIdFromTopic) .collect(Collectors.toSet()); } @Override public CompletableFuture setMaster(NodeId nodeId, DeviceId deviceId) { checkArgument(nodeId != null, NODE_ID_NULL); checkArgument(deviceId != null, DEVICE_ID_NULL); NodeId currentMaster = getMaster(deviceId); if (nodeId.equals(currentMaster)) { return CompletableFuture.completedFuture(null); } else { String leadershipTopic = createDeviceMastershipTopic(deviceId); List candidates = leadershipService.getCandidates(leadershipTopic); if (candidates.isEmpty()) { return CompletableFuture.completedFuture(null); } if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) { CompletableFuture result = new CompletableFuture<>(); // There is brief wait before we step down from mastership. // This is to ensure any work that happens when standby preference // order changes can complete. For example: flow entries need to be backed // to the new top standby (ONOS-1883) // FIXME: This potentially introduces a race-condition. // Right now role changes are only forced via CLI. transferExecutor.schedule(() -> { result.complete(transitionFromMasterToStandby(deviceId)); }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS); return result; } else { log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId); } } return CompletableFuture.completedFuture(null); } @Override public MastershipTerm getTermFor(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); String leadershipTopic = createDeviceMastershipTopic(deviceId); Leadership leadership = leadershipService.getLeadership(leadershipTopic); return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null; } @Override public CompletableFuture setStandby(NodeId nodeId, DeviceId deviceId) { checkArgument(nodeId != null, NODE_ID_NULL); checkArgument(deviceId != null, DEVICE_ID_NULL); NodeId currentMaster = getMaster(deviceId); if (!nodeId.equals(currentMaster)) { return CompletableFuture.completedFuture(null); } String leadershipTopic = createDeviceMastershipTopic(deviceId); List candidates = leadershipService.getCandidates(leadershipTopic); NodeId newMaster = candidates.stream() .filter(candidate -> !Objects.equal(nodeId, candidate)) .findFirst() .orElse(null); log.info("Transitioning to role {} for {}. Next master: {}", newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster); if (newMaster != null) { return setMaster(newMaster, deviceId); } return relinquishRole(nodeId, deviceId); } @Override public CompletableFuture relinquishRole(NodeId nodeId, DeviceId deviceId) { checkArgument(nodeId != null, NODE_ID_NULL); checkArgument(deviceId != null, DEVICE_ID_NULL); if (nodeId.equals(localNodeId)) { return relinquishLocalRole(deviceId); } log.debug("Forwarding request to relinquish " + "role for device {} to {}", deviceId, nodeId); return clusterCommunicator.sendAndReceive( deviceId, ROLE_RELINQUISH_SUBJECT, SERIALIZER::encode, SERIALIZER::decode, nodeId); } private CompletableFuture relinquishLocalRole(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); // Check if this node is can be managed by this node. if (!connectedDevices.contains(deviceId)) { return CompletableFuture.completedFuture(null); } String leadershipTopic = createDeviceMastershipTopic(deviceId); NodeId currentLeader = leadershipService.getLeader(leadershipTopic); MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId) ? MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED; connectedDevices.remove(deviceId); return leadershipService.withdraw(leadershipTopic) .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId))); } private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) { checkArgument(deviceId != null, DEVICE_ID_NULL); NodeId currentMaster = getMaster(deviceId); if (currentMaster == null) { return null; } if (!currentMaster.equals(localNodeId)) { log.info("Forwarding request to relinquish " + "mastership for device {} to {}", deviceId, currentMaster); return futureGetOrElse(clusterCommunicator.sendAndReceive( deviceId, TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT, SERIALIZER::encode, SERIALIZER::decode, currentMaster), null); } return leadershipService.stepdown(createDeviceMastershipTopic(deviceId)) ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null; } @Override public void relinquishAllRole(NodeId nodeId) { // Noop. LeadershipService already takes care of detecting and purging deadlocks. } private class InternalDeviceMastershipEventListener implements LeadershipEventListener { @Override public void event(LeadershipEvent event) { Leadership leadership = event.subject(); if (!isDeviceMastershipTopic(leadership.topic())) { return; } DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic()); switch (event.type()) { case LEADER_ELECTED: notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); break; case LEADER_REELECTED: // There is no concept of leader re-election in the new distributed leadership manager. throw new IllegalStateException("Unexpected event type"); case LEADER_BOOTED: notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); break; case CANDIDATES_CHANGED: notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); break; default: return; } } } private String createDeviceMastershipTopic(DeviceId deviceId) { return String.format("device:%s", deviceId.toString()); } private DeviceId extractDeviceIdFromTopic(String topic) { Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); if (m.matches()) { return DeviceId.deviceId(m.group(1)); } else { throw new IllegalArgumentException("Invalid device mastership topic: " + topic); } } private boolean isDeviceMastershipTopic(String topic) { Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); return m.matches(); } }