aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java49
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java131
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java2
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java (renamed from framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java)79
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java7
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java91
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java369
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java8
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java (renamed from framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java)18
-rw-r--r--framework/src/onos/core/store/persistence/src/main/java/org/onosproject/persistence/impl/PersistenceManager.java1
-rw-r--r--framework/src/onos/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java4
11 files changed, 660 insertions, 99 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c6d300c9..af2bb74d 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
+
import org.onlab.util.HexString;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
@@ -33,6 +34,7 @@ import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -92,18 +94,25 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
+ // String representation of serialized byte[] -> original key Object
+ private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder()
.softValues()
- .build(new CacheLoader<K, String>() {
+ .build(new CacheLoader<String, K>() {
@Override
- public String load(K key) {
- return HexString.toHexString(serializer.encode(key));
+ public K load(String key) {
+ return serializer.decode(HexString.fromHexString(key));
}
});
+ protected String sK(K key) {
+ String s = HexString.toHexString(serializer.encode(key));
+ keyCache.put(s, key);
+ return s;
+ }
+
protected K dK(String key) {
- return serializer.decode(HexString.fromHexString(key));
+ return keyCache.getUnchecked(key);
}
public DefaultAsyncConsistentMap(String name,
@@ -207,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
- return database.mapContainsKey(name, keyCache.getUnchecked(key))
+ return database.mapContainsKey(name, sK(key))
.whenComplete((r, e) -> timer.stop(e));
}
@@ -223,7 +232,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(GET);
- return database.mapGet(name, keyCache.getUnchecked(key))
+ return database.mapGet(name, sK(key))
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
}
@@ -328,10 +337,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Set<K>> keySet() {
final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
return database.mapKeySet(name)
- .thenApply(s -> s
- .stream()
- .map(this::dK)
- .collect(Collectors.toSet()))
+ .thenApply(s -> newMappingKeySet(s))
.whenComplete((r, e) -> timer.stop(e));
}
@@ -351,10 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
return database.mapEntrySet(name)
.whenComplete((r, e) -> timer.stop(e))
- .thenApply(s -> s
- .stream()
- .map(this::mapRawEntry)
- .collect(Collectors.toSet()));
+ .thenApply(s -> newMappingEntrySet(s));
}
@Override
@@ -413,17 +416,31 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkIfUnmodifiable();
}
+ private Set<K> newMappingKeySet(Set<String> s) {
+ return new MappingSet<>(s, Collections::unmodifiableSet,
+ this::sK, this::dK);
+ }
+
+ private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) {
+ return new MappingSet<>(s, Collections::unmodifiableSet,
+ this::reverseMapRawEntry, this::mapRawEntry);
+ }
+
private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
}
+ private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) {
+ return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode));
+ }
+
private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
Match<V> oldValueMatch,
Match<Long> oldVersionMatch,
V value) {
beforeUpdate(key);
return database.mapUpdate(name,
- keyCache.getUnchecked(key),
+ sK(key),
oldValueMatch.map(serializer::encode),
oldVersionMatch,
value == null ? null : serializer.encode(value))
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
new file mode 100644
index 00000000..9bf80a73
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Set view backed by Set with element type {@code <BACK>} but returns
+ * element as {@code <OUT>} for convenience.
+ *
+ * @param <BACK> Backing {@link Set} element type.
+ * MappingSet will follow this type's equality behavior.
+ * @param <OUT> external facing element type.
+ * MappingSet will ignores equality defined by this type.
+ */
+class MappingSet<BACK, OUT> implements Set<OUT> {
+
+ private final Set<BACK> backedSet;
+ private final Function<OUT, BACK> toBack;
+ private final Function<BACK, OUT> toOut;
+
+ public MappingSet(Set<BACK> backedSet,
+ Function<Set<BACK>, Set<BACK>> supplier,
+ Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) {
+ this.backedSet = supplier.apply(backedSet);
+ this.toBack = toBack;
+ this.toOut = toOut;
+ }
+
+ @Override
+ public int size() {
+ return backedSet.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return backedSet.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return backedSet.contains(toBack.apply((OUT) o));
+ }
+
+ @Override
+ public Iterator<OUT> iterator() {
+ return Iterators.transform(backedSet.iterator(), toOut::apply);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return backedSet.stream()
+ .map(toOut)
+ .toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return backedSet.stream()
+ .map(toOut)
+ .toArray(size -> {
+ if (size < a.length) {
+ return (T[]) new Object[size];
+ } else {
+ Arrays.fill(a, null);
+ return a;
+ }
+ });
+ }
+
+ @Override
+ public boolean add(OUT e) {
+ return backedSet.add(toBack.apply(e));
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return backedSet.remove(toBack.apply((OUT) o));
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ return c.stream()
+ .map(e -> toBack.apply((OUT) e))
+ .allMatch(backedSet::contains);
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends OUT> c) {
+ return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList()));
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ return backedSet.retainAll(c.stream()
+ .map(x -> toBack.apply((OUT) x))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ return backedSet.removeAll(c.stream()
+ .map(x -> toBack.apply((OUT) x))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public void clear() {
+ backedSet.clear();
+ }
+}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 83319c3e..cc32a735 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -350,7 +350,7 @@ public class DistributedGroupStore
// Check if a group is existing with the same key
Group existingGroup = getGroup(groupDesc.deviceId(), groupDesc.appCookie());
if (existingGroup != null) {
- log.warn("Group already exists with the same key {} in dev:{} with id:{}",
+ log.warn("Group already exists with the same key {} in dev:{} with id:0x{}",
groupDesc.appCookie(), groupDesc.deviceId(),
Integer.toHexString(existingGroup.id().id()));
return;
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index 20124576..836a3c22 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,26 +15,8 @@
*/
package org.onosproject.store.host.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -60,22 +42,34 @@ import org.onosproject.net.host.HostStoreDelegate;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.host.HostEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
*/
@Component(immediate = true)
@Service
-public class ECHostStore
+public class DistributedHostStore
extends AbstractStore<HostEvent, HostStoreDelegate>
implements HostStore {
@@ -84,15 +78,13 @@ public class ECHostStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LogicalClockService clockService;
-
- private EventuallyConsistentMap<HostId, DefaultHost> hosts;
+ private ConsistentMap<HostId, DefaultHost> host;
+ private Map<HostId, DefaultHost> hosts;
private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
new ConcurrentHashMap<>();
- private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
+ private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
@Activate
@@ -100,21 +92,22 @@ public class ECHostStore
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
- hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
+ host = storageService.<HostId, DefaultHost>consistentMapBuilder()
.withName("onos-hosts")
- .withSerializer(hostSerializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(hostSerializer.build()))
.build();
- hosts.addListener(hostLocationTracker);
+ hosts = host.asJavaMap();
+
+ host.addListener(hostLocationTracker);
log.info("Started");
}
@Deactivate
public void deactivate() {
- hosts.removeListener(hostLocationTracker);
- hosts.destroy();
+ host.removeListener(hostLocationTracker);
prevHosts.clear();
log.info("Stopped");
@@ -249,11 +242,11 @@ public class ECHostStore
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
- private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
+ private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
@Override
- public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
- DefaultHost host = checkNotNull(event.value());
- if (event.type() == PUT) {
+ public void event(MapEvent<HostId, DefaultHost> event) {
+ DefaultHost host = checkNotNull(event.value().value());
+ if (event.type() == MapEvent.Type.INSERT) {
Host prevHost = prevHosts.put(host.id(), host);
if (prevHost == null) {
notifyDelegate(new HostEvent(HOST_ADDED, host));
@@ -262,7 +255,7 @@ public class ECHostStore
} else if (!Objects.equals(prevHost, host)) {
notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
}
- } else if (event.type() == REMOVE) {
+ } else if (event.type() == MapEvent.Type.REMOVE) {
if (prevHosts.remove(host.id()) != null) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
index 851185b5..4d7e7f33 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/proxyarp/impl/DistributedProxyArpStore.java
@@ -113,7 +113,7 @@ public class DistributedProxyArpStore implements ProxyArpStore {
@Override
public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
- NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
+ /*NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
if (nodeId.equals(localNodeId)) {
if (delegate != null) {
delegate.emitResponse(outPort, packet);
@@ -122,7 +122,10 @@ public class DistributedProxyArpStore implements ProxyArpStore {
log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
- }
+ }*/
+ //FIXME: Code above may be unnecessary and therefore cluster messaging
+ // and pendingMessages could be pruned as well.
+ delegate.emitResponse(outPort, packet);
}
@Override
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
index da4e3cc4..2641635d 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/topology/impl/DistributedTopologyStore.java
@@ -15,44 +15,43 @@
*/
package org.onosproject.store.topology.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.isNullOrEmpty;
-import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.graph.GraphPathSearch;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.common.DefaultTopology;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.DisjointPath;
import org.onosproject.net.Link;
import org.onosproject.net.Path;
-import org.onosproject.net.DisjointPath;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.topology.ClusterId;
import org.onosproject.net.topology.DefaultGraphDescription;
+import org.onosproject.net.topology.GeoDistanceLinkWeight;
import org.onosproject.net.topology.GraphDescription;
import org.onosproject.net.topology.LinkWeight;
+import org.onosproject.net.topology.MetricLinkWeight;
+import org.onosproject.net.topology.PathAdminService;
import org.onosproject.net.topology.Topology;
import org.onosproject.net.topology.TopologyCluster;
+import org.onosproject.net.topology.TopologyEdge;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyGraph;
import org.onosproject.net.topology.TopologyStore;
import org.onosproject.net.topology.TopologyStoreDelegate;
+import org.onosproject.net.topology.TopologyVertex;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -60,8 +59,23 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.isNullOrEmpty;
+import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Manages inventory of topology snapshots using trivial in-memory
* structures implementation.
@@ -73,9 +87,12 @@ import org.slf4j.Logger;
@Service
public class DistributedTopologyStore
extends AbstractStore<TopologyEvent, TopologyStoreDelegate>
- implements TopologyStore {
+ implements TopologyStore, PathAdminService {
private final Logger log = getLogger(getClass());
+
+ private static final String FORMAT = "Settings: linkWeightFunction={}";
+
private volatile DefaultTopology current =
new DefaultTopology(ProviderId.NONE,
new DefaultGraphDescription(0L, System.currentTimeMillis(),
@@ -91,6 +108,21 @@ public class DistributedTopologyStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService configService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private static final String HOP_COUNT = "hopCount";
+ private static final String LINK_METRIC = "linkMetric";
+ private static final String GEO_DISTANCE = "geoDistance";
+
+ private static final String DEFAULT_LINK_WEIGHT_FUNCTION = "hopCount";
+ @Property(name = "linkWeightFunction", value = DEFAULT_LINK_WEIGHT_FUNCTION,
+ label = "Default link-weight function: hopCount, linkMetric, geoDistance")
+ private String linkWeightFunction = DEFAULT_LINK_WEIGHT_FUNCTION;
+
// Cluster root to broadcast points bindings to allow convergence to
// a shared broadcast tree; node that is the master of the cluster root
// is the primary.
@@ -100,7 +132,8 @@ public class DistributedTopologyStore
new InternalBroadcastPointListener();
@Activate
- public void activate() {
+ protected void activate() {
+ configService.registerProperties(getClass());
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
@@ -114,12 +147,30 @@ public class DistributedTopologyStore
}
@Deactivate
- public void deactivate() {
+ protected void deactivate() {
+ configService.unregisterProperties(getClass(), false);
broadcastPoints.removeListener(listener);
broadcastPoints.destroy();
log.info("Stopped");
}
+ @Modified
+ protected void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ String newLinkWeightFunction = get(properties, "linkWeightFunction");
+ if (newLinkWeightFunction != null &&
+ !Objects.equals(newLinkWeightFunction, linkWeightFunction)) {
+ linkWeightFunction = newLinkWeightFunction;
+ LinkWeight weight = linkWeightFunction.equals(LINK_METRIC) ?
+ new MetricLinkWeight() :
+ linkWeightFunction.equals(GEO_DISTANCE) ?
+ new GeoDistanceLinkWeight(deviceService) : null;
+ setDefaultLinkWeight(weight);
+ }
+ log.info(FORMAT, linkWeightFunction);
+ }
+
@Override
public Topology currentTopology() {
return current;
@@ -263,6 +314,16 @@ public class DistributedTopologyStore
return (DefaultTopology) topology;
}
+ @Override
+ public void setDefaultLinkWeight(LinkWeight linkWeight) {
+ DefaultTopology.setDefaultLinkWeight(linkWeight);
+ }
+
+ @Override
+ public void setDefaultGraphPathSearch(GraphPathSearch<TopologyVertex, TopologyEdge> graphPathSearch) {
+ DefaultTopology.setDefaultGraphPathSearch(graphPathSearch);
+ }
+
private class InternalBroadcastPointListener
implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> {
@Override
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java
new file mode 100644
index 00000000..3f6402c5
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMapTest.java
@@ -0,0 +1,369 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static java.util.Collections.unmodifiableCollection;
+import static java.util.Collections.unmodifiableSet;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+
+/**
+ *
+ */
+public class DefaultAsyncConsistentMapTest {
+
+ private static final ApplicationId APP_ID = new DefaultApplicationId(42, "what");
+
+ private static final TestData KEY1A = new TestData("One", "a");
+ private static final TestData KEY1B = new TestData("One", "b");
+
+ private static final TestData VALUE2A = new TestData("Two", "a");
+ private static final TestData VALUE2B = new TestData("Two", "b");
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testKeySet() throws Exception {
+ DefaultAsyncConsistentMap<TestData, TestData> map;
+ String name = "map_name";
+ Database database = new TestDatabase();
+ Serializer serializer = Serializer.forTypes(TestData.class);
+
+ map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer,
+ false, false, false);
+ map.put(KEY1A, VALUE2A);
+ map.put(KEY1B, VALUE2A);
+
+ Set<TestData> set = map.keySet().get();
+ assertEquals("Should contain 2 keys",
+ 2, set.size());
+ assertThat(set.contains(KEY1A), is(true));
+ assertThat(set.contains(KEY1B), is(true));
+ assertThat(set.contains(new TestData("One", "a")), is(true));
+ }
+
+ @Test
+ public void testEntrySet() throws Exception {
+ DefaultAsyncConsistentMap<TestData, TestData> map;
+ String name = "map_name";
+ Database database = new TestDatabase();
+ Serializer serializer = Serializer.forTypes(TestData.class);
+
+ map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer,
+ false, false, false);
+ map.put(KEY1A, VALUE2A);
+ map.put(KEY1B, VALUE2A);
+
+ assertEquals("Should contain 2 entry",
+ 2,
+ map.entrySet().get().size());
+ }
+
+ /**
+ * Object to be used as a test data.
+ *
+ * {@link Object#equals(Object)} use only part of it's fields.
+ *
+ * As a result there can be 2 instances which the
+ * serialized bytes are not-equal but
+ * {@link Object#equals(Object)}-wise they are equal.
+ */
+ public static class TestData {
+
+ private final String theKey;
+
+ @SuppressWarnings("unused")
+ private final String notUsedForEquals;
+
+ public TestData(String theKey, String notUsedForEquals) {
+ this.theKey = theKey;
+ this.notUsedForEquals = notUsedForEquals;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(theKey);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TestData) {
+ TestData that = (TestData) obj;
+ return Objects.equals(this.theKey, that.theKey);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("theKey", theKey)
+ .add("notUsedForEquals", notUsedForEquals)
+ .toString();
+ }
+ }
+
+ /**
+ * {@link Database} implementation for testing.
+ *
+ * There is only 1 backing Map, {@code mapName} will be ignored.
+ */
+ public class TestDatabase implements Database {
+
+ Map<String, Versioned<byte[]>> map = new ConcurrentHashMap<>();
+
+ @Override
+ public CompletableFuture<Set<String>> maps() {
+ return CompletableFuture.completedFuture(ImmutableSet.of());
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Long>> counters() {
+ return CompletableFuture.completedFuture(ImmutableMap.of());
+ }
+
+ @Override
+ public CompletableFuture<Integer> mapSize(String mapName) {
+ return CompletableFuture.completedFuture(map.size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
+ return CompletableFuture.completedFuture(map.isEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> mapContainsKey(String mapName,
+ String key) {
+ return CompletableFuture.completedFuture(map.containsKey(key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> mapContainsValue(String mapName,
+ byte[] value) {
+ return CompletableFuture.completedFuture(map.containsValue(value));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> mapGet(String mapName,
+ String key) {
+ return CompletableFuture.completedFuture(map.get(key));
+ }
+
+ @Override
+ public synchronized CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(String mapName,
+ String key,
+ Match<byte[]> valueMatch,
+ Match<Long> versionMatch,
+ byte[] value) {
+
+ boolean updated = false;
+ final Versioned<byte[]> oldValue;
+ final Versioned<byte[]> newValue;
+
+ Versioned<byte[]> old = map.getOrDefault(key, new Versioned<byte[]>(null, 0));
+ if (valueMatch.matches(old.value()) && versionMatch.matches(old.version())) {
+ updated = true;
+ oldValue = old;
+ newValue = new Versioned<>(value, old.version() + 1);
+ map.put(key, newValue);
+ } else {
+ updated = false;
+ oldValue = old;
+ newValue = old;
+ }
+ return CompletableFuture.completedFuture(
+ Result.ok(new UpdateResult<String, byte[]>(updated,
+ mapName, key, oldValue, newValue)));
+ }
+
+ @Override
+ public CompletableFuture<Result<Void>> mapClear(String mapName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> mapKeySet(String mapName) {
+ return CompletableFuture.completedFuture(unmodifiableSet(map.keySet()));
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
+ return CompletableFuture.completedFuture(unmodifiableCollection(map.values()));
+ }
+
+ @Override
+ public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
+ return CompletableFuture.completedFuture(unmodifiableSet(map.entrySet()));
+ }
+
+ @Override
+ public CompletableFuture<Long> counterAddAndGet(String counterName,
+ long delta) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Long> counterGetAndAdd(String counterName,
+ long delta) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> counterSet(String counterName,
+ long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName,
+ long expectedValue,
+ long update) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Long> counterGet(String counterName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Long> queueSize(String queueName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> queuePush(String queueName,
+ byte[] entry) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<byte[]> queuePop(String queueName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<byte[]> queuePeek(String queueName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(Transaction transaction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<CommitResponse> commit(Transaction transaction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> rollback(Transaction transaction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String name() {
+ return "name";
+ }
+
+ @Override
+ public ResourceState state() {
+ return ResourceState.HEALTHY;
+ }
+
+ @Override
+ public Cluster cluster() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Database> open() {
+ return CompletableFuture.completedFuture(this);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
+ }
+
+ @Override
+ public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
+ }
+ }
+
+}
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index ef8d9924..b74aa370 100644
--- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -42,12 +42,13 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractEvent;
-import org.onosproject.persistence.impl.PersistenceManager;
+import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
+import org.onosproject.store.persistence.TestPersistenceService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -82,7 +83,7 @@ public class EventuallyConsistentMapImplTest {
private EventuallyConsistentMap<String, String> ecMap;
- private PersistenceManager persistenceService;
+ private PersistenceService persistenceService;
private ClusterService clusterService;
private ClusterCommunicationService clusterCommunicator;
private SequentialClockService<String, String> clockService;
@@ -138,8 +139,7 @@ public class EventuallyConsistentMapImplTest {
clusterCommunicator = createMock(ClusterCommunicationService.class);
- persistenceService = new PersistenceManager();
- persistenceService.activate();
+ persistenceService = new TestPersistenceService();
// Add expectation for adding cluster message subscribers which
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
index a7077a81..0732126d 100644
--- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/ECHostStoreTest.java
+++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java
@@ -27,8 +27,6 @@ import org.onosproject.net.HostLocation;
import org.onosproject.net.host.DefaultHostDescription;
import org.onosproject.net.host.HostDescription;
import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.TestStorageService;
import java.util.HashSet;
@@ -37,9 +35,9 @@ import java.util.Set;
/**
* Tests for the ECHostStore.
*/
-public class ECHostStoreTest extends TestCase {
+public class DistributedHostStoreTest extends TestCase {
- private ECHostStore ecXHostStore;
+ private DistributedHostStore ecXHostStore;
private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
@@ -50,10 +48,9 @@ public class ECHostStoreTest extends TestCase {
@Before
public void setUp() {
- ecXHostStore = new ECHostStore();
+ ecXHostStore = new DistributedHostStore();
ecXHostStore.storageService = new TestStorageService();
- ecXHostStore.clockService = new TestLogicalClockService();
ecXHostStore.activate();
}
@@ -83,13 +80,4 @@ public class ECHostStoreTest extends TestCase {
assertTrue(host.ipAddresses().contains(IP2));
}
- /**
- * Mocks the LogicalClockService class.
- */
- class TestLogicalClockService implements LogicalClockService {
- @Override
- public Timestamp getTimestamp() {
- return null;
- }
- }
} \ No newline at end of file
diff --git a/framework/src/onos/core/store/persistence/src/main/java/org/onosproject/persistence/impl/PersistenceManager.java b/framework/src/onos/core/store/persistence/src/main/java/org/onosproject/persistence/impl/PersistenceManager.java
index 3428bce1..05c577c0 100644
--- a/framework/src/onos/core/store/persistence/src/main/java/org/onosproject/persistence/impl/PersistenceManager.java
+++ b/framework/src/onos/core/store/persistence/src/main/java/org/onosproject/persistence/impl/PersistenceManager.java
@@ -47,7 +47,6 @@ import static org.slf4j.LoggerFactory.getLogger;
public class PersistenceManager implements PersistenceService {
private static final String DATABASE_PATH = "../data/localDB";
-
private static final String ENCLOSING_FOLDER = "../data";
static final String MAP_PREFIX = "map:";
diff --git a/framework/src/onos/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java b/framework/src/onos/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
index 45b4da1a..7df518e2 100644
--- a/framework/src/onos/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
+++ b/framework/src/onos/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
@@ -374,13 +374,13 @@ public class KryoSerializerTest {
@Test
public void testResourcePath() {
- testSerializedEquals(ResourcePath.discrete(LinkKey.linkKey(CP1, CP2), VLAN1));
+ testSerializedEquals(ResourcePath.discrete(DID1, P1, VLAN1));
}
@Test
public void testResourceAllocation() {
testSerializedEquals(new org.onosproject.net.newresource.ResourceAllocation(
- ResourcePath.discrete(LinkKey.linkKey(CP1, CP2), VLAN1),
+ ResourcePath.discrete(DID1, P1, VLAN1),
IntentId.valueOf(30)));
}