From b731e2f1dd0972409b136aebc7b463dd72c9cfad Mon Sep 17 00:00:00 2001 From: CNlucius Date: Tue, 13 Sep 2016 11:40:12 +0800 Subject: ONOSFW-171 O/S-SFC-ONOS scenario documentation Change-Id: I51ae1cf736ea24ab6680f8edca1b2bf5dd598365 Signed-off-by: CNlucius --- .../impl/ConsistentDeviceMastershipStore.java | 419 --------------------- 1 file changed, 419 deletions(-) delete mode 100644 framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java') diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java deleted file mode 100644 index c6fc6933..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.mastership.impl; - -import static org.onlab.util.Tools.groupedThreads; -import static org.onlab.util.Tools.futureGetOrElse; -import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED; -import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED; -import static org.slf4j.LoggerFactory.getLogger; -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.Leadership; -import org.onosproject.cluster.LeadershipEvent; -import org.onosproject.cluster.LeadershipEventListener; -import org.onosproject.cluster.LeadershipService; -import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.RoleInfo; -import org.onosproject.mastership.MastershipEvent; -import org.onosproject.mastership.MastershipStore; -import org.onosproject.mastership.MastershipStoreDelegate; -import org.onosproject.mastership.MastershipTerm; -import org.onosproject.net.DeviceId; -import org.onosproject.net.MastershipRole; -import org.onosproject.store.AbstractStore; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.serializers.StoreSerializer; -import org.slf4j.Logger; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Implementation of the MastershipStore on top of Leadership Service. - */ -@Component(immediate = true, enabled = true) -@Service -public class ConsistentDeviceMastershipStore - extends AbstractStore - implements MastershipStore { - - private final Logger log = getLogger(getClass()); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LeadershipService leadershipService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - private NodeId localNodeId; - private final Set connectedDevices = Sets.newHashSet(); - - private static final MessageSubject ROLE_RELINQUISH_SUBJECT = - new MessageSubject("mastership-store-device-role-relinquish"); - private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT = - new MessageSubject("mastership-store-device-mastership-relinquish"); - - private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = - Pattern.compile("device:(.*)"); - - private ExecutorService messageHandlingExecutor; - private ScheduledExecutorService transferExecutor; - private final LeadershipEventListener leadershipEventListener = - new InternalDeviceMastershipEventListener(); - - private static final String NODE_ID_NULL = "Node ID cannot be null"; - private static final String DEVICE_ID_NULL = "Device ID cannot be null"; - private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000; - - public static final StoreSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(MastershipRole.class) - .register(MastershipEvent.class) - .register(MastershipEvent.Type.class) - .build(); - } - }; - - @Activate - public void activate() { - messageHandlingExecutor = - Executors.newSingleThreadExecutor( - groupedThreads("onos/store/device/mastership", "message-handler")); - transferExecutor = - Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/device/mastership", "mastership-transfer-executor")); - clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT, - SERIALIZER::decode, - this::relinquishLocalRole, - SERIALIZER::encode, - messageHandlingExecutor); - clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT, - SERIALIZER::decode, - this::transitionFromMasterToStandby, - SERIALIZER::encode, - messageHandlingExecutor); - localNodeId = clusterService.getLocalNode().id(); - leadershipService.addListener(leadershipEventListener); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT); - clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT); - messageHandlingExecutor.shutdown(); - transferExecutor.shutdown(); - leadershipService.removeListener(leadershipEventListener); - - log.info("Stopped"); - } - - @Override - public CompletableFuture requestRole(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - if (connectedDevices.add(deviceId)) { - return leadershipService.runForLeadership(leadershipTopic) - .thenApply(leadership -> { - return Objects.equal(localNodeId, leadership.leader()) - ? MastershipRole.MASTER : MastershipRole.STANDBY; - }); - } else { - NodeId leader = leadershipService.getLeader(leadershipTopic); - if (Objects.equal(localNodeId, leader)) { - return CompletableFuture.completedFuture(MastershipRole.MASTER); - } else { - return CompletableFuture.completedFuture(MastershipRole.STANDBY); - } - } - } - - @Override - public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { - checkArgument(nodeId != null, NODE_ID_NULL); - checkArgument(deviceId != null, DEVICE_ID_NULL); - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - NodeId leader = leadershipService.getLeader(leadershipTopic); - if (Objects.equal(nodeId, leader)) { - return MastershipRole.MASTER; - } - return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ? - MastershipRole.STANDBY : MastershipRole.NONE; - } - - @Override - public NodeId getMaster(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - return leadershipService.getLeader(leadershipTopic); - } - - @Override - public RoleInfo getNodes(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - Map roles = Maps.newHashMap(); - clusterService - .getNodes() - .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId))); - - NodeId master = null; - final List standbys = Lists.newLinkedList(); - - List candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId)); - - for (Map.Entry entry : roles.entrySet()) { - if (entry.getValue() == MastershipRole.MASTER) { - master = entry.getKey(); - } else if (entry.getValue() == MastershipRole.STANDBY) { - standbys.add(entry.getKey()); - } - } - - List sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList()); - - return new RoleInfo(master, sortedStandbyList); - } - - @Override - public Set getDevices(NodeId nodeId) { - checkArgument(nodeId != null, NODE_ID_NULL); - - return leadershipService - .ownedTopics(nodeId) - .stream() - .filter(this::isDeviceMastershipTopic) - .map(this::extractDeviceIdFromTopic) - .collect(Collectors.toSet()); - } - - @Override - public CompletableFuture setMaster(NodeId nodeId, DeviceId deviceId) { - checkArgument(nodeId != null, NODE_ID_NULL); - checkArgument(deviceId != null, DEVICE_ID_NULL); - - NodeId currentMaster = getMaster(deviceId); - if (nodeId.equals(currentMaster)) { - return CompletableFuture.completedFuture(null); - } else { - String leadershipTopic = createDeviceMastershipTopic(deviceId); - List candidates = leadershipService.getCandidates(leadershipTopic); - if (candidates.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) { - CompletableFuture result = new CompletableFuture<>(); - // There is brief wait before we step down from mastership. - // This is to ensure any work that happens when standby preference - // order changes can complete. For example: flow entries need to be backed - // to the new top standby (ONOS-1883) - // FIXME: This potentially introduces a race-condition. - // Right now role changes are only forced via CLI. - transferExecutor.schedule(() -> { - result.complete(transitionFromMasterToStandby(deviceId)); - }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS); - return result; - } else { - log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId); - } - } - return CompletableFuture.completedFuture(null); - } - - @Override - public MastershipTerm getTermFor(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - Leadership leadership = leadershipService.getLeadership(leadershipTopic); - return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null; - } - - @Override - public CompletableFuture setStandby(NodeId nodeId, DeviceId deviceId) { - checkArgument(nodeId != null, NODE_ID_NULL); - checkArgument(deviceId != null, DEVICE_ID_NULL); - - NodeId currentMaster = getMaster(deviceId); - if (!nodeId.equals(currentMaster)) { - return CompletableFuture.completedFuture(null); - } - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - List candidates = leadershipService.getCandidates(leadershipTopic); - - NodeId newMaster = candidates.stream() - .filter(candidate -> !Objects.equal(nodeId, candidate)) - .findFirst() - .orElse(null); - log.info("Transitioning to role {} for {}. Next master: {}", - newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster); - - if (newMaster != null) { - return setMaster(newMaster, deviceId); - } - return relinquishRole(nodeId, deviceId); - } - - @Override - public CompletableFuture relinquishRole(NodeId nodeId, DeviceId deviceId) { - checkArgument(nodeId != null, NODE_ID_NULL); - checkArgument(deviceId != null, DEVICE_ID_NULL); - - if (nodeId.equals(localNodeId)) { - return relinquishLocalRole(deviceId); - } - - log.debug("Forwarding request to relinquish " - + "role for device {} to {}", deviceId, nodeId); - return clusterCommunicator.sendAndReceive( - deviceId, - ROLE_RELINQUISH_SUBJECT, - SERIALIZER::encode, - SERIALIZER::decode, - nodeId); - } - - private CompletableFuture relinquishLocalRole(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - // Check if this node is can be managed by this node. - if (!connectedDevices.contains(deviceId)) { - return CompletableFuture.completedFuture(null); - } - - String leadershipTopic = createDeviceMastershipTopic(deviceId); - NodeId currentLeader = leadershipService.getLeader(leadershipTopic); - - MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId) - ? MastershipEvent.Type.MASTER_CHANGED - : MastershipEvent.Type.BACKUPS_CHANGED; - - connectedDevices.remove(deviceId); - return leadershipService.withdraw(leadershipTopic) - .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId))); - } - - private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) { - checkArgument(deviceId != null, DEVICE_ID_NULL); - - NodeId currentMaster = getMaster(deviceId); - if (currentMaster == null) { - return null; - } - - if (!currentMaster.equals(localNodeId)) { - log.info("Forwarding request to relinquish " - + "mastership for device {} to {}", deviceId, currentMaster); - return futureGetOrElse(clusterCommunicator.sendAndReceive( - deviceId, - TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT, - SERIALIZER::encode, - SERIALIZER::decode, - currentMaster), null); - } - - return leadershipService.stepdown(createDeviceMastershipTopic(deviceId)) - ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null; - } - - @Override - public void relinquishAllRole(NodeId nodeId) { - // Noop. LeadershipService already takes care of detecting and purging deadlocks. - } - - private class InternalDeviceMastershipEventListener implements LeadershipEventListener { - @Override - public void event(LeadershipEvent event) { - Leadership leadership = event.subject(); - if (!isDeviceMastershipTopic(leadership.topic())) { - return; - } - DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic()); - switch (event.type()) { - case LEADER_ELECTED: - notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); - break; - case LEADER_REELECTED: - // There is no concept of leader re-election in the new distributed leadership manager. - throw new IllegalStateException("Unexpected event type"); - case LEADER_BOOTED: - notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); - break; - case CANDIDATES_CHANGED: - notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); - break; - default: - return; - } - } - } - - private String createDeviceMastershipTopic(DeviceId deviceId) { - return String.format("device:%s", deviceId.toString()); - } - - private DeviceId extractDeviceIdFromTopic(String topic) { - Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); - if (m.matches()) { - return DeviceId.deviceId(m.group(1)); - } else { - throw new IllegalArgumentException("Invalid device mastership topic: " + topic); - } - } - - private boolean isDeviceMastershipTopic(String topic) { - Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic); - return m.matches(); - } - -} -- cgit 1.2.3-korg