aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist')
-rw-r--r--framework/src/onos/core/store/dist/pom.xml7
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java37
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java31
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java20
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java12
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java7
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java10
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java11
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java63
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java19
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java4
-rw-r--r--framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java7
12 files changed, 156 insertions, 72 deletions
diff --git a/framework/src/onos/core/store/dist/pom.xml b/framework/src/onos/core/store/dist/pom.xml
index f2ec2a71..0b8b72bc 100644
--- a/framework/src/onos/core/store/dist/pom.xml
+++ b/framework/src/onos/core/store/dist/pom.xml
@@ -69,6 +69,12 @@
</dependency>
<dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-persistence</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.8</version>
@@ -110,5 +116,4 @@
<artifactId>onlab-thirdparty</artifactId>
</dependency>
</dependencies>
-
</project>
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index dda820ae..fe4aa0be 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -183,21 +183,34 @@ public class GossipApplicationStore extends ApplicationArchive
* they are marked to be active.
*/
private void loadFromDisk() {
- for (String name : getApplicationNames()) {
- for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
- try {
- Application app = create(getApplicationDescription(name), false);
- if (app != null && isActive(app.id().name())) {
- requiredBy.put(app.id(), coreAppId);
- activate(app.id(), false);
- // load app permissions
- }
- } catch (Exception e) {
- log.warn("Unable to load application {} from disk; retrying", name);
- randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
+ getApplicationNames().forEach(appName -> {
+ Application app = loadFromDisk(appName);
+ if (app != null && isActive(app.id().name())) {
+ activate(app.id(), false);
+ // TODO Load app permissions
+ }
+ });
+ }
+
+ private Application loadFromDisk(String appName) {
+ for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
+ try {
+ // Directly return if app already exists
+ ApplicationId appId = getId(appName);
+ if (appId != null) {
+ return getApplication(appId);
}
+
+ ApplicationDescription appDesc = getApplicationDescription(appName);
+ boolean success = appDesc.requiredApps().stream()
+ .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
+ return success ? create(appDesc, false) : null;
+ } catch (Exception e) {
+ log.warn("Unable to load application {} from disk; retrying", appName);
+ randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
}
}
+ return null;
}
@Deactivate
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java
index e4a09cef..3cd992bb 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/StaticClusterMetadataStore.java
@@ -7,6 +7,7 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.Inet4Address;
import java.net.NetworkInterface;
import java.util.Arrays;
import java.util.Collection;
@@ -58,6 +59,7 @@ public class StaticClusterMetadataStore
private static final String ONOS_IP = "ONOS_IP";
private static final String ONOS_INTERFACE = "ONOS_INTERFACE";
+ private static final String ONOS_ALLOW_IPV6 = "ONOS_ALLOW_IPV6";
private static final String DEFAULT_ONOS_INTERFACE = "eth0";
private static final String CLUSTER_METADATA_FILE = "../config/cluster.json";
private static final int DEFAULT_ONOS_PORT = 9876;
@@ -214,13 +216,25 @@ public class StaticClusterMetadataStore
useOnosInterface = DEFAULT_ONOS_INTERFACE;
}
+ // Capture if they want to limit IP address selection to only IPv4 (default).
+ boolean allowIPv6 = (System.getenv(ONOS_ALLOW_IPV6) != null);
+
Function<NetworkInterface, IpAddress> ipLookup = nif -> {
- for (InetAddress address : Collections.list(nif.getInetAddresses())) {
- if (address.isSiteLocalAddress()) {
- return IpAddress.valueOf(address);
+ IpAddress fallback = null;
+
+ // nif can be null if the interface name specified doesn't exist on the node's host
+ if (nif != null) {
+ for (InetAddress address : Collections.list(nif.getInetAddresses())) {
+ if (address.isSiteLocalAddress() && (allowIPv6 || address instanceof Inet4Address)) {
+ return IpAddress.valueOf(address);
+ }
+ if (fallback == null && !address.isLoopbackAddress() && !address.isMulticastAddress()
+ && (allowIPv6 || address instanceof Inet4Address)) {
+ fallback = IpAddress.valueOf(address);
+ }
}
}
- return null;
+ return fallback;
};
try {
IpAddress ip = ipLookup.apply(NetworkInterface.getByName(useOnosInterface));
@@ -228,14 +242,17 @@ public class StaticClusterMetadataStore
return ip.toString();
}
for (NetworkInterface nif : Collections.list(getNetworkInterfaces())) {
- ip = ipLookup.apply(nif);
- if (ip != null) {
- return ip.toString();
+ if (!nif.getName().equals(useOnosInterface)) {
+ ip = ipLookup.apply(nif);
+ if (ip != null) {
+ return ip.toString();
+ }
}
}
} catch (Exception e) {
throw new IllegalStateException("Unable to get network interfaces", e);
}
+
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java
index 3e73d8f4..ca8eea37 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/config/impl/DistributedNetworkConfigStore.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.net.config.NetworkConfigEvent.Type.*;
/**
@@ -71,10 +72,12 @@ public class DistributedNetworkConfigStore
extends AbstractStore<NetworkConfigEvent, NetworkConfigStoreDelegate>
implements NetworkConfigStore {
- private static final int MAX_BACKOFF = 10;
-
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final int MAX_BACKOFF = 10;
+ private static final String INVALID_CONFIG_JSON =
+ "JSON node does not contain valid configuration";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@@ -187,8 +190,17 @@ public class DistributedNetworkConfigStore
@Override
public <S, C extends Config<S>> C applyConfig(S subject, Class<C> configClass, JsonNode json) {
- return createConfig(subject, configClass,
- configs.putAndGet(key(subject, configClass), json).value());
+ // Create the configuration and validate it.
+ C config = createConfig(subject, configClass, json);
+ checkArgument(config.isValid(), INVALID_CONFIG_JSON);
+
+ // Insert the validated configuration and get it back.
+ Versioned<JsonNode> versioned = configs.putAndGet(key(subject, configClass), json);
+
+ // Re-create the config if for some reason what we attempted to put
+ // was supplanted by someone else already.
+ return versioned.value() == json ? config :
+ createConfig(subject, configClass, versioned.value());
}
@Override
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java
index 7e575b01..92db5b44 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java
@@ -26,8 +26,12 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
- * Extension of DefaultAsyncConsistentMap that provides a weaker read consistency
+ * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency
* guarantee in return for better read performance.
+ * <p>
+ * For read/write operations that are local to a node this map implementation provides
+ * guarantees similar to a ConsistentMap. However for read/write operations executed
+ * across multiple nodes this implementation only provides eventual consistency.
*
* @param <K> key type
* @param <V> value type
@@ -68,4 +72,10 @@ public class AsyncCachingConsistentMap<K, V> extends DefaultAsyncConsistentMap<K
}
return cache.getUnchecked(key);
}
+
+ @Override
+ protected void beforeUpdate(K key) {
+ super.beforeUpdate(key);
+ cache.invalidate(key);
+ }
} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 3e89635a..90d81ee7 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -55,6 +55,7 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
+import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
@@ -128,6 +129,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PersistenceService persistenceService;
+
protected String nodeIdToUri(NodeId nodeId) {
ControllerNode node = clusterService.getNode(nodeId);
return String.format("onos://%s:%d", node.ip(), node.tcpPort());
@@ -312,7 +316,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
return new EventuallyConsistentMapBuilderImpl<>(clusterService,
- clusterCommunicator);
+ clusterCommunicator,
+ persistenceService);
}
@Override
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 0ea66861..c6d300c9 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -405,6 +405,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
.thenApply(v -> v.updated());
}
+ /**
+ * Pre-update hook for performing required checks/actions before going forward with an update operation.
+ * @param key map key.
+ */
+ protected void beforeUpdate(K key) {
+ checkIfUnmodifiable();
+ }
+
private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
}
@@ -413,7 +421,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
Match<V> oldValueMatch,
Match<Long> oldVersionMatch,
V value) {
- checkIfUnmodifiable();
+ beforeUpdate(key);
return database.mapUpdate(name,
keyCache.getUnchecked(key),
oldValueMatch.map(serializer::encode),
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index a553ffff..eb98c829 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -18,6 +18,7 @@ package org.onosproject.store.ecmap;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -52,6 +53,8 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
private boolean convergeFaster = false;
private boolean persistent = false;
+ private boolean persistentMap = false;
+ private final PersistenceService persistenceService;
/**
* Creates a new eventually consistent map builder.
@@ -60,7 +63,9 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
* @param clusterCommunicator cluster communication service
*/
public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator,
+ PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
}
@@ -133,6 +138,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
@Override
public EventuallyConsistentMapBuilder<K, V> withPersistence() {
+ checkNotNull(this.persistenceService);
persistent = true;
return this;
}
@@ -156,6 +162,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
antiEntropyPeriod,
antiEntropyTimeUnit,
convergeFaster,
- persistent);
+ persistent,
+ persistenceService);
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f1e0dbd4..b5ea52e0 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -28,6 +28,7 @@ import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -37,6 +38,7 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final NodeId localNodeId;
+ private final PersistenceService persistenceService;
private final BiFunction<K, V, Timestamp> timestampProvider;
@@ -116,7 +119,9 @@ public class EventuallyConsistentMapImpl<K, V>
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
private final boolean persistent;
- private final PersistentStore<K, V> persistentStore;
+
+ private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
+
/**
* Creates a new eventually consistent map shared amongst multiple instances.
@@ -158,9 +163,32 @@ public class EventuallyConsistentMapImpl<K, V>
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
- boolean persistent) {
+ boolean persistent,
+ PersistenceService persistenceService) {
this.mapName = mapName;
- items = Maps.newConcurrentMap();
+ this.serializer = createSerializer(serializerBuilder);
+ this.persistenceService = persistenceService;
+ this.persistent =
+ persistent;
+ if (persistent) {
+ items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
+ .withName(PERSISTENT_LOCAL_MAP_NAME)
+ .withSerializer(new Serializer() {
+
+ @Override
+ public <T> byte[] encode(T object) {
+ return EventuallyConsistentMapImpl.this.serializer.encode(object);
+ }
+
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
+ }
+ })
+ .build();
+ } else {
+ items = Maps.newConcurrentMap();
+ }
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -168,8 +196,6 @@ public class EventuallyConsistentMapImpl<K, V>
this.clusterCommunicator = clusterCommunicator;
this.localNodeId = clusterService.getLocalNode().id();
- this.serializer = createSerializer(serializerBuilder);
-
this.timestampProvider = timestampProvider;
if (peerUpdateFunction != null) {
@@ -198,20 +224,6 @@ public class EventuallyConsistentMapImpl<K, V>
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
- this.persistent = persistent;
-
- if (this.persistent) {
- String dataDirectory = System.getProperty("karaf.data", "./data");
- String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
-
- ExecutorService dbExecutor =
- newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
-
- persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
- persistentStore.readInto(items);
- } else {
- this.persistentStore = null;
- }
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
@@ -373,15 +385,6 @@ public class EventuallyConsistentMapImpl<K, V>
return existing;
}
});
- if (updated.get()) {
- if (persistent) {
- if (tombstone.isPresent()) {
- persistentStore.update(key, tombstone.get());
- } else {
- persistentStore.remove(key);
- }
- }
- }
return previousValue.get();
}
@@ -455,6 +458,7 @@ public class EventuallyConsistentMapImpl<K, V>
/**
* Returns true if newValue was accepted i.e. map is updated.
+ *
* @param key key
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
@@ -473,9 +477,6 @@ public class EventuallyConsistentMapImpl<K, V>
}
return existing;
});
- if (updated.get() && persistent) {
- persistentStore.update(key, newValue);
- }
return updated.get();
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
index 391a88f7..20124576 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
@@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
+import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
@@ -88,7 +89,7 @@ public class ECHostStore
private EventuallyConsistentMap<HostId, DefaultHost> hosts;
- private final ConcurrentHashMap<HostId, HostLocation> locations =
+ private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
new ConcurrentHashMap<>();
private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
@@ -114,7 +115,7 @@ public class ECHostStore
public void deactivate() {
hosts.removeListener(hostLocationTracker);
hosts.destroy();
- locations.clear();
+ prevHosts.clear();
log.info("Stopped");
}
@@ -253,16 +254,16 @@ public class ECHostStore
public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
DefaultHost host = checkNotNull(event.value());
if (event.type() == PUT) {
- HostLocation prevLocation = locations.put(host.id(), host.location());
- if (prevLocation == null) {
+ Host prevHost = prevHosts.put(host.id(), host);
+ if (prevHost == null) {
notifyDelegate(new HostEvent(HOST_ADDED, host));
- } else if (!Objects.equals(prevLocation, host.location())) {
- notifyDelegate(new HostEvent(host, prevLocation));
- } else {
- notifyDelegate(new HostEvent(HOST_UPDATED, host));
+ } else if (!Objects.equals(prevHost.location(), host.location())) {
+ notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost));
+ } else if (!Objects.equals(prevHost, host)) {
+ notifyDelegate(new HostEvent(HOST_UPDATED, host, prevHost));
}
} else if (event.type() == REMOVE) {
- if (locations.remove(host.id()) != null) {
+ if (prevHosts.remove(host.id()) != null) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
index 4d9e3cbf..0335ba5d 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/newresource/impl/ConsistentResourceStore.java
@@ -264,7 +264,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
}
return children.value().stream()
- .filter(x -> x.lastComponent().getClass().equals(cls))
+ .filter(x -> x.last().getClass().equals(cls))
.filter(consumerMap::containsKey)
.collect(Collectors.toList());
}
@@ -344,7 +344,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
*/
private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) {
// root is always regarded to be registered
- if (resource.isRoot()) {
+ if (!resource.parent().isPresent()) {
return true;
}
diff --git a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index ccf6ee71..ef8d9924 100644
--- a/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/framework/src/onos/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -42,6 +42,7 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractEvent;
+import org.onosproject.persistence.impl.PersistenceManager;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
@@ -81,6 +82,7 @@ public class EventuallyConsistentMapImplTest {
private EventuallyConsistentMap<String, String> ecMap;
+ private PersistenceManager persistenceService;
private ClusterService clusterService;
private ClusterCommunicationService clusterCommunicator;
private SequentialClockService<String, String> clockService;
@@ -136,6 +138,8 @@ public class EventuallyConsistentMapImplTest {
clusterCommunicator = createMock(ClusterCommunicationService.class);
+ persistenceService = new PersistenceManager();
+ persistenceService.activate();
// Add expectation for adding cluster message subscribers which
// delegate to our ClusterCommunicationService implementation. This
// allows us to get a reference to the map's internal cluster message
@@ -153,11 +157,12 @@ public class EventuallyConsistentMapImplTest {
.register(TestTimestamp.class);
ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
- clusterService, clusterCommunicator)
+ clusterService, clusterCommunicator, persistenceService)
.withName(MAP_NAME)
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
.withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
+ .withPersistence()
.build();
// Reset ready for tests to add their own expectations