diff options
Diffstat (limited to 'framework/src/onos/core/store/dist')
9 files changed, 658 insertions, 96 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java index c6d300c9..af2bb74d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; + import org.onlab.util.HexString; import org.onlab.util.SharedExecutors; import org.onlab.util.Tools; @@ -33,6 +34,7 @@ import org.onosproject.store.service.Versioned; import org.slf4j.Logger; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -92,18 +94,25 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V private static final String ERROR_NULL_KEY = "Key cannot be null"; private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder() + // String representation of serialized byte[] -> original key Object + private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder() .softValues() - .build(new CacheLoader<K, String>() { + .build(new CacheLoader<String, K>() { @Override - public String load(K key) { - return HexString.toHexString(serializer.encode(key)); + public K load(String key) { + return serializer.decode(HexString.fromHexString(key)); } }); + protected String sK(K key) { + String s = HexString.toHexString(serializer.encode(key)); + keyCache.put(s, key); + return s; + } + protected K dK(String key) { - return serializer.decode(HexString.fromHexString(key)); + return keyCache.getUnchecked(key); } public DefaultAsyncConsistentMap(String name, @@ -207,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Boolean> containsKey(K key) { checkNotNull(key, ERROR_NULL_KEY); final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); - return database.mapContainsKey(name, keyCache.getUnchecked(key)) + return database.mapContainsKey(name, sK(key)) .whenComplete((r, e) -> timer.stop(e)); } @@ -223,7 +232,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Versioned<V>> get(K key) { checkNotNull(key, ERROR_NULL_KEY); final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.mapGet(name, keyCache.getUnchecked(key)) + return database.mapGet(name, sK(key)) .whenComplete((r, e) -> timer.stop(e)) .thenApply(v -> v != null ? v.map(serializer::decode) : null); } @@ -328,10 +337,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V public CompletableFuture<Set<K>> keySet() { final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); return database.mapKeySet(name) - .thenApply(s -> s - .stream() - .map(this::dK) - .collect(Collectors.toSet())) + .thenApply(s -> newMappingKeySet(s)) .whenComplete((r, e) -> timer.stop(e)); } @@ -351,10 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); return database.mapEntrySet(name) .whenComplete((r, e) -> timer.stop(e)) - .thenApply(s -> s - .stream() - .map(this::mapRawEntry) - .collect(Collectors.toSet())); + .thenApply(s -> newMappingEntrySet(s)); } @Override @@ -413,17 +416,31 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V checkIfUnmodifiable(); } + private Set<K> newMappingKeySet(Set<String> s) { + return new MappingSet<>(s, Collections::unmodifiableSet, + this::sK, this::dK); + } + + private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) { + return new MappingSet<>(s, Collections::unmodifiableSet, + this::reverseMapRawEntry, this::mapRawEntry); + } + private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); } + private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) { + return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode)); + } + private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key, Match<V> oldValueMatch, Match<Long> oldVersionMatch, V value) { beforeUpdate(key); return database.mapUpdate(name, - keyCache.getUnchecked(key), + sK(key), oldValueMatch.map(serializer::encode), oldVersionMatch, value == null ? null : serializer.encode(value)) diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java new file mode 100644 index 00000000..9bf80a73 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java @@ -0,0 +1,131 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Iterators; + +/** + * Set view backed by Set with element type {@code <BACK>} but returns + * element as {@code <OUT>} for convenience. + * + * @param <BACK> Backing {@link Set} element type. + * MappingSet will follow this type's equality behavior. + * @param <OUT> external facing element type. + * MappingSet will ignores equality defined by this type. + */ +class MappingSet<BACK, OUT> implements Set<OUT> { + + private final Set<BACK> backedSet; + private final Function<OUT, BACK> toBack; + private final Function<BACK, OUT> toOut; + + public MappingSet(Set<BACK> backedSet, + Function<Set<BACK>, Set<BACK>> supplier, + Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) { + this.backedSet = supplier.apply(backedSet); + this.toBack = toBack; + this.toOut = toOut; + } + + @Override + public int size() { + return backedSet.size(); + } + + @Override + public boolean isEmpty() { + return backedSet.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return backedSet.contains(toBack.apply((OUT) o)); + } + + @Override + public Iterator<OUT> iterator() { + return Iterators.transform(backedSet.iterator(), toOut::apply); + } + + @Override + public Object[] toArray() { + return backedSet.stream() + .map(toOut) + .toArray(); + } + + @Override + public <T> T[] toArray(T[] a) { + return backedSet.stream() + .map(toOut) + .toArray(size -> { + if (size < a.length) { + return (T[]) new Object[size]; + } else { + Arrays.fill(a, null); + return a; + } + }); + } + + @Override + public boolean add(OUT e) { + return backedSet.add(toBack.apply(e)); + } + + @Override + public boolean remove(Object o) { + return backedSet.remove(toBack.apply((OUT) o)); + } + + @Override + public boolean containsAll(Collection<?> c) { + return c.stream() + .map(e -> toBack.apply((OUT) e)) + .allMatch(backedSet::contains); + } + + @Override + public boolean addAll(Collection<? extends OUT> c) { + return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList())); + } + + @Override + public boolean retainAll(Collection<?> c) { + return backedSet.retainAll(c.stream() + .map(x -> toBack.apply((OUT) x)) + .collect(Collectors.toList())); + } + + @Override + public boolean removeAll(Collection<?> c) { + return backedSet.removeAll(c.stream() + .map(x -> toBack.apply((OUT) x)) + .collect(Collectors.toList())); + } + + @Override + public void clear() { + backedSet.clear(); + } +} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java index 83319c3e..cc32a735 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java @@ -350,7 +350,7 @@ public class DistributedGroupStore // Check if a group is existing with the same key Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie()); if (existingGroup != null) { - log.warn("Group already exists with the same key {} in dev:{} with id:{}", + log.warn("Group already exists with the same key {} in dev:{} with id:0x{}", groupDesc.appCookie(), groupDesc.deviceId(), Integer.toHexString(existingGroup.id().id())); return; diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java index 20124576..836a3c22 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java @@ -15,26 +15,8 @@ */ package org.onosproject.store.host.impl; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static org.onosproject.net.DefaultAnnotations.merge; -import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; -import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED; -import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; -import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.stream.Collectors; - +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -60,22 +42,34 @@ import org.onosproject.net.host.HostStoreDelegate; import org.onosproject.net.provider.ProviderId; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.LogicalClockService; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.slf4j.Logger; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.onosproject.net.host.HostEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; /** * Manages the inventory of hosts using a {@code EventuallyConsistentMap}. */ @Component(immediate = true) @Service -public class ECHostStore +public class DistributedHostStore extends AbstractStore<HostEvent, HostStoreDelegate> implements HostStore { @@ -84,15 +78,13 @@ public class ECHostStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected StorageService storageService; - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LogicalClockService clockService; - - private EventuallyConsistentMap<HostId, DefaultHost> hosts; + private ConsistentMap<HostId, DefaultHost> host; + private Map<HostId, DefaultHost> hosts; private final ConcurrentHashMap<HostId, DefaultHost> prevHosts = new ConcurrentHashMap<>(); - private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker = + private MapEventListener<HostId, DefaultHost> hostLocationTracker = new HostLocationTracker(); @Activate @@ -100,21 +92,22 @@ public class ECHostStore KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder() .register(KryoNamespaces.API); - hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder() + host = storageService.<HostId, DefaultHost>consistentMapBuilder() .withName("onos-hosts") - .withSerializer(hostSerializer) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) + .withRelaxedReadConsistency() + .withSerializer(Serializer.using(hostSerializer.build())) .build(); - hosts.addListener(hostLocationTracker); + hosts = host.asJavaMap(); + + host.addListener(hostLocationTracker); log.info("Started"); } @Deactivate public void deactivate() { - hosts.removeListener(hostLocationTracker); - hosts.destroy(); + host.removeListener(hostLocationTracker); prevHosts.clear(); log.info("Stopped"); @@ -249,11 +242,11 @@ public class ECHostStore return collection.stream().filter(predicate).collect(Collectors.toSet()); } - private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> { + private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> { @Override - public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) { - DefaultHost host = checkNotNull(event.value()); - if (event.type() == PUT) { + public void event(MapEvent<HostId, DefaultHost> event) { + DefaultHost host = checkNotNull(event.value().value()); + if (event.type() == MapEvent.Type.INSERT) { Host prevHost = prevHosts.put(host.id(), host); if (prevHost == null) { notifyDelegate(new HostEvent(HOST_ADDED, host)); @@ -262,7 +255,7 @@ public class ECHostStore } else if (!Objects.equals(prevHost, host)) { notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost)); } - } else if (event.type() == REMOVE) { + } else if (event.type() == MapEvent.Type.REMOVE) { if (prevHosts.remove(host.id()) != null) { notifyDelegate(new HostEvent(HOST_REMOVED, host)); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java index 851185b5..4d7e7f33 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java @@ -113,7 +113,7 @@ public class DistributedProxyArpStore implements ProxyArpStore { @Override public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) { - NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId()); + /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId()); if (nodeId.equals(localNodeId)) { if (delegate != null) { delegate.emitResponse(outPort, packet); @@ -122,7 +122,10 @@ public class DistributedProxyArpStore implements ProxyArpStore { log.info("Forwarding ARP response from {} to {}", subject.id(), outPort); commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()), ARP_RESPONSE_MESSAGE, serializer::encode, nodeId); - } + }*/ + //FIXME: Code above may be unnecessary and therefore cluster messaging + // and pendingMessages could be pruned as well. + delegate.emitResponse(outPort, packet); } @Override diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java index da4e3cc4..2641635d 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java @@ -15,44 +15,43 @@ */ package org.onosproject.store.topology.impl; -import static com.google.common.base.Preconditions.checkArgument; -import static org.onlab.util.Tools.isNullOrEmpty; -import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collections; -import java.util.Map; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Modified; +import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onlab.graph.GraphPathSearch; import org.onlab.util.KryoNamespace; +import org.onosproject.cfg.ComponentConfigService; import org.onosproject.common.DefaultTopology; import org.onosproject.event.Event; import org.onosproject.mastership.MastershipService; import org.onosproject.net.ConnectPoint; import org.onosproject.net.Device; import org.onosproject.net.DeviceId; +import org.onosproject.net.DisjointPath; import org.onosproject.net.Link; import org.onosproject.net.Path; -import org.onosproject.net.DisjointPath; +import org.onosproject.net.device.DeviceService; import org.onosproject.net.provider.ProviderId; import org.onosproject.net.topology.ClusterId; import org.onosproject.net.topology.DefaultGraphDescription; +import org.onosproject.net.topology.GeoDistanceLinkWeight; import org.onosproject.net.topology.GraphDescription; import org.onosproject.net.topology.LinkWeight; +import org.onosproject.net.topology.MetricLinkWeight; +import org.onosproject.net.topology.PathAdminService; import org.onosproject.net.topology.Topology; import org.onosproject.net.topology.TopologyCluster; +import org.onosproject.net.topology.TopologyEdge; import org.onosproject.net.topology.TopologyEvent; import org.onosproject.net.topology.TopologyGraph; import org.onosproject.net.topology.TopologyStore; import org.onosproject.net.topology.TopologyStoreDelegate; +import org.onosproject.net.topology.TopologyVertex; import org.onosproject.store.AbstractStore; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.EventuallyConsistentMap; @@ -60,8 +59,23 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; import org.onosproject.store.service.LogicalClockService; import org.onosproject.store.service.StorageService; +import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; +import java.util.Collections; +import java.util.Dictionary; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.onlab.util.Tools.get; +import static org.onlab.util.Tools.isNullOrEmpty; +import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED; +import static org.slf4j.LoggerFactory.getLogger; + /** * Manages inventory of topology snapshots using trivial in-memory * structures implementation. @@ -73,9 +87,12 @@ import org.slf4j.Logger; @Service public class DistributedTopologyStore extends AbstractStore<TopologyEvent, TopologyStoreDelegate> - implements TopologyStore { + implements TopologyStore, PathAdminService { private final Logger log = getLogger(getClass()); + + private static final String FORMAT = "Settings: linkWeightFunction={}"; + private volatile DefaultTopology current = new DefaultTopology(ProviderId.NONE, new DefaultGraphDescription(0L, System.currentTimeMillis(), @@ -91,6 +108,21 @@ public class DistributedTopologyStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected MastershipService mastershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ComponentConfigService configService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + private static final String HOP_COUNT = "hopCount"; + private static final String LINK_METRIC = "linkMetric"; + private static final String GEO_DISTANCE = "geoDistance"; + + private static final String DEFAULT_LINK_WEIGHT_FUNCTION = "hopCount"; + @Property(name = "linkWeightFunction", value = DEFAULT_LINK_WEIGHT_FUNCTION, + label = "Default link-weight function: hopCount, linkMetric, geoDistance") + private String linkWeightFunction = DEFAULT_LINK_WEIGHT_FUNCTION; + // Cluster root to broadcast points bindings to allow convergence to // a shared broadcast tree; node that is the master of the cluster root // is the primary. @@ -100,7 +132,8 @@ public class DistributedTopologyStore new InternalBroadcastPointListener(); @Activate - public void activate() { + protected void activate() { + configService.registerProperties(getClass()); KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder() .register(KryoNamespaces.API); @@ -114,12 +147,30 @@ public class DistributedTopologyStore } @Deactivate - public void deactivate() { + protected void deactivate() { + configService.unregisterProperties(getClass(), false); broadcastPoints.removeListener(listener); broadcastPoints.destroy(); log.info("Stopped"); } + @Modified + protected void modified(ComponentContext context) { + Dictionary<?, ?> properties = context.getProperties(); + + String newLinkWeightFunction = get(properties, "linkWeightFunction"); + if (newLinkWeightFunction != null && + !Objects.equals(newLinkWeightFunction, linkWeightFunction)) { + linkWeightFunction = newLinkWeightFunction; + LinkWeight weight = linkWeightFunction.equals(LINK_METRIC) ? + new MetricLinkWeight() : + linkWeightFunction.equals(GEO_DISTANCE) ? + new GeoDistanceLinkWeight(deviceService) : null; + setDefaultLinkWeight(weight); + } + log.info(FORMAT, linkWeightFunction); + } + @Override public Topology currentTopology() { return current; @@ -263,6 +314,16 @@ public class DistributedTopologyStore return (DefaultTopology) topology; } + @Override + public void setDefaultLinkWeight(LinkWeight linkWeight) { + DefaultTopology.setDefaultLinkWeight(linkWeight); + } + + @Override + public void setDefaultGraphPathSearch(GraphPathSearch<TopologyVertex, TopologyEdge> graphPathSearch) { + DefaultTopology.setDefaultGraphPathSearch(graphPathSearch); + } + private class InternalBroadcastPointListener implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> { @Override diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java new file mode 100644 index 00000000..3f6402c5 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java @@ -0,0 +1,369 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import static java.util.Collections.unmodifiableCollection; +import static java.util.Collections.unmodifiableSet; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.*; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onosproject.core.ApplicationId; +import org.onosproject.core.DefaultApplicationId; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import net.kuujo.copycat.Task; +import net.kuujo.copycat.cluster.Cluster; +import net.kuujo.copycat.resource.ResourceState; + +/** + * + */ +public class DefaultAsyncConsistentMapTest { + + private static final ApplicationId APP_ID = new DefaultApplicationId(42, "what"); + + private static final TestData KEY1A = new TestData("One", "a"); + private static final TestData KEY1B = new TestData("One", "b"); + + private static final TestData VALUE2A = new TestData("Two", "a"); + private static final TestData VALUE2B = new TestData("Two", "b"); + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testKeySet() throws Exception { + DefaultAsyncConsistentMap<TestData, TestData> map; + String name = "map_name"; + Database database = new TestDatabase(); + Serializer serializer = Serializer.forTypes(TestData.class); + + map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer, + false, false, false); + map.put(KEY1A, VALUE2A); + map.put(KEY1B, VALUE2A); + + Set<TestData> set = map.keySet().get(); + assertEquals("Should contain 2 keys", + 2, set.size()); + assertThat(set.contains(KEY1A), is(true)); + assertThat(set.contains(KEY1B), is(true)); + assertThat(set.contains(new TestData("One", "a")), is(true)); + } + + @Test + public void testEntrySet() throws Exception { + DefaultAsyncConsistentMap<TestData, TestData> map; + String name = "map_name"; + Database database = new TestDatabase(); + Serializer serializer = Serializer.forTypes(TestData.class); + + map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer, + false, false, false); + map.put(KEY1A, VALUE2A); + map.put(KEY1B, VALUE2A); + + assertEquals("Should contain 2 entry", + 2, + map.entrySet().get().size()); + } + + /** + * Object to be used as a test data. + * + * {@link Object#equals(Object)} use only part of it's fields. + * + * As a result there can be 2 instances which the + * serialized bytes are not-equal but + * {@link Object#equals(Object)}-wise they are equal. + */ + public static class TestData { + + private final String theKey; + + @SuppressWarnings("unused") + private final String notUsedForEquals; + + public TestData(String theKey, String notUsedForEquals) { + this.theKey = theKey; + this.notUsedForEquals = notUsedForEquals; + } + + @Override + public int hashCode() { + return Objects.hashCode(theKey); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TestData) { + TestData that = (TestData) obj; + return Objects.equals(this.theKey, that.theKey); + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("theKey", theKey) + .add("notUsedForEquals", notUsedForEquals) + .toString(); + } + } + + /** + * {@link Database} implementation for testing. + * + * There is only 1 backing Map, {@code mapName} will be ignored. + */ + public class TestDatabase implements Database { + + Map<String, Versioned<byte[]>> map = new ConcurrentHashMap<>(); + + @Override + public CompletableFuture<Set<String>> maps() { + return CompletableFuture.completedFuture(ImmutableSet.of()); + } + + @Override + public CompletableFuture<Map<String, Long>> counters() { + return CompletableFuture.completedFuture(ImmutableMap.of()); + } + + @Override + public CompletableFuture<Integer> mapSize(String mapName) { + return CompletableFuture.completedFuture(map.size()); + } + + @Override + public CompletableFuture<Boolean> mapIsEmpty(String mapName) { + return CompletableFuture.completedFuture(map.isEmpty()); + } + + @Override + public CompletableFuture<Boolean> mapContainsKey(String mapName, + String key) { + return CompletableFuture.completedFuture(map.containsKey(key)); + } + + @Override + public CompletableFuture<Boolean> mapContainsValue(String mapName, + byte[] value) { + return CompletableFuture.completedFuture(map.containsValue(value)); + } + + @Override + public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, + String key) { + return CompletableFuture.completedFuture(map.get(key)); + } + + @Override + public synchronized CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(String mapName, + String key, + Match<byte[]> valueMatch, + Match<Long> versionMatch, + byte[] value) { + + boolean updated = false; + final Versioned<byte[]> oldValue; + final Versioned<byte[]> newValue; + + Versioned<byte[]> old = map.getOrDefault(key, new Versioned<byte[]>(null, 0)); + if (valueMatch.matches(old.value()) && versionMatch.matches(old.version())) { + updated = true; + oldValue = old; + newValue = new Versioned<>(value, old.version() + 1); + map.put(key, newValue); + } else { + updated = false; + oldValue = old; + newValue = old; + } + return CompletableFuture.completedFuture( + Result.ok(new UpdateResult<String, byte[]>(updated, + mapName, key, oldValue, newValue))); + } + + @Override + public CompletableFuture<Result<Void>> mapClear(String mapName) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Set<String>> mapKeySet(String mapName) { + return CompletableFuture.completedFuture(unmodifiableSet(map.keySet())); + } + + @Override + public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) { + return CompletableFuture.completedFuture(unmodifiableCollection(map.values())); + } + + @Override + public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) { + return CompletableFuture.completedFuture(unmodifiableSet(map.entrySet())); + } + + @Override + public CompletableFuture<Long> counterAddAndGet(String counterName, + long delta) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Long> counterGetAndAdd(String counterName, + long delta) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> counterSet(String counterName, + long value) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, + long expectedValue, + long update) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Long> counterGet(String counterName) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Long> queueSize(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> queuePush(String queueName, + byte[] entry) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<byte[]> queuePop(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<byte[]> queuePeek(String queueName) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Boolean> prepare(Transaction transaction) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<CommitResponse> commit(Transaction transaction) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Boolean> rollback(Transaction transaction) { + throw new UnsupportedOperationException(); + } + + @Override + public String name() { + return "name"; + } + + @Override + public ResourceState state() { + return ResourceState.HEALTHY; + } + + @Override + public Cluster cluster() { + throw new UnsupportedOperationException(); + } + + @Override + public Database addStartupTask(Task<CompletableFuture<Void>> task) { + throw new UnsupportedOperationException(); + } + + @Override + public Database addShutdownTask(Task<CompletableFuture<Void>> task) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Database> open() { + return CompletableFuture.completedFuture(this); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public CompletableFuture<Void> close() { + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void registerConsumer(Consumer<StateMachineUpdate> consumer) { + } + + @Override + public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) { + } + } + +} diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java index ef8d9924..b74aa370 100644 --- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java @@ -42,12 +42,13 @@ import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.DefaultControllerNode; import org.onosproject.cluster.NodeId; import org.onosproject.event.AbstractEvent; -import org.onosproject.persistence.impl.PersistenceManager; +import org.onosproject.persistence.PersistenceService; import org.onosproject.store.Timestamp; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.impl.LogicalTimestamp; +import org.onosproject.store.persistence.TestPersistenceService; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.service.EventuallyConsistentMap; @@ -82,7 +83,7 @@ public class EventuallyConsistentMapImplTest { private EventuallyConsistentMap<String, String> ecMap; - private PersistenceManager persistenceService; + private PersistenceService persistenceService; private ClusterService clusterService; private ClusterCommunicationService clusterCommunicator; private SequentialClockService<String, String> clockService; @@ -138,8 +139,7 @@ public class EventuallyConsistentMapImplTest { clusterCommunicator = createMock(ClusterCommunicationService.class); - persistenceService = new PersistenceManager(); - persistenceService.activate(); + persistenceService = new TestPersistenceService(); // Add expectation for adding cluster message subscribers which // delegate to our ClusterCommunicationService implementation. This // allows us to get a reference to the map's internal cluster message diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java index a7077a81..0732126d 100644 --- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java +++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java @@ -27,8 +27,6 @@ import org.onosproject.net.HostLocation; import org.onosproject.net.host.DefaultHostDescription; import org.onosproject.net.host.HostDescription; import org.onosproject.net.provider.ProviderId; -import org.onosproject.store.Timestamp; -import org.onosproject.store.service.LogicalClockService; import org.onosproject.store.service.TestStorageService; import java.util.HashSet; @@ -37,9 +35,9 @@ import java.util.Set; /** * Tests for the ECHostStore. */ -public class ECHostStoreTest extends TestCase { +public class DistributedHostStoreTest extends TestCase { - private ECHostStore ecXHostStore; + private DistributedHostStore ecXHostStore; private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a")); @@ -50,10 +48,9 @@ public class ECHostStoreTest extends TestCase { @Before public void setUp() { - ecXHostStore = new ECHostStore(); + ecXHostStore = new DistributedHostStore(); ecXHostStore.storageService = new TestStorageService(); - ecXHostStore.clockService = new TestLogicalClockService(); ecXHostStore.activate(); } @@ -83,13 +80,4 @@ public class ECHostStoreTest extends TestCase { assertTrue(host.ipAddresses().contains(IP2)); } - /** - * Mocks the LogicalClockService class. - */ - class TestLogicalClockService implements LogicalClockService { - @Override - public Timestamp getTimestamp() { - return null; - } - } }
\ No newline at end of file |