diff options
author | 2015-11-03 14:08:10 -0800 | |
---|---|---|
committer | 2015-11-03 14:08:10 -0800 | |
commit | 643ee33289bd2cb9e6afbfb09b4ed72d467ba1c2 (patch) | |
tree | c2c376a44a359544fe3d4c45eb0cc0e2ec4a7080 /framework/src/onos/core/net/src/main/java/org/onosproject/cluster | |
parent | 46eeb79b54345bdafb6055b8ee4bad4ce8b01274 (diff) |
This updates ONOS src tree to commit id
03fa5e571cabbd001ddb1598847e1150b11c7333
Change-Id: I13b554026d6f902933e35887d29bd5fdb669c0bd
Signed-off-by: Ashlee Young <ashlee@wildernessvoice.com>
Diffstat (limited to 'framework/src/onos/core/net/src/main/java/org/onosproject/cluster')
2 files changed, 155 insertions, 9 deletions
diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java index 04d1dfdf..7ddac0ce 100644 --- a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java +++ b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java @@ -25,17 +25,26 @@ import org.apache.karaf.system.SystemService; import org.joda.time.DateTime; import org.onlab.packet.IpAddress; import org.onosproject.cluster.ClusterAdminService; -import org.onosproject.cluster.ClusterDefinitionService; import org.onosproject.cluster.ClusterEvent; import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterMetadata; +import org.onosproject.cluster.ClusterMetadataService; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ClusterStore; import org.onosproject.cluster.ClusterStoreDelegate; import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.NodeId; +import org.onosproject.cluster.Partition; import org.onosproject.event.AbstractListenerManager; import org.slf4j.Logger; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; @@ -44,8 +53,6 @@ import static org.onosproject.security.AppGuard.checkPermission; import static org.slf4j.LoggerFactory.getLogger; import static org.onosproject.security.AppPermission.Type.*; - - /** * Implementation of the cluster service. */ @@ -61,7 +68,7 @@ public class ClusterManager private ClusterStoreDelegate delegate = new InternalStoreDelegate(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterDefinitionService clusterDefinitionService; + protected ClusterMetadataService clusterMetadataService; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterStore store; @@ -73,8 +80,9 @@ public class ClusterManager public void activate() { store.setDelegate(delegate); eventDispatcher.addSink(ClusterEvent.class, listenerRegistry); - clusterDefinitionService.seedNodes() - .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort())); + clusterMetadataService.getClusterMetadata() + .getNodes() + .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort())); log.info("Started"); } @@ -119,11 +127,16 @@ public class ClusterManager } @Override - public void formCluster(Set<ControllerNode> nodes, String ipPrefix) { + public void formCluster(Set<ControllerNode> nodes) { checkNotNull(nodes, "Nodes cannot be null"); checkArgument(!nodes.isEmpty(), "Nodes cannot be empty"); - checkNotNull(ipPrefix, "IP prefix cannot be null"); - clusterDefinitionService.formCluster(nodes, ipPrefix); + + ClusterMetadata metadata = ClusterMetadata.builder() + .withName("default") + .withControllerNodes(nodes) + .withPartitions(buildDefaultPartitions(nodes)) + .build(); + clusterMetadataService.setClusterMetadata(metadata); try { log.warn("Shutting down container for cluster reconfiguration!"); systemService.reboot("now", SystemService.Swipe.NONE); @@ -153,4 +166,21 @@ public class ClusterManager post(event); } } + + private static Collection<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes) { + List<ControllerNode> sorted = new ArrayList<>(nodes); + Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString())); + Collection<Partition> partitions = Lists.newArrayList(); + + int length = nodes.size(); + int count = 3; + for (int i = 0; i < length; i++) { + Set<NodeId> set = new HashSet<>(count); + for (int j = 0; j < count; j++) { + set.add(sorted.get((i + j) % length).id()); + } + partitions.add(new Partition("p" + (i + 1), set)); + } + return partitions; + } } diff --git a/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java new file mode 100644 index 00000000..a0f7a833 --- /dev/null +++ b/framework/src/onos/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java @@ -0,0 +1,116 @@ +package org.onosproject.cluster.impl; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Collection; +import java.util.Enumeration; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.packet.IpAddress; +import org.onosproject.cluster.ClusterMetadata; +import org.onosproject.cluster.ClusterMetadataEvent; +import org.onosproject.cluster.ClusterMetadataEventListener; +import org.onosproject.cluster.ClusterMetadataService; +import org.onosproject.cluster.ClusterMetadataStore; +import org.onosproject.cluster.ClusterMetadataStoreDelegate; +import org.onosproject.cluster.ControllerNode; +import org.onosproject.event.AbstractListenerManager; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +/** + * Implementation of ClusterMetadataService. + */ +@Component(immediate = true) +@Service +public class ClusterMetadataManager + extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener> + implements ClusterMetadataService { + + private ControllerNode localNode; + private final Logger log = getLogger(getClass()); + + private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterMetadataStore store; + + @Activate + public void activate() { + store.setDelegate(delegate); + eventDispatcher.addSink(ClusterMetadataEvent.class, listenerRegistry); + establishSelfIdentity(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + store.unsetDelegate(delegate); + eventDispatcher.removeSink(ClusterMetadataEvent.class); + log.info("Stopped"); + } + + @Override + public ClusterMetadata getClusterMetadata() { + return Versioned.valueOrElse(store.getClusterMetadata(), null); + } + + @Override + public ControllerNode getLocalNode() { + return localNode; + } + + @Override + public void setClusterMetadata(ClusterMetadata metadata) { + checkNotNull(metadata, "Cluster metadata cannot be null"); + store.setClusterMetadata(metadata); + } + + // Store delegate to re-post events emitted from the store. + private class InternalStoreDelegate implements ClusterMetadataStoreDelegate { + @Override + public void notify(ClusterMetadataEvent event) { + post(event); + } + } + + private IpAddress findLocalIp(Collection<ControllerNode> controllerNodes) throws SocketException { + Enumeration<NetworkInterface> interfaces = + NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface iface = interfaces.nextElement(); + Enumeration<InetAddress> inetAddresses = iface.getInetAddresses(); + while (inetAddresses.hasMoreElements()) { + IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement()); + if (controllerNodes.stream() + .map(ControllerNode::ip) + .anyMatch(nodeIp -> ip.equals(nodeIp))) { + return ip; + } + } + } + throw new IllegalStateException("Unable to determine local ip"); + } + + private void establishSelfIdentity() { + try { + IpAddress ip = findLocalIp(getClusterMetadata().getNodes()); + localNode = getClusterMetadata().getNodes() + .stream() + .filter(node -> node.ip().equals(ip)) + .findFirst() + .get(); + } catch (SocketException e) { + throw new IllegalStateException("Cannot determine local IP", e); + } + } +}
\ No newline at end of file |