aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java63
1 files changed, 32 insertions, 31 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f1e0dbd4..b5ea52e0 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -28,6 +28,7 @@ import org.onlab.util.SlidingWindowCounter;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -37,6 +38,7 @@ import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final NodeId localNodeId;
+ private final PersistenceService persistenceService;
private final BiFunction<K, V, Timestamp> timestampProvider;
@@ -116,7 +119,9 @@ public class EventuallyConsistentMapImpl<K, V>
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
private final boolean persistent;
- private final PersistentStore<K, V> persistentStore;
+
+ private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
+
/**
* Creates a new eventually consistent map shared amongst multiple instances.
@@ -158,9 +163,32 @@ public class EventuallyConsistentMapImpl<K, V>
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
- boolean persistent) {
+ boolean persistent,
+ PersistenceService persistenceService) {
this.mapName = mapName;
- items = Maps.newConcurrentMap();
+ this.serializer = createSerializer(serializerBuilder);
+ this.persistenceService = persistenceService;
+ this.persistent =
+ persistent;
+ if (persistent) {
+ items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
+ .withName(PERSISTENT_LOCAL_MAP_NAME)
+ .withSerializer(new Serializer() {
+
+ @Override
+ public <T> byte[] encode(T object) {
+ return EventuallyConsistentMapImpl.this.serializer.encode(object);
+ }
+
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
+ }
+ })
+ .build();
+ } else {
+ items = Maps.newConcurrentMap();
+ }
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -168,8 +196,6 @@ public class EventuallyConsistentMapImpl<K, V>
this.clusterCommunicator = clusterCommunicator;
this.localNodeId = clusterService.getLocalNode().id();
- this.serializer = createSerializer(serializerBuilder);
-
this.timestampProvider = timestampProvider;
if (peerUpdateFunction != null) {
@@ -198,20 +224,6 @@ public class EventuallyConsistentMapImpl<K, V>
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
- this.persistent = persistent;
-
- if (this.persistent) {
- String dataDirectory = System.getProperty("karaf.data", "./data");
- String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
-
- ExecutorService dbExecutor =
- newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
-
- persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
- persistentStore.readInto(items);
- } else {
- this.persistentStore = null;
- }
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
@@ -373,15 +385,6 @@ public class EventuallyConsistentMapImpl<K, V>
return existing;
}
});
- if (updated.get()) {
- if (persistent) {
- if (tombstone.isPresent()) {
- persistentStore.update(key, tombstone.get());
- } else {
- persistentStore.remove(key);
- }
- }
- }
return previousValue.get();
}
@@ -455,6 +458,7 @@ public class EventuallyConsistentMapImpl<K, V>
/**
* Returns true if newValue was accepted i.e. map is updated.
+ *
* @param key key
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
@@ -473,9 +477,6 @@ public class EventuallyConsistentMapImpl<K, V>
}
return existing;
});
- if (updated.get() && persistent) {
- persistentStore.update(key, newValue);
- }
return updated.get();
}