aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl')
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java186
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java116
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java282
-rw-r--r--framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/package-info.java20
4 files changed, 0 insertions, 604 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
deleted file mode 100644
index 7ddac0ce..00000000
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.karaf.system.SystemService;
-import org.joda.time.DateTime;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterAdminService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.event.AbstractListenerManager;
-import org.slf4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.security.AppPermission.Type.*;
-
-/**
- * Implementation of the cluster service.
- */
-@Component(immediate = true)
-@Service
-public class ClusterManager
- extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
- implements ClusterService, ClusterAdminService {
-
- public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
- private final Logger log = getLogger(getClass());
-
- private ClusterStoreDelegate delegate = new InternalStoreDelegate();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterStore store;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected SystemService systemService;
-
- @Activate
- public void activate() {
- store.setDelegate(delegate);
- eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.getClusterMetadata()
- .getNodes()
- .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- store.unsetDelegate(delegate);
- eventDispatcher.removeSink(ClusterEvent.class);
- log.info("Stopped");
- }
-
- @Override
- public ControllerNode getLocalNode() {
- checkPermission(CLUSTER_READ);
- return store.getLocalNode();
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- checkPermission(CLUSTER_READ);
- return store.getNodes();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getNode(nodeId);
- }
-
- @Override
- public ControllerNode.State getState(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getState(nodeId);
- }
-
-
- @Override
- public DateTime getLastUpdated(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- return store.getLastUpdated(nodeId);
- }
-
- @Override
- public void formCluster(Set<ControllerNode> nodes) {
- checkNotNull(nodes, "Nodes cannot be null");
- checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
-
- ClusterMetadata metadata = ClusterMetadata.builder()
- .withName("default")
- .withControllerNodes(nodes)
- .withPartitions(buildDefaultPartitions(nodes))
- .build();
- clusterMetadataService.setClusterMetadata(metadata);
- try {
- log.warn("Shutting down container for cluster reconfiguration!");
- systemService.reboot("now", SystemService.Swipe.NONE);
- } catch (Exception e) {
- log.error("Unable to reboot container", e);
- }
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address cannot be null");
- checkArgument(tcpPort > 5000, "TCP port must be > 5000");
- return store.addNode(nodeId, ip, tcpPort);
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- store.removeNode(nodeId);
- }
-
- // Store delegate to re-post events emitted from the store.
- private class InternalStoreDelegate implements ClusterStoreDelegate {
- @Override
- public void notify(ClusterEvent event) {
- post(event);
- }
- }
-
- private static Collection<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes) {
- List<ControllerNode> sorted = new ArrayList<>(nodes);
- Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
- Collection<Partition> partitions = Lists.newArrayList();
-
- int length = nodes.size();
- int count = 3;
- for (int i = 0; i < length; i++) {
- Set<NodeId> set = new HashSet<>(count);
- for (int j = 0; j < count; j++) {
- set.add(sorted.get((i + j) % length).id());
- }
- partitions.add(new Partition("p" + (i + 1), set));
- }
- return partitions;
- }
-}
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
deleted file mode 100644
index a0f7a833..00000000
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.onosproject.cluster.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Collection;
-import java.util.Enumeration;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataEvent;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterMetadataStore;
-import org.onosproject.cluster.ClusterMetadataStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-/**
- * Implementation of ClusterMetadataService.
- */
-@Component(immediate = true)
-@Service
-public class ClusterMetadataManager
- extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
- implements ClusterMetadataService {
-
- private ControllerNode localNode;
- private final Logger log = getLogger(getClass());
-
- private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataStore store;
-
- @Activate
- public void activate() {
- store.setDelegate(delegate);
- eventDispatcher.addSink(ClusterMetadataEvent.class, listenerRegistry);
- establishSelfIdentity();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- store.unsetDelegate(delegate);
- eventDispatcher.removeSink(ClusterMetadataEvent.class);
- log.info("Stopped");
- }
-
- @Override
- public ClusterMetadata getClusterMetadata() {
- return Versioned.valueOrElse(store.getClusterMetadata(), null);
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return localNode;
- }
-
- @Override
- public void setClusterMetadata(ClusterMetadata metadata) {
- checkNotNull(metadata, "Cluster metadata cannot be null");
- store.setClusterMetadata(metadata);
- }
-
- // Store delegate to re-post events emitted from the store.
- private class InternalStoreDelegate implements ClusterMetadataStoreDelegate {
- @Override
- public void notify(ClusterMetadataEvent event) {
- post(event);
- }
- }
-
- private IpAddress findLocalIp(Collection<ControllerNode> controllerNodes) throws SocketException {
- Enumeration<NetworkInterface> interfaces =
- NetworkInterface.getNetworkInterfaces();
- while (interfaces.hasMoreElements()) {
- NetworkInterface iface = interfaces.nextElement();
- Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
- if (controllerNodes.stream()
- .map(ControllerNode::ip)
- .anyMatch(nodeIp -> ip.equals(nodeIp))) {
- return ip;
- }
- }
- }
- throw new IllegalStateException("Unable to determine local ip");
- }
-
- private void establishSelfIdentity() {
- try {
- IpAddress ip = findLocalIp(getClusterMetadata().getNodes());
- localNode = getClusterMetadata().getNodes()
- .stream()
- .filter(node -> node.ip().equals(ip))
- .findFirst()
- .get();
- } catch (SocketException e) {
- throw new IllegalStateException("Cannot determine local IP", e);
- }
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
deleted file mode 100644
index 56d369fd..00000000
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster.impl;
-
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Timer.Context;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.metrics.MetricsService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.core.MetricsHelper;
-import org.onosproject.mastership.MastershipAdminService;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipStore;
-import org.onosproject.mastership.MastershipStoreDelegate;
-import org.onosproject.mastership.MastershipTerm;
-import org.onosproject.mastership.MastershipTermService;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.slf4j.Logger;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static org.onlab.metrics.MetricsUtil.startTimer;
-import static org.onlab.metrics.MetricsUtil.stopTimer;
-import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
-import static org.onosproject.net.MastershipRole.MASTER;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.security.AppPermission.Type.*;
-
-
-
-@Component(immediate = true)
-@Service
-public class MastershipManager
- extends AbstractListenerManager<MastershipEvent, MastershipListener>
- implements MastershipService, MastershipAdminService, MastershipTermService,
- MetricsHelper {
-
- private static final String NODE_ID_NULL = "Node ID cannot be null";
- private static final String DEVICE_ID_NULL = "Device ID cannot be null";
- private static final String ROLE_NULL = "Mastership role cannot be null";
-
- private final Logger log = getLogger(getClass());
-
- private final MastershipStoreDelegate delegate = new InternalDelegate();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipStore store;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MetricsService metricsService;
-
- private NodeId localNodeId;
- private Timer requestRoleTimer;
-
- @Activate
- public void activate() {
- requestRoleTimer = createTimer("Mastership", "requestRole", "responseTime");
- localNodeId = clusterService.getLocalNode().id();
- eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
- store.setDelegate(delegate);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- eventDispatcher.removeSink(MastershipEvent.class);
- store.unsetDelegate(delegate);
- log.info("Stopped");
- }
-
- @Override
- public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
- checkNotNull(nodeId, NODE_ID_NULL);
- checkNotNull(deviceId, DEVICE_ID_NULL);
- checkNotNull(role, ROLE_NULL);
-
- CompletableFuture<MastershipEvent> eventFuture = null;
-
- switch (role) {
- case MASTER:
- eventFuture = store.setMaster(nodeId, deviceId);
- break;
- case STANDBY:
- eventFuture = store.setStandby(nodeId, deviceId);
- break;
- case NONE:
- eventFuture = store.relinquishRole(nodeId, deviceId);
- break;
- default:
- log.info("Unknown role; ignoring");
- return CompletableFuture.completedFuture(null);
- }
-
- return eventFuture.thenAccept(this::post)
- .thenApply(v -> null);
- }
-
- @Override
- public MastershipRole getLocalRole(DeviceId deviceId) {
- checkPermission(CLUSTER_READ);
-
- checkNotNull(deviceId, DEVICE_ID_NULL);
- return store.getRole(clusterService.getLocalNode().id(), deviceId);
- }
-
- @Override
- public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
- checkPermission(CLUSTER_WRITE);
- return store.relinquishRole(localNodeId, deviceId)
- .thenAccept(this::post)
- .thenApply(v -> null);
- }
-
- @Override
- public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
- checkPermission(CLUSTER_WRITE);
-
- checkNotNull(deviceId, DEVICE_ID_NULL);
- final Context timer = startTimer(requestRoleTimer);
- return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
-
- }
-
- @Override
- public NodeId getMasterFor(DeviceId deviceId) {
- checkPermission(CLUSTER_READ);
-
- checkNotNull(deviceId, DEVICE_ID_NULL);
- return store.getMaster(deviceId);
- }
-
- @Override
- public Set<DeviceId> getDevicesOf(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
-
- checkNotNull(nodeId, NODE_ID_NULL);
- return store.getDevices(nodeId);
- }
-
- @Override
- public RoleInfo getNodesFor(DeviceId deviceId) {
- checkPermission(CLUSTER_READ);
-
- checkNotNull(deviceId, DEVICE_ID_NULL);
- return store.getNodes(deviceId);
- }
-
- @Override
- public MastershipTerm getMastershipTerm(DeviceId deviceId) {
- return store.getTermFor(deviceId);
- }
-
- @Override
- public MetricsService metricsService() {
- return metricsService;
- }
-
- @Override
- public void balanceRoles() {
- List<ControllerNode> nodes = newArrayList(clusterService.getNodes());
- Map<ControllerNode, Set<DeviceId>> controllerDevices = new HashMap<>();
- int deviceCount = 0;
-
- // Create buckets reflecting current ownership.
- for (ControllerNode node : nodes) {
- if (clusterService.getState(node.id()) == ACTIVE) {
- Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
- deviceCount += devicesOf.size();
- controllerDevices.put(node, devicesOf);
- log.info("Node {} has {} devices.", node.id(), devicesOf.size());
- }
- }
-
- // Now re-balance the buckets until they are roughly even.
- List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
- int rounds = controllerDevices.keySet().size();
- for (int i = 0; i < rounds; i++) {
- // Iterate over the buckets and find the smallest and the largest.
- ControllerNode smallest = findBucket(true, controllerDevices);
- ControllerNode largest = findBucket(false, controllerDevices);
- balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
- }
- CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
- balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
-
- Futures.getUnchecked(balanceRolesFuture);
- }
-
- private ControllerNode findBucket(boolean min,
- Map<ControllerNode, Set<DeviceId>> controllerDevices) {
- int xSize = min ? Integer.MAX_VALUE : -1;
- ControllerNode xNode = null;
- for (ControllerNode node : controllerDevices.keySet()) {
- int size = controllerDevices.get(node).size();
- if ((min && size < xSize) || (!min && size > xSize)) {
- xSize = size;
- xNode = node;
- }
- }
- return xNode;
- }
-
- private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
- Map<ControllerNode, Set<DeviceId>> controllerDevices,
- int deviceCount) {
- Collection<DeviceId> minBucket = controllerDevices.get(smallest);
- Collection<DeviceId> maxBucket = controllerDevices.get(largest);
- int bucketCount = controllerDevices.keySet().size();
-
- int delta = (maxBucket.size() - minBucket.size()) / 2;
- delta = Math.min(deviceCount / bucketCount, delta);
-
- List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
-
- if (delta > 0) {
- log.info("Attempting to move {} nodes from {} to {}...", delta,
- largest.id(), smallest.id());
-
- int i = 0;
- Iterator<DeviceId> it = maxBucket.iterator();
- while (it.hasNext() && i < delta) {
- DeviceId deviceId = it.next();
- log.info("Setting {} as the master for {}", smallest.id(), deviceId);
- setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
- controllerDevices.get(smallest).add(deviceId);
- it.remove();
- i++;
- }
- }
-
- return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
- }
-
-
- public class InternalDelegate implements MastershipStoreDelegate {
- @Override
- public void notify(MastershipEvent event) {
- post(event);
- }
- }
-
-}
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/package-info.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/package-info.java
deleted file mode 100644
index 653edaa5..00000000
--- a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Subsystem for tracking controller cluster nodes.
- */
-package org.onosproject.cluster.impl;