diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl')
10 files changed, 484 insertions, 87 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java index ff3e36ac..52a999a4 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java @@ -38,7 +38,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p> * * Additionally, the database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and + * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource * configurations will be loaded according to namespaces as well; for example, `databases.conf`. @@ -54,7 +54,7 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa * Creates a new database.<p> * * The database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and + * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource * configurations will be loaded according to namespaces as well; for example, `databases.conf`. diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java index fbc2c88d..6ea7c220 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java @@ -443,7 +443,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { public void event(ApplicationEvent event) { if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) { ApplicationId appId = event.subject().id(); - List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); + List<DefaultAsyncConsistentMap> mapsToRemove; + synchronized (mapsByApplication) { + mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); + } mapsToRemove.forEach(DatabaseManager.this::unregisterMap); if (event.type() == APP_UNINSTALLED) { mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear()); diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java index 95f9e39a..1d81f998 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java @@ -16,14 +16,14 @@ package org.onosproject.store.consistent.impl; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - /** * Database proxy. */ @@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> { /** * Returns the number of entries in map. + * * @param mapName map name * @return A completable future to be completed with the result once complete. */ @@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> { * Checks whether the map contains a key. * * @param mapName map name - * @param key key to check. + * @param key key to check. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Boolean> mapContainsKey(String mapName, K key); @@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> { * Checks whether the map contains a value. * * @param mapName map name - * @param value The value to check. + * @param value The value to check. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Boolean> mapContainsValue(String mapName, V value); @@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> { * Gets a value from the map. * * @param mapName map name - * @param key The key to get. + * @param key The key to get. * @return A completable future to be completed with the result once complete. */ CompletableFuture<Versioned<V>> mapGet(String mapName, K key); @@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> { /** * Updates the map. * - * @param mapName map name - * @param key The key to set - * @param valueMatch match for checking existing value - * @param versionMatch match for checking existing version - * @param value new value + * @param mapName map name + * @param key The key to set + * @param valueMatch match for checking existing value + * @param versionMatch match for checking existing version + * @param value new value * @return A completable future to be completed with the result once complete */ CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate( @@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> { */ CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName); - /** + /** * Atomically add the given value to current value of the specified counter. * * @param counterName counter name - * @param delta value to add + * @param delta value to add * @return updated value */ CompletableFuture<Long> counterAddAndGet(String counterName, long delta); @@ -143,11 +144,31 @@ public interface DatabaseProxy<K, V> { * Atomically add the given value to current value of the specified counter. * * @param counterName counter name - * @param delta value to add + * @param delta value to add * @return previous value */ CompletableFuture<Long> counterGetAndAdd(String counterName, long delta); + + /** + * Atomically sets the given value to current value of the specified counter. + * + * @param counterName counter name + * @param value value to set + * @return void future + */ + CompletableFuture<Void> counterSet(String counterName, long value); + + /** + * Atomically sets the given counter to the specified update value if and only if the current value is equal to the + * expected value. + * @param counterName counter name + * @param expectedValue value to use for equivalence check + * @param update value to set if expected value is current value + * @return true if an update occurred, false otherwise + */ + CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update); + /** * Returns the current value of the specified atomic counter. * @@ -158,6 +179,7 @@ public interface DatabaseProxy<K, V> { /** * Returns the size of queue. + * * @param queueName queue name * @return queue size */ @@ -165,14 +187,16 @@ public interface DatabaseProxy<K, V> { /** * Inserts an entry into the queue. + * * @param queueName queue name - * @param entry queue entry + * @param entry queue entry * @return void future */ CompletableFuture<Void> queuePush(String queueName, byte[] entry); /** * Removes an entry from the queue if the queue is non-empty. + * * @param queueName queue name * @return entry future. Can be completed with null if queue is empty */ @@ -180,6 +204,7 @@ public interface DatabaseProxy<K, V> { /** * Returns but does not remove an entry from the queue. + * * @param queueName queue name * @return entry. Can be null if queue is empty */ diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java index b3dd1c44..1136428b 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java @@ -16,18 +16,17 @@ package org.onosproject.store.consistent.impl; -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - import net.kuujo.copycat.state.Command; import net.kuujo.copycat.state.Initializer; import net.kuujo.copycat.state.Query; import net.kuujo.copycat.state.StateContext; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * Database state. @@ -83,6 +82,9 @@ public interface DatabaseState<K, V> { Long counterAddAndGet(String counterName, long delta); @Command + Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue); + + @Command Long counterGetAndAdd(String counterName, long delta); @Query diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java index 7a439c34..d851eaa0 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java @@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl; import org.onosproject.store.service.AsyncAtomicCounter; import java.util.concurrent.CompletableFuture; + import static com.google.common.base.Preconditions.checkNotNull; /** @@ -38,6 +39,8 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { private static final String GET_AND_ADD = "getAndAdd"; private static final String ADD_AND_GET = "addAndGet"; private static final String GET = "get"; + private static final String SET = "set"; + private static final String COMPARE_AND_SET = "compareAndSet"; public DefaultAsyncAtomicCounter(String name, Database database, @@ -72,13 +75,27 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { public CompletableFuture<Long> getAndAdd(long delta) { final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); return database.counterGetAndAdd(name, delta) - .whenComplete((r, e) -> timer.stop(e)); + .whenComplete((r, e) -> timer.stop(e)); } @Override public CompletableFuture<Long> addAndGet(long delta) { final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); return database.counterAddAndGet(name, delta) - .whenComplete((r, e) -> timer.stop(e)); + .whenComplete((r, e) -> timer.stop(e)); + } + + @Override + public CompletableFuture<Void> set(long value) { + final MeteringAgent.Context timer = monitor.startTimer(SET); + return database.counterSet(name, value) + .whenComplete((r, e) -> timer.stop(e)); + } + + @Override + public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) { + final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET); + return database.counterCompareAndSet(name, expectedValue, updateValue) + .whenComplete((r, e) -> timer.stop(e)); } } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java index 64886e41..2d6a956c 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java @@ -63,6 +63,16 @@ public class DefaultAtomicCounter implements AtomicCounter { } @Override + public void set(long value) { + complete(asyncCounter.set(value)); + } + + @Override + public boolean compareAndSet(long expectedValue, long updateValue) { + return complete(asyncCounter.compareAndSet(expectedValue, updateValue)); + } + + @Override public long get() { return complete(asyncCounter.get()); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java index 4d9776ee..2a50fbd6 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java @@ -16,12 +16,15 @@ package org.onosproject.store.consistent.impl; -import net.kuujo.copycat.state.StateMachine; +import com.google.common.collect.Sets; import net.kuujo.copycat.resource.internal.AbstractResource; import net.kuujo.copycat.resource.internal.ResourceManager; +import net.kuujo.copycat.state.StateMachine; import net.kuujo.copycat.state.internal.DefaultStateMachine; import net.kuujo.copycat.util.concurrent.Futures; import net.kuujo.copycat.util.function.TriConsumer; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; import java.util.Collection; import java.util.Map; @@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import com.google.common.collect.Sets; - /** * Default database. */ @@ -44,7 +42,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet(); private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher(); - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) public DefaultDatabase(ResourceManager context) { super(context); this.stateMachine = new DefaultStateMachine(context, @@ -66,7 +64,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab * return the completed future result. * * @param supplier The supplier to call if the database is open. - * @param <T> The future result type. + * @param <T> The future result type. * @return A completable future that if this database is closed is immediately failed. */ protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) { @@ -153,6 +151,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab } @Override + public CompletableFuture<Void> counterSet(String counterName, long value) { + return checkOpen(() -> proxy.counterSet(counterName, value)); + } + + @Override + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) { + return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update)); + } + + @Override public CompletableFuture<Long> queueSize(String queueName) { return checkOpen(() -> proxy.queueSize(queueName)); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java index 9a55ffb1..8943fc87 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java @@ -16,27 +16,26 @@ package org.onosproject.store.consistent.impl; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import net.kuujo.copycat.state.Initializer; +import net.kuujo.copycat.state.StateContext; +import org.onosproject.store.service.DatabaseUpdate; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.Set; - -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.StateContext; /** * Default database state. @@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { } @Override + public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) { + return getCounter(counterName).compareAndSet(expectedValue, updateValue); + } + + @Override public Long counterGet(String counterName) { return getCounter(counterName).get(); } diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java new file mode 100644 index 00000000..d8593e37 --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java @@ -0,0 +1,315 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.Tools; +import org.onosproject.cluster.ClusterEvent; +import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.ControllerNode.State; +import org.onosproject.cluster.NodeId; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMapException; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.MutexExecutionService; +import org.onosproject.store.service.MutexTask; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Implementation of a MutexExecutionService. + */ +@Component(immediate = true) +@Service +public class MutexExecutionManager implements MutexExecutionService { + + private final Logger log = getLogger(getClass()); + + protected ConsistentMap<String, MutexState> lockMap; + protected NodeId localNodeId; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener(); + private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); + + private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap(); + private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap(); + + @Activate + public void activate() { + localNodeId = clusterService.getLocalNode().id(); + lockMap = storageService.<String, MutexState>consistentMapBuilder() + .withName("onos-mutexes") + .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class)) + .withPartitionsDisabled() + .build(); + lockMap.addListener(mapEventListener); + clusterService.addListener(clusterEventListener); + releaseOldLocks(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + lockMap.removeListener(mapEventListener); + pending.values().forEach(future -> future.cancel(true)); + activeTasks.forEach((k, v) -> { + v.stop(); + unlock(k); + }); + clusterService.removeListener(clusterEventListener); + log.info("Stopped"); + } + + @Override + public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) { + return lock(exclusionPath) + .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath, + k -> new InnerMutexTask(exclusionPath, + task, + state.term()))) + .thenAcceptAsync(t -> t.start(), executor) + .whenComplete((r, e) -> unlock(exclusionPath)); + } + + protected CompletableFuture<MutexState> lock(String exclusionPath) { + CompletableFuture<MutexState> future = + pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>()); + tryLock(exclusionPath); + return future; + } + + /** + * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to + * the wait list. + * @param exclusionPath exclusion path + */ + protected void tryLock(String exclusionPath) { + Tools.retryable(() -> lockMap.asJavaMap() + .compute(exclusionPath, + (k, v) -> MutexState.admit(v, localNodeId)), + ConsistentMapException.ConcurrentModification.class, + Integer.MAX_VALUE, + 100).get(); + } + + /** + * Releases lock for the specific path. This operation is idempotent. + * @param exclusionPath exclusion path + */ + protected void unlock(String exclusionPath) { + Tools.retryable(() -> lockMap.asJavaMap() + .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)), + ConsistentMapException.ConcurrentModification.class, + Integer.MAX_VALUE, + 100).get(); + } + + /** + * Detects and releases all locks held by this node. + */ + private void releaseOldLocks() { + Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder())) + .keySet() + .forEach(path -> { + log.info("Detected zombie task still holding lock for {}. Releasing lock.", path); + unlock(path); + }); + } + + private class InternalLockMapEventListener implements MapEventListener<String, MutexState> { + + @Override + public void event(MapEvent<String, MutexState> event) { + log.debug("Received {}", event); + if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) { + pending.computeIfPresent(event.key(), (k, future) -> { + MutexState state = Versioned.valueOrElse(event.value(), null); + if (state != null && localNodeId.equals(state.holder())) { + log.debug("Local node is now owner for {}", event.key()); + future.complete(state); + return null; + } else { + return future; + } + }); + InnerMutexTask task = activeTasks.get(event.key()); + if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) { + task.stop(); + } + } + } + } + + private class InternalClusterEventListener implements ClusterEventListener { + + @Override + public void event(ClusterEvent event) { + if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || + event.type() == ClusterEvent.Type.INSTANCE_REMOVED) { + NodeId nodeId = event.subject().id(); + log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId); + lockMap.asJavaMap().forEach((k, v) -> { + if (v.contains(nodeId)) { + lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId)); + } + }); + } + long activeNodes = clusterService.getNodes() + .stream() + .map(node -> clusterService.getState(node.id())) + .filter(State.ACTIVE::equals) + .count(); + if (clusterService.getNodes().size() > 1 && activeNodes == 1) { + log.info("This node is partitioned away from the cluster. Stopping all inflight executions"); + activeTasks.forEach((k, v) -> { + v.stop(); + }); + } + } + } + + private static final class MutexState { + + private final NodeId holder; + private final List<NodeId> waitList; + private final long term; + + public static MutexState admit(MutexState state, NodeId nodeId) { + if (state == null) { + return new MutexState(nodeId, 1L, Lists.newArrayList()); + } else if (state.holder() == null) { + return new MutexState(nodeId, state.term() + 1, Lists.newArrayList()); + } else { + if (!state.contains(nodeId)) { + NodeId newHolder = state.holder(); + List<NodeId> newWaitList = Lists.newArrayList(state.waitList()); + newWaitList.add(nodeId); + return new MutexState(newHolder, state.term(), newWaitList); + } else { + return state; + } + } + } + + public static MutexState evict(MutexState state, NodeId nodeId) { + return state.evict(nodeId); + } + + public MutexState evict(NodeId nodeId) { + if (nodeId.equals(holder)) { + if (waitList.isEmpty()) { + return new MutexState(null, term, waitList); + } + List<NodeId> newWaitList = Lists.newArrayList(waitList); + NodeId newHolder = newWaitList.remove(0); + return new MutexState(newHolder, term + 1, newWaitList); + } else { + NodeId newHolder = holder; + List<NodeId> newWaitList = Lists.newArrayList(waitList); + newWaitList.remove(nodeId); + return new MutexState(newHolder, term, newWaitList); + } + } + + public NodeId holder() { + return holder; + } + + public List<NodeId> waitList() { + return waitList; + } + + public long term() { + return term; + } + + private boolean contains(NodeId nodeId) { + return (nodeId.equals(holder) || waitList.contains(nodeId)); + } + + private MutexState(NodeId holder, long term, List<NodeId> waitList) { + this.holder = holder; + this.term = term; + this.waitList = Lists.newArrayList(waitList); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("holder", holder) + .add("term", term) + .add("waitList", waitList) + .toString(); + } + } + + private class InnerMutexTask implements MutexTask { + private final MutexTask task; + private final String mutexPath; + private final long term; + + public InnerMutexTask(String mutexPath, MutexTask task, long term) { + this.mutexPath = mutexPath; + this.term = term; + this.task = task; + } + + public long term() { + return term; + } + + @Override + public void start() { + log.debug("Starting execution for mutex task guarded by {}", mutexPath); + task.start(); + log.debug("Finished execution for mutex task guarded by {}", mutexPath); + } + + @Override + public void stop() { + log.debug("Stopping execution for mutex task guarded by {}", mutexPath); + task.stop(); + } + } +}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java index a294681e..f741b367 100644 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java @@ -16,6 +16,17 @@ package org.onosproject.store.consistent.impl; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import net.kuujo.copycat.Task; +import net.kuujo.copycat.cluster.Cluster; +import net.kuujo.copycat.resource.ResourceState; +import org.onosproject.store.service.DatabaseUpdate; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; + import java.util.Collection; import java.util.List; import java.util.Map; @@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; import static com.google.common.base.Preconditions.checkState; /** @@ -100,10 +99,10 @@ public class PartitionedDatabase implements Database { return CompletableFuture.allOf(partitions .stream() .map(db -> db.counters() - .thenApply(m -> { - counters.putAll(m); - return null; - })) + .thenApply(m -> { + counters.putAll(m); + return null; + })) .toArray(CompletableFuture[]::new)) .thenApply(v -> counters); } @@ -113,9 +112,9 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); AtomicInteger totalSize = new AtomicInteger(0); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> totalSize.get()); } @@ -136,10 +135,10 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); AtomicBoolean containsValue = new AtomicBoolean(false); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapContainsValue(mapName, value) - .thenApply(v -> containsValue.compareAndSet(false, v))) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapContainsValue(mapName, value) + .thenApply(v -> containsValue.compareAndSet(false, v))) + .toArray(CompletableFuture[]::new)) .thenApply(v -> containsValue.get()); } @@ -196,9 +195,9 @@ public class PartitionedDatabase implements Database { checkState(isOpen.get(), DB_NOT_OPEN); Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) - .toArray(CompletableFuture[]::new)) + .stream() + .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> entrySet); } @@ -220,6 +219,19 @@ public class PartitionedDatabase implements Database { return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); } + @Override + public CompletableFuture<Void> counterSet(String counterName, long value) { + checkState(isOpen.get(), DB_NOT_OPEN); + return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); + } + + @Override + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) { + checkState(isOpen.get(), DB_NOT_OPEN); + return partitioner.getPartition(counterName, counterName). + counterCompareAndSet(counterName, expectedValue, updateValue); + + } @Override public CompletableFuture<Long> queueSize(String queueName) { @@ -268,8 +280,8 @@ public class PartitionedDatabase implements Database { AtomicBoolean status = new AtomicBoolean(true); return CompletableFuture.allOf(subTransactions.entrySet() .stream() - .map(entry -> entry - .getKey() + .map(entry -> entry + .getKey() .prepare(entry.getValue()) .thenApply(v -> status.compareAndSet(true, v))) .toArray(CompletableFuture[]::new)) @@ -282,15 +294,15 @@ public class PartitionedDatabase implements Database { AtomicBoolean success = new AtomicBoolean(true); List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList(); return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().commit(entry.getValue()) - .thenAccept(response -> { - success.set(success.get() && response.success()); - if (success.get()) { - allUpdates.addAll(response.updates()); - } - })) - .toArray(CompletableFuture[]::new)) + .stream() + .map(entry -> entry.getKey().commit(entry.getValue()) + .thenAccept(response -> { + success.set(success.get() && response.success()); + if (success.get()) { + allUpdates.addAll(response.updates()); + } + })) + .toArray(CompletableFuture[]::new)) .thenApply(v -> success.get() ? CommitResponse.success(allUpdates) : CommitResponse.failure()); } @@ -301,7 +313,7 @@ public class PartitionedDatabase implements Database { return CompletableFuture.allOf(subTransactions.entrySet() .stream() .map(entry -> entry.getKey().rollback(entry.getValue())) - .toArray(CompletableFuture[]::new)) + .toArray(CompletableFuture[]::new)) .thenApply(v -> true); } @@ -384,3 +396,4 @@ public class PartitionedDatabase implements Database { partitions.forEach(p -> p.unregisterConsumer(consumer)); } } + |