aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java4
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java5
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java55
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java18
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java21
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java10
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java24
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java30
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java315
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java89
10 files changed, 484 insertions, 87 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index ff3e36ac..52a999a4 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -38,7 +38,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
* options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
*
* Additionally, the database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+ * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
@@ -54,7 +54,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
* Creates a new database.<p>
*
* The database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+ * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index fbc2c88d..6ea7c220 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -443,7 +443,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
public void event(ApplicationEvent event) {
if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
ApplicationId appId = event.subject().id();
- List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
+ List<DefaultAsyncConsistentMap> mapsToRemove;
+ synchronized (mapsByApplication) {
+ mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
+ }
mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
if (event.type() == APP_UNINSTALLED) {
mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 95f9e39a..1d81f998 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -16,14 +16,14 @@
package org.onosproject.store.consistent.impl;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
/**
* Database proxy.
*/
@@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns the number of entries in map.
+ *
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
@@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> {
* Checks whether the map contains a key.
*
* @param mapName map name
- * @param key key to check.
+ * @param key key to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
@@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> {
* Checks whether the map contains a value.
*
* @param mapName map name
- * @param value The value to check.
+ * @param value The value to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
@@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> {
* Gets a value from the map.
*
* @param mapName map name
- * @param key The key to get.
+ * @param key The key to get.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
@@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> {
/**
* Updates the map.
*
- * @param mapName map name
- * @param key The key to set
- * @param valueMatch match for checking existing value
- * @param versionMatch match for checking existing version
- * @param value new value
+ * @param mapName map name
+ * @param key The key to set
+ * @param valueMatch match for checking existing value
+ * @param versionMatch match for checking existing version
+ * @param value new value
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
@@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> {
*/
CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
- /**
+ /**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return updated value
*/
CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
@@ -143,11 +144,31 @@ public interface DatabaseProxy<K, V> {
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
- * @param delta value to add
+ * @param delta value to add
* @return previous value
*/
CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
+
+ /**
+ * Atomically sets the given value to current value of the specified counter.
+ *
+ * @param counterName counter name
+ * @param value value to set
+ * @return void future
+ */
+ CompletableFuture<Void> counterSet(String counterName, long value);
+
+ /**
+ * Atomically sets the given counter to the specified update value if and only if the current value is equal to the
+ * expected value.
+ * @param counterName counter name
+ * @param expectedValue value to use for equivalence check
+ * @param update value to set if expected value is current value
+ * @return true if an update occurred, false otherwise
+ */
+ CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
+
/**
* Returns the current value of the specified atomic counter.
*
@@ -158,6 +179,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns the size of queue.
+ *
* @param queueName queue name
* @return queue size
*/
@@ -165,14 +187,16 @@ public interface DatabaseProxy<K, V> {
/**
* Inserts an entry into the queue.
+ *
* @param queueName queue name
- * @param entry queue entry
+ * @param entry queue entry
* @return void future
*/
CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
+ *
* @param queueName queue name
* @return entry future. Can be completed with null if queue is empty
*/
@@ -180,6 +204,7 @@ public interface DatabaseProxy<K, V> {
/**
* Returns but does not remove an entry from the queue.
+ *
* @param queueName queue name
* @return entry. Can be null if queue is empty
*/
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index b3dd1c44..1136428b 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -16,18 +16,17 @@
package org.onosproject.store.consistent.impl;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* Database state.
@@ -83,6 +82,9 @@ public interface DatabaseState<K, V> {
Long counterAddAndGet(String counterName, long delta);
@Command
+ Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
+
+ @Command
Long counterGetAndAdd(String counterName, long delta);
@Query
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index 7a439c34..d851eaa0 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -38,6 +39,8 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
private static final String GET_AND_ADD = "getAndAdd";
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
+ private static final String SET = "set";
+ private static final String COMPARE_AND_SET = "compareAndSet";
public DefaultAsyncAtomicCounter(String name,
Database database,
@@ -72,13 +75,27 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
return database.counterGetAndAdd(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
return database.counterAddAndGet(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Void> set(long value) {
+ final MeteringAgent.Context timer = monitor.startTimer(SET);
+ return database.counterSet(name, value)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+ final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
+ return database.counterCompareAndSet(name, expectedValue, updateValue)
+ .whenComplete((r, e) -> timer.stop(e));
}
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 64886e41..2d6a956c 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -63,6 +63,16 @@ public class DefaultAtomicCounter implements AtomicCounter {
}
@Override
+ public void set(long value) {
+ complete(asyncCounter.set(value));
+ }
+
+ @Override
+ public boolean compareAndSet(long expectedValue, long updateValue) {
+ return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
+ }
+
+ @Override
public long get() {
return complete(asyncCounter.get());
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 4d9776ee..2a50fbd6 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -16,12 +16,15 @@
package org.onosproject.store.consistent.impl;
-import net.kuujo.copycat.state.StateMachine;
+import com.google.common.collect.Sets;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
+import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
@@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Sets;
-
/**
* Default database.
*/
@@ -44,7 +42,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
public DefaultDatabase(ResourceManager context) {
super(context);
this.stateMachine = new DefaultStateMachine(context,
@@ -66,7 +64,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
* return the completed future result.
*
* @param supplier The supplier to call if the database is open.
- * @param <T> The future result type.
+ * @param <T> The future result type.
* @return A completable future that if this database is closed is immediately failed.
*/
protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
@@ -153,6 +151,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ return checkOpen(() -> proxy.counterSet(counterName, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
+ return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
+ }
+
+ @Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 9a55ffb1..8943fc87 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -16,27 +16,26 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import java.util.Set;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
@@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
@Override
+ public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ return getCounter(counterName).compareAndSet(expectedValue, updateValue);
+ }
+
+ @Override
public Long counterGet(String counterName) {
return getCounter(counterName).get();
}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
new file mode 100644
index 00000000..d8593e37
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode.State;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MutexExecutionService;
+import org.onosproject.store.service.MutexTask;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of a MutexExecutionService.
+ */
+@Component(immediate = true)
+@Service
+public class MutexExecutionManager implements MutexExecutionService {
+
+ private final Logger log = getLogger(getClass());
+
+ protected ConsistentMap<String, MutexState> lockMap;
+ protected NodeId localNodeId;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
+ private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
+
+ private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
+ private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ lockMap = storageService.<String, MutexState>consistentMapBuilder()
+ .withName("onos-mutexes")
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
+ .withPartitionsDisabled()
+ .build();
+ lockMap.addListener(mapEventListener);
+ clusterService.addListener(clusterEventListener);
+ releaseOldLocks();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ lockMap.removeListener(mapEventListener);
+ pending.values().forEach(future -> future.cancel(true));
+ activeTasks.forEach((k, v) -> {
+ v.stop();
+ unlock(k);
+ });
+ clusterService.removeListener(clusterEventListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
+ return lock(exclusionPath)
+ .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
+ k -> new InnerMutexTask(exclusionPath,
+ task,
+ state.term())))
+ .thenAcceptAsync(t -> t.start(), executor)
+ .whenComplete((r, e) -> unlock(exclusionPath));
+ }
+
+ protected CompletableFuture<MutexState> lock(String exclusionPath) {
+ CompletableFuture<MutexState> future =
+ pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
+ tryLock(exclusionPath);
+ return future;
+ }
+
+ /**
+ * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
+ * the wait list.
+ * @param exclusionPath exclusion path
+ */
+ protected void tryLock(String exclusionPath) {
+ Tools.retryable(() -> lockMap.asJavaMap()
+ .compute(exclusionPath,
+ (k, v) -> MutexState.admit(v, localNodeId)),
+ ConsistentMapException.ConcurrentModification.class,
+ Integer.MAX_VALUE,
+ 100).get();
+ }
+
+ /**
+ * Releases lock for the specific path. This operation is idempotent.
+ * @param exclusionPath exclusion path
+ */
+ protected void unlock(String exclusionPath) {
+ Tools.retryable(() -> lockMap.asJavaMap()
+ .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
+ ConsistentMapException.ConcurrentModification.class,
+ Integer.MAX_VALUE,
+ 100).get();
+ }
+
+ /**
+ * Detects and releases all locks held by this node.
+ */
+ private void releaseOldLocks() {
+ Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
+ .keySet()
+ .forEach(path -> {
+ log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
+ unlock(path);
+ });
+ }
+
+ private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
+
+ @Override
+ public void event(MapEvent<String, MutexState> event) {
+ log.debug("Received {}", event);
+ if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
+ pending.computeIfPresent(event.key(), (k, future) -> {
+ MutexState state = Versioned.valueOrElse(event.value(), null);
+ if (state != null && localNodeId.equals(state.holder())) {
+ log.debug("Local node is now owner for {}", event.key());
+ future.complete(state);
+ return null;
+ } else {
+ return future;
+ }
+ });
+ InnerMutexTask task = activeTasks.get(event.key());
+ if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
+ task.stop();
+ }
+ }
+ }
+ }
+
+ private class InternalClusterEventListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
+ event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
+ NodeId nodeId = event.subject().id();
+ log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
+ lockMap.asJavaMap().forEach((k, v) -> {
+ if (v.contains(nodeId)) {
+ lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
+ }
+ });
+ }
+ long activeNodes = clusterService.getNodes()
+ .stream()
+ .map(node -> clusterService.getState(node.id()))
+ .filter(State.ACTIVE::equals)
+ .count();
+ if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
+ log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
+ activeTasks.forEach((k, v) -> {
+ v.stop();
+ });
+ }
+ }
+ }
+
+ private static final class MutexState {
+
+ private final NodeId holder;
+ private final List<NodeId> waitList;
+ private final long term;
+
+ public static MutexState admit(MutexState state, NodeId nodeId) {
+ if (state == null) {
+ return new MutexState(nodeId, 1L, Lists.newArrayList());
+ } else if (state.holder() == null) {
+ return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
+ } else {
+ if (!state.contains(nodeId)) {
+ NodeId newHolder = state.holder();
+ List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
+ newWaitList.add(nodeId);
+ return new MutexState(newHolder, state.term(), newWaitList);
+ } else {
+ return state;
+ }
+ }
+ }
+
+ public static MutexState evict(MutexState state, NodeId nodeId) {
+ return state.evict(nodeId);
+ }
+
+ public MutexState evict(NodeId nodeId) {
+ if (nodeId.equals(holder)) {
+ if (waitList.isEmpty()) {
+ return new MutexState(null, term, waitList);
+ }
+ List<NodeId> newWaitList = Lists.newArrayList(waitList);
+ NodeId newHolder = newWaitList.remove(0);
+ return new MutexState(newHolder, term + 1, newWaitList);
+ } else {
+ NodeId newHolder = holder;
+ List<NodeId> newWaitList = Lists.newArrayList(waitList);
+ newWaitList.remove(nodeId);
+ return new MutexState(newHolder, term, newWaitList);
+ }
+ }
+
+ public NodeId holder() {
+ return holder;
+ }
+
+ public List<NodeId> waitList() {
+ return waitList;
+ }
+
+ public long term() {
+ return term;
+ }
+
+ private boolean contains(NodeId nodeId) {
+ return (nodeId.equals(holder) || waitList.contains(nodeId));
+ }
+
+ private MutexState(NodeId holder, long term, List<NodeId> waitList) {
+ this.holder = holder;
+ this.term = term;
+ this.waitList = Lists.newArrayList(waitList);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("holder", holder)
+ .add("term", term)
+ .add("waitList", waitList)
+ .toString();
+ }
+ }
+
+ private class InnerMutexTask implements MutexTask {
+ private final MutexTask task;
+ private final String mutexPath;
+ private final long term;
+
+ public InnerMutexTask(String mutexPath, MutexTask task, long term) {
+ this.mutexPath = mutexPath;
+ this.term = term;
+ this.task = task;
+ }
+
+ public long term() {
+ return term;
+ }
+
+ @Override
+ public void start() {
+ log.debug("Starting execution for mutex task guarded by {}", mutexPath);
+ task.start();
+ log.debug("Finished execution for mutex task guarded by {}", mutexPath);
+ }
+
+ @Override
+ public void stop() {
+ log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
+ task.stop();
+ }
+ }
+} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index a294681e..f741b367 100644
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -16,6 +16,17 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
+import net.kuujo.copycat.resource.ResourceState;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import net.kuujo.copycat.Task;
-import net.kuujo.copycat.cluster.Cluster;
-import net.kuujo.copycat.resource.ResourceState;
import static com.google.common.base.Preconditions.checkState;
/**
@@ -100,10 +99,10 @@ public class PartitionedDatabase implements Database {
return CompletableFuture.allOf(partitions
.stream()
.map(db -> db.counters()
- .thenApply(m -> {
- counters.putAll(m);
- return null;
- }))
+ .thenApply(m -> {
+ counters.putAll(m);
+ return null;
+ }))
.toArray(CompletableFuture[]::new))
.thenApply(v -> counters);
}
@@ -113,9 +112,9 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> totalSize.get());
}
@@ -136,10 +135,10 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapContainsValue(mapName, value)
- .thenApply(v -> containsValue.compareAndSet(false, v)))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapContainsValue(mapName, value)
+ .thenApply(v -> containsValue.compareAndSet(false, v)))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> containsValue.get());
}
@@ -196,9 +195,9 @@ public class PartitionedDatabase implements Database {
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> entrySet);
}
@@ -220,6 +219,19 @@ public class PartitionedDatabase implements Database {
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
+ @Override
+ public CompletableFuture<Void> counterSet(String counterName, long value) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
+ checkState(isOpen.get(), DB_NOT_OPEN);
+ return partitioner.getPartition(counterName, counterName).
+ counterCompareAndSet(counterName, expectedValue, updateValue);
+
+ }
@Override
public CompletableFuture<Long> queueSize(String queueName) {
@@ -268,8 +280,8 @@ public class PartitionedDatabase implements Database {
AtomicBoolean status = new AtomicBoolean(true);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
- .map(entry -> entry
- .getKey()
+ .map(entry -> entry
+ .getKey()
.prepare(entry.getValue())
.thenApply(v -> status.compareAndSet(true, v)))
.toArray(CompletableFuture[]::new))
@@ -282,15 +294,15 @@ public class PartitionedDatabase implements Database {
AtomicBoolean success = new AtomicBoolean(true);
List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue())
- .thenAccept(response -> {
- success.set(success.get() && response.success());
- if (success.get()) {
- allUpdates.addAll(response.updates());
- }
- }))
- .toArray(CompletableFuture[]::new))
+ .stream()
+ .map(entry -> entry.getKey().commit(entry.getValue())
+ .thenAccept(response -> {
+ success.set(success.get() && response.success());
+ if (success.get()) {
+ allUpdates.addAll(response.updates());
+ }
+ }))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> success.get() ?
CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@@ -301,7 +313,7 @@ public class PartitionedDatabase implements Database {
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().rollback(entry.getValue()))
- .toArray(CompletableFuture[]::new))
+ .toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@@ -384,3 +396,4 @@ public class PartitionedDatabase implements Database {
partitions.forEach(p -> p.unregisterConsumer(consumer));
}
}
+