path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl
diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl')
4 files changed, 0 insertions, 685 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
deleted file mode 100644
index c6fc6933..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
+++ /dev/null
@@ -1,419 +0,0 @@
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
-import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
-import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
-import static org.slf4j.LoggerFactory.getLogger;
-import static;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipStore;
-import org.onosproject.mastership.MastershipStoreDelegate;
-import org.onosproject.mastership.MastershipTerm;
-import org.slf4j.Logger;
- * Implementation of the MastershipStore on top of Leadership Service.
- */
-@Component(immediate = true, enabled = true)
-public class ConsistentDeviceMastershipStore
- extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
- implements MastershipStore {
- private final Logger log = getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LeadershipService leadershipService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
- private NodeId localNodeId;
- private final Set<DeviceId> connectedDevices = Sets.newHashSet();
- private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
- new MessageSubject("mastership-store-device-role-relinquish");
- private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
- new MessageSubject("mastership-store-device-mastership-relinquish");
- private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
- Pattern.compile("device:(.*)");
- private ExecutorService messageHandlingExecutor;
- private ScheduledExecutorService transferExecutor;
- private final LeadershipEventListener leadershipEventListener =
- new InternalDeviceMastershipEventListener();
- private static final String NODE_ID_NULL = "Node ID cannot be null";
- private static final String DEVICE_ID_NULL = "Device ID cannot be null";
- private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
- public static final StoreSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MastershipRole.class)
- .register(MastershipEvent.class)
- .register(MastershipEvent.Type.class)
- .build();
- }
- };
- @Activate
- public void activate() {
- messageHandlingExecutor =
- Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/device/mastership", "message-handler"));
- transferExecutor =
- Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
- clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
- SERIALIZER::decode,
- this::relinquishLocalRole,
- SERIALIZER::encode,
- messageHandlingExecutor);
- clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::decode,
- this::transitionFromMasterToStandby,
- SERIALIZER::encode,
- messageHandlingExecutor);
- localNodeId = clusterService.getLocalNode().id();
- leadershipService.addListener(leadershipEventListener);
- }
- @Deactivate
- public void deactivate() {
- clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
- clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
- messageHandlingExecutor.shutdown();
- transferExecutor.shutdown();
- leadershipService.removeListener(leadershipEventListener);
- }
- @Override
- public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- if (connectedDevices.add(deviceId)) {
- return leadershipService.runForLeadership(leadershipTopic)
- .thenApply(leadership -> {
- return Objects.equal(localNodeId, leadership.leader())
- ? MastershipRole.MASTER : MastershipRole.STANDBY;
- });
- } else {
- NodeId leader = leadershipService.getLeader(leadershipTopic);
- if (Objects.equal(localNodeId, leader)) {
- return CompletableFuture.completedFuture(MastershipRole.MASTER);
- } else {
- return CompletableFuture.completedFuture(MastershipRole.STANDBY);
- }
- }
- }
- @Override
- public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- checkArgument(nodeId != null, NODE_ID_NULL);
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId leader = leadershipService.getLeader(leadershipTopic);
- if (Objects.equal(nodeId, leader)) {
- return MastershipRole.MASTER;
- }
- return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
- MastershipRole.STANDBY : MastershipRole.NONE;
- }
- @Override
- public NodeId getMaster(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- return leadershipService.getLeader(leadershipTopic);
- }
- @Override
- public RoleInfo getNodes(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService
- .getNodes()
- .forEach((node) -> roles.put(, getRole(, deviceId)));
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
- List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
- List<NodeId> sortedStandbyList =;
- return new RoleInfo(master, sortedStandbyList);
- }
- @Override
- public Set<DeviceId> getDevices(NodeId nodeId) {
- checkArgument(nodeId != null, NODE_ID_NULL);
- return leadershipService
- .ownedTopics(nodeId)
- .stream()
- .filter(this::isDeviceMastershipTopic)
- .map(this::extractDeviceIdFromTopic)
- .collect(Collectors.toSet());
- }
- @Override
- public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
- checkArgument(nodeId != null, NODE_ID_NULL);
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (nodeId.equals(currentMaster)) {
- return CompletableFuture.completedFuture(null);
- } else {
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
- if (candidates.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
- if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
- // There is brief wait before we step down from mastership.
- // This is to ensure any work that happens when standby preference
- // order changes can complete. For example: flow entries need to be backed
- // to the new top standby (ONOS-1883)
- // FIXME: This potentially introduces a race-condition.
- // Right now role changes are only forced via CLI.
- transferExecutor.schedule(() -> {
- result.complete(transitionFromMasterToStandby(deviceId));
- return result;
- } else {
- log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
- }
- }
- return CompletableFuture.completedFuture(null);
- }
- @Override
- public MastershipTerm getTermFor(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- Leadership leadership = leadershipService.getLeadership(leadershipTopic);
- return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
- }
- @Override
- public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
- checkArgument(nodeId != null, NODE_ID_NULL);
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (!nodeId.equals(currentMaster)) {
- return CompletableFuture.completedFuture(null);
- }
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
- NodeId newMaster =
- .filter(candidate -> !Objects.equal(nodeId, candidate))
- .findFirst()
- .orElse(null);
-"Transitioning to role {} for {}. Next master: {}",
- newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
- if (newMaster != null) {
- return setMaster(newMaster, deviceId);
- }
- return relinquishRole(nodeId, deviceId);
- }
- @Override
- public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
- checkArgument(nodeId != null, NODE_ID_NULL);
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- if (nodeId.equals(localNodeId)) {
- return relinquishLocalRole(deviceId);
- }
- log.debug("Forwarding request to relinquish "
- + "role for device {} to {}", deviceId, nodeId);
- return clusterCommunicator.sendAndReceive(
- deviceId,
- SERIALIZER::encode,
- SERIALIZER::decode,
- nodeId);
- }
- private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- // Check if this node is can be managed by this node.
- if (!connectedDevices.contains(deviceId)) {
- return CompletableFuture.completedFuture(null);
- }
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
- MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
- ? MastershipEvent.Type.MASTER_CHANGED
- : MastershipEvent.Type.BACKUPS_CHANGED;
- connectedDevices.remove(deviceId);
- return leadershipService.withdraw(leadershipTopic)
- .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
- }
- private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (currentMaster == null) {
- return null;
- }
- if (!currentMaster.equals(localNodeId)) {
-"Forwarding request to relinquish "
- + "mastership for device {} to {}", deviceId, currentMaster);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
- deviceId,
- SERIALIZER::encode,
- SERIALIZER::decode,
- currentMaster), null);
- }
- return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
- ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
- }
- @Override
- public void relinquishAllRole(NodeId nodeId) {
- // Noop. LeadershipService already takes care of detecting and purging deadlocks.
- }
- private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
- @Override
- public void event(LeadershipEvent event) {
- Leadership leadership = event.subject();
- if (!isDeviceMastershipTopic(leadership.topic())) {
- return;
- }
- DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
- switch (event.type()) {
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
- break;
- // There is no concept of leader re-election in the new distributed leadership manager.
- throw new IllegalStateException("Unexpected event type");
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
- break;
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
- break;
- default:
- return;
- }
- }
- }
- private String createDeviceMastershipTopic(DeviceId deviceId) {
- return String.format("device:%s", deviceId.toString());
- }
- private DeviceId extractDeviceIdFromTopic(String topic) {
- Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
- if (m.matches()) {
- return DeviceId.deviceId(;
- } else {
- throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
- }
- }
- private boolean isDeviceMastershipTopic(String topic) {
- Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
- return m.matches();
- }
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
deleted file mode 100644
index 5a38a34a..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
+++ /dev/null
@@ -1,179 +0,0 @@
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static;
-import static;
-import static;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
- * A structure that holds node mastership roles associated with a
- * {@link}. This structure needs to be locked through IMap.
- */
-final class RoleValue {
- protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
- /**
- * Constructs empty RoleValue.
- */
- public RoleValue() {
- value.put(MastershipRole.MASTER, new LinkedList<>());
- value.put(MastershipRole.STANDBY, new LinkedList<>());
- value.put(MastershipRole.NONE, new LinkedList<>());
- }
- /**
- * Constructs copy of specified RoleValue.
- *
- * @param original original to create copy from
- */
- public RoleValue(final RoleValue original) {
- value.put(MASTER, Lists.newLinkedList(original.value.get(MASTER)));
- value.put(STANDBY, Lists.newLinkedList(original.value.get(STANDBY)));
- value.put(NONE, Lists.newLinkedList(original.value.get(NONE)));
- }
- // exposing internals for serialization purpose only
- Map<MastershipRole, List<NodeId>> value() {
- return Collections.unmodifiableMap(value);
- }
- public List<NodeId> nodesOfRole(MastershipRole type) {
- return value.get(type);
- }
- /**
- * Returns the first node to match the MastershipRole, or if there
- * are none, null.
- *
- * @param type the role
- * @return a node ID or null
- */
- public NodeId get(MastershipRole type) {
- return value.get(type).isEmpty() ? null : value.get(type).get(0);
- }
- public boolean contains(MastershipRole type, NodeId nodeId) {
- return value.get(type).contains(nodeId);
- }
- public MastershipRole getRole(NodeId nodeId) {
- if (contains(MASTER, nodeId)) {
- return MASTER;
- }
- if (contains(STANDBY, nodeId)) {
- return STANDBY;
- }
- return NONE;
- }
- /**
- * Associates a node to a certain role.
- *
- * @param type the role
- * @param nodeId the node ID of the node to associate
- * @return true if modified
- */
- public boolean add(MastershipRole type, NodeId nodeId) {
- List<NodeId> nodes = value.get(type);
- if (!nodes.contains(nodeId)) {
- return nodes.add(nodeId);
- }
- return false;
- }
- /**
- * Removes a node from a certain role.
- *
- * @param type the role
- * @param nodeId the ID of the node to remove
- * @return true if modified
- */
- public boolean remove(MastershipRole type, NodeId nodeId) {
- List<NodeId> nodes = value.get(type);
- if (!nodes.isEmpty()) {
- return nodes.remove(nodeId);
- } else {
- return false;
- }
- }
- /**
- * Reassigns a node from one role to another. If the node was not of the
- * old role, it will still be assigned the new role.
- *
- * @param nodeId the Node ID of node changing roles
- * @param from the old role
- * @param to the new role
- * @return true if modified
- */
- public boolean reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
- boolean modified = remove(from, nodeId);
- modified |= add(to, nodeId);
- return modified;
- }
- /**
- * Replaces a node in one role with another node. Even if there is no node to
- * replace, the new node is associated to the role.
- *
- * @param from the old NodeId to replace
- * @param to the new NodeId
- * @param type the role associated with the old NodeId
- * @return true if modified
- */
- public boolean replace(NodeId from, NodeId to, MastershipRole type) {
- boolean modified = remove(type, from);
- modified |= add(type, to);
- return modified;
- }
- /**
- * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
- * may be empty, so the values should be checked for safety.
- *
- * @return the RoleInfo.
- */
- public RoleInfo roleInfo() {
- return new RoleInfo(
- get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
- }
- @Override
- public String toString() {
- ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
- for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
- helper.add(el.getKey().toString(), el.getValue());
- }
- return helper.toString();
- }
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
deleted file mode 100644
index c81ea7f9..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
+++ /dev/null
@@ -1,67 +0,0 @@
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.List;
-import java.util.Map;
-import org.onosproject.cluster.NodeId;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
- * Serializer for RoleValues used by {@link org.onosproject.mastership.MastershipStore}.
- */
-public class RoleValueSerializer extends Serializer<RoleValue> {
- //RoleValues are assumed to hold a Map of MastershipRoles (an enum)
- //to a List of NodeIds.
- @Override
- public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
- RoleValue rv = new RoleValue();
- int size = input.readInt();
- for (int i = 0; i < size; i++) {
- MastershipRole role = MastershipRole.values()[input.readInt()];
- int s = input.readInt();
- for (int j = 0; j < s; j++) {
- rv.add(role, new NodeId(input.readString()));
- }
- }
- return rv;
- }
- @Override
- public void write(Kryo kryo, Output output, RoleValue type) {
- final Map<MastershipRole, List<NodeId>> map = type.value();
- output.writeInt(map.size());
- for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
- output.writeInt(el.getKey().ordinal());
- List<NodeId> nodes = el.getValue();
- output.writeInt(nodes.size());
- for (NodeId n : nodes) {
- output.writeString(n.toString());
- }
- }
- }
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
deleted file mode 100644
index 40ff6f76..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/
+++ /dev/null
@@ -1,20 +0,0 @@
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- * Implementation of a distributed mastership store using Hazelcast.
- */