diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java | 49 | ||||
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java | 131 | ||||
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java | 2 | ||||
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java (renamed from framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java) | 79 | ||||
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java | 7 | ||||
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java | 91 |
6 files changed, 282 insertions, 77 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java index c6d300c9..af2bb74d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; + import org.onlab.util.HexString; import org.onlab.util.SharedExecutors; import org.onlab.util.Tools; @@ -33,6 +34,7 @@ import org.onosproject.store.service.Versioned; import org.slf4j.Logger; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -92,18 +94,25 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V private static final String ERROR_NULL_KEY = "Key cannot be null"; private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder() + // String representation of serialized byte[] -> original key Object + private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder() .softValues() - .build(new CacheLoader<K, String>() { + .build(new CacheLoader<String, K>() { @Override - public String load(K key) { - return HexString.toHexString(serializer.encode(key)); + public K load(String key) { + return serializer.decode(HexString.fromHexString(key)); } }); + protected String sK(K key) { + String s = HexString.toHexString(serializer.encode(key)); + keyCache.put(s, key); + return s; + } + protected K dK(String key) { - return serializer.decode(HexString.fromHexString(key)); + return keyCache.getUnchecked(key); } public DefaultAsyncConsistentMap(String name, @@ -207,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Boolean> containsKey(K key) { checkNotNull(key, ERROR_NULL_KEY); final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); - return database.mapContainsKey(name, keyCache.getUnchecked(key)) + return database.mapContainsKey(name, sK(key)) .whenComplete((r, e) -> timer.stop(e)); } @@ -223,7 +232,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Versioned<V>> get(K key) { checkNotNull(key, ERROR_NULL_KEY); final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.mapGet(name, keyCache.getUnchecked(key)) + return database.mapGet(name, sK(key)) .whenComplete((r, e) -> timer.stop(e)) .thenApply(v -> v != null ? v.map(serializer::decode) : null); } @@ -328,10 +337,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Set<K>> keySet() { final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); return database.mapKeySet(name) - .thenApply(s -> s - .stream() - .map(this::dK) - .collect(Collectors.toSet())) + .thenApply(s -> newMappingKeySet(s)) .whenComplete((r, e) -> timer.stop(e)); } @@ -351,10 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); return database.mapEntrySet(name) .whenComplete((r, e) -> timer.stop(e)) - .thenApply(s -> s - .stream() - .map(this::mapRawEntry) - .collect(Collectors.toSet())); + .thenApply(s -> newMappingEntrySet(s)); } @Override @@ -413,17 +416,31 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V checkIfUnmodifiable(); } + private Set<K> newMappingKeySet(Set<String> s) { + return new MappingSet<>(s, Collections::unmodifiableSet, + this::sK, this::dK); + } + + private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) { + return new MappingSet<>(s, Collections::unmodifiableSet, + this::reverseMapRawEntry, this::mapRawEntry); + } + private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); } + private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) { + return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode)); + } + private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key, Match<V> oldValueMatch, Match<Long> oldVersionMatch, V value) { beforeUpdate(key); return database.mapUpdate(name, - keyCache.getUnchecked(key), + sK(key), oldValueMatch.map(serializer::encode), oldVersionMatch, value == null ? null : serializer.encode(value)) diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java new file mode 100644 index 00000000..9bf80a73 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java @@ -0,0 +1,131 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterators; + +/** + * Set view backed by Set with element type {@code <BACK>} but returns + * element as {@code <OUT>} for convenience. + * + * @param <BACK> Backing {@link Set} element type. + * MappingSet will follow this type's equality behavior. + * @param <OUT> external facing element type. + * MappingSet will ignores equality defined by this type. + */ +class MappingSet<BACK, OUT> implements Set<OUT> { + + private final Set<BACK> backedSet; + private final Function<OUT, BACK> toBack; + private final Function<BACK, OUT> toOut; + + public MappingSet(Set<BACK> backedSet, + Function<Set<BACK>, Set<BACK>> supplier, + Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) { + this.backedSet = supplier.apply(backedSet); + this.toBack = toBack; + this.toOut = toOut; + } + + @Override + public int size() { + return backedSet.size(); + } + + @Override + public boolean isEmpty() { + return backedSet.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return backedSet.contains(toBack.apply((OUT) o)); + } + + @Override + public Iterator<OUT> iterator() { + return Iterators.transform(backedSet.iterator(), toOut::apply); + } + + @Override + public Object[] toArray() { + return backedSet.stream() + .map(toOut) + .toArray(); + } + + @Override + public <T> T[] toArray(T[] a) { + return backedSet.stream() + .map(toOut) + .toArray(size -> { + if (size < a.length) { + return (T[]) new Object[size]; + } else { + Arrays.fill(a, null); + return a; + } + }); + } + + @Override + public boolean add(OUT e) { + return backedSet.add(toBack.apply(e)); + } + + @Override + public boolean remove(Object o) { + return backedSet.remove(toBack.apply((OUT) o)); + } + + @Override + public boolean containsAll(Collection<?> c) { + return c.stream() + .map(e -> toBack.apply((OUT) e)) + .allMatch(backedSet::contains); + } + + @Override + public boolean addAll(Collection<? extends OUT> c) { + return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList())); + } + + @Override + public boolean retainAll(Collection<?> c) { + return backedSet.retainAll(c.stream() + .map(x -> toBack.apply((OUT) x)) + .collect(Collectors.toList())); + } + + @Override + public boolean removeAll(Collection<?> c) { + return backedSet.removeAll(c.stream() + .map(x -> toBack.apply((OUT) x)) + .collect(Collectors.toList())); + } + + @Override + public void clear() { + backedSet.clear(); + } +} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java index 83319c3e..cc32a735 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java @@ -350,7 +350,7 @@ public class DistributedGroupStore // Check if a group is existing with the same key Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie()); if (existingGroup != null) { - log.warn("Group already exists with the same key {} in dev:{} with id:{}", + log.warn("Group already exists with the same key {} in dev:{} with id:0x{}", groupDesc.appCookie(), groupDesc.deviceId(), Integer.toHexString(existingGroup.id().id())); return; diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java index 20124576..836a3c22 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java @@ -15,26 +15,8 @@ */ package org.onosproject.store.host.impl; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; -import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED; -import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; -import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.stream.Collectors; - +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -60,22 +42,34 @@ import org.onosproject.net.host.HostStoreDelegate; import org.onosproject.net.provider.ProviderId; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.LogicalClockService; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.onosproject.net.host.HostEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; /** * Manages the inventory of hosts using a {@code EventuallyConsistentMap}. */ @Component(immediate = true) @Service -public class ECHostStore +public class DistributedHostStore extends AbstractStore<HostEvent, HostStoreDelegate> implements HostStore { @@ -84,15 +78,13 @@ public class ECHostStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected StorageService storageService; - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LogicalClockService clockService; - - private EventuallyConsistentMap<HostId, DefaultHost> hosts; + private ConsistentMap<HostId, DefaultHost> host; + private Map<HostId, DefaultHost> hosts; private final ConcurrentHashMap<HostId, DefaultHost> prevHosts = new ConcurrentHashMap<>(); - private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker = + private MapEventListener<HostId, DefaultHost> hostLocationTracker = new HostLocationTracker(); @Activate @@ -100,21 +92,22 @@ public class ECHostStore KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder() .register(KryoNamespaces.API); - hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder() + host = storageService.<HostId, DefaultHost>consistentMapBuilder() .withName("onos-hosts") - .withSerializer(hostSerializer) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) + .withRelaxedReadConsistency() + .withSerializer(Serializer.using(hostSerializer.build())) .build(); - hosts.addListener(hostLocationTracker); + hosts = host.asJavaMap(); + + host.addListener(hostLocationTracker); log.info("Started"); } @Deactivate public void deactivate() { - hosts.removeListener(hostLocationTracker); - hosts.destroy(); + host.removeListener(hostLocationTracker); prevHosts.clear(); log.info("Stopped"); @@ -249,11 +242,11 @@ public class ECHostStore return collection.stream().filter(predicate).collect(Collectors.toSet()); } - private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> { + private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> { @Override - public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) { - DefaultHost host = checkNotNull(event.value()); - if (event.type() == PUT) { + public void event(MapEvent<HostId, DefaultHost> event) { + DefaultHost host = checkNotNull(event.value().value()); + if (event.type() == MapEvent.Type.INSERT) { Host prevHost = prevHosts.put(host.id(), host); if (prevHost == null) { notifyDelegate(new HostEvent(HOST_ADDED, host)); @@ -262,7 +255,7 @@ public class ECHostStore } else if (!Objects.equals(prevHost, host)) { notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost)); } - } else if (event.type() == REMOVE) { + } else if (event.type() == MapEvent.Type.REMOVE) { if (prevHosts.remove(host.id()) != null) { notifyDelegate(new HostEvent(HOST_REMOVED, host)); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java index 851185b5..4d7e7f33 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java @@ -113,7 +113,7 @@ public class DistributedProxyArpStore implements ProxyArpStore { @Override public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) { - NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId()); + /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId()); if (nodeId.equals(localNodeId)) { if (delegate != null) { delegate.emitResponse(outPort, packet); @@ -122,7 +122,10 @@ public class DistributedProxyArpStore implements ProxyArpStore { log.info("Forwarding ARP response from {} to {}", subject.id(), outPort); commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()), ARP_RESPONSE_MESSAGE, serializer::encode, nodeId); - } + }*/ + //FIXME: Code above may be unnecessary and therefore cluster messaging + // and pendingMessages could be pruned as well. + delegate.emitResponse(outPort, packet); } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java index da4e3cc4..2641635d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java @@ -15,44 +15,43 @@ */ package org.onosproject.store.topology.impl; -import static com.google.common.base.Preconditions.checkArgument; -import static org.onlab.util.Tools.isNullOrEmpty; -import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collections; -import java.util.Map; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onlab.graph.GraphPathSearch; import org.onlab.util.KryoNamespace; +import org.onosproject.cfg.ComponentConfigService; import org.onosproject.common.DefaultTopology; import org.onosproject.event.Event; import org.onosproject.mastership.MastershipService; import org.onosproject.net.ConnectPoint; import org.onosproject.net.Device; import org.onosproject.net.DeviceId; +import org.onosproject.net.DisjointPath; import org.onosproject.net.Link; import org.onosproject.net.Path; -import org.onosproject.net.DisjointPath; +import org.onosproject.net.device.DeviceService; import org.onosproject.net.provider.ProviderId; import org.onosproject.net.topology.ClusterId; import org.onosproject.net.topology.DefaultGraphDescription; +import org.onosproject.net.topology.GeoDistanceLinkWeight; import org.onosproject.net.topology.GraphDescription; import org.onosproject.net.topology.LinkWeight; +import org.onosproject.net.topology.MetricLinkWeight; +import org.onosproject.net.topology.PathAdminService; import org.onosproject.net.topology.Topology; import org.onosproject.net.topology.TopologyCluster; +import org.onosproject.net.topology.TopologyEdge; import org.onosproject.net.topology.TopologyEvent; import org.onosproject.net.topology.TopologyGraph; import org.onosproject.net.topology.TopologyStore; import org.onosproject.net.topology.TopologyStoreDelegate; +import org.onosproject.net.topology.TopologyVertex; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.EventuallyConsistentMap; @@ -60,8 +59,23 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; import org.onosproject.store.service.LogicalClockService; import org.onosproject.store.service.StorageService; +import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; +import java.util.Collections; +import java.util.Dictionary; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.onlab.util.Tools.get; +import static org.onlab.util.Tools.isNullOrEmpty; +import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED; +import static org.slf4j.LoggerFactory.getLogger; + /** * Manages inventory of topology snapshots using trivial in-memory * structures implementation. @@ -73,9 +87,12 @@ import org.slf4j.Logger; @Service public class DistributedTopologyStore extends AbstractStore<TopologyEvent, TopologyStoreDelegate> - implements TopologyStore { + implements TopologyStore, PathAdminService { private final Logger log = getLogger(getClass()); + + private static final String FORMAT = "Settings: linkWeightFunction={}"; + private volatile DefaultTopology current = new DefaultTopology(ProviderId.NONE, new DefaultGraphDescription(0L, System.currentTimeMillis(), @@ -91,6 +108,21 @@ public class DistributedTopologyStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected MastershipService mastershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ComponentConfigService configService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + private static final String HOP_COUNT = "hopCount"; + private static final String LINK_METRIC = "linkMetric"; + private static final String GEO_DISTANCE = "geoDistance"; + + private static final String DEFAULT_LINK_WEIGHT_FUNCTION = "hopCount"; + @Property(name = "linkWeightFunction", value = DEFAULT_LINK_WEIGHT_FUNCTION, + label = "Default link-weight function: hopCount, linkMetric, geoDistance") + private String linkWeightFunction = DEFAULT_LINK_WEIGHT_FUNCTION; + // Cluster root to broadcast points bindings to allow convergence to // a shared broadcast tree; node that is the master of the cluster root // is the primary. @@ -100,7 +132,8 @@ public class DistributedTopologyStore new InternalBroadcastPointListener(); @Activate - public void activate() { + protected void activate() { + configService.registerProperties(getClass()); KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder() .register(KryoNamespaces.API); @@ -114,12 +147,30 @@ public class DistributedTopologyStore } @Deactivate - public void deactivate() { + protected void deactivate() { + configService.unregisterProperties(getClass(), false); broadcastPoints.removeListener(listener); broadcastPoints.destroy(); log.info("Stopped"); } + @Modified + protected void modified(ComponentContext context) { + Dictionary<?, ?> properties = context.getProperties(); + + String newLinkWeightFunction = get(properties, "linkWeightFunction"); + if (newLinkWeightFunction != null && + !Objects.equals(newLinkWeightFunction, linkWeightFunction)) { + linkWeightFunction = newLinkWeightFunction; + LinkWeight weight = linkWeightFunction.equals(LINK_METRIC) ? + new MetricLinkWeight() : + linkWeightFunction.equals(GEO_DISTANCE) ? + new GeoDistanceLinkWeight(deviceService) : null; + setDefaultLinkWeight(weight); + } + log.info(FORMAT, linkWeightFunction); + } + @Override public Topology currentTopology() { return current; @@ -263,6 +314,16 @@ public class DistributedTopologyStore return (DefaultTopology) topology; } + @Override + public void setDefaultLinkWeight(LinkWeight linkWeight) { + DefaultTopology.setDefaultLinkWeight(linkWeight); + } + + @Override + public void setDefaultGraphPathSearch(GraphPathSearch<TopologyVertex, TopologyEdge> graphPathSearch) { + DefaultTopology.setDefaultGraphPathSearch(graphPathSearch); + } + private class InternalBroadcastPointListener implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> { @Override |