summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java81
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java61
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java160
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java134
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java106
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java157
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java442
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java45
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java249
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java103
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java116
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java101
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java490
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java92
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java77
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java138
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java71
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java204
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java141
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java251
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java372
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java129
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java81
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java234
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java93
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java70
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java117
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java50
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java204
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java605
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java131
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java129
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java134
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java315
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java399
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java33
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java121
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java38
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java39
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java91
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java126
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java85
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java21
43 files changed, 0 insertions, 6836 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java
deleted file mode 100644
index 92db5b44..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-/**
- * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency
- * guarantee in return for better read performance.
- * <p>
- * For read/write operations that are local to a node this map implementation provides
- * guarantees similar to a ConsistentMap. However for read/write operations executed
- * across multiple nodes this implementation only provides eventual consistency.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class AsyncCachingConsistentMap<K, V> extends DefaultAsyncConsistentMap<K, V> {
-
- private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
- CacheBuilder.newBuilder()
- .maximumSize(10000) // TODO: make configurable
- .build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
- @Override
- public CompletableFuture<Versioned<V>> load(K key)
- throws Exception {
- return AsyncCachingConsistentMap.super.get(key);
- }
- });
-
- public AsyncCachingConsistentMap(String name,
- ApplicationId applicationId,
- Database database,
- Serializer serializer,
- boolean readOnly,
- boolean purgeOnUninstall,
- boolean meteringEnabled) {
- super(name, applicationId, database, serializer, readOnly, purgeOnUninstall, meteringEnabled);
- addListener(event -> cache.invalidate(event.key()));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> get(K key) {
- CompletableFuture<Versioned<V>> cachedValue = cache.getIfPresent(key);
- if (cachedValue != null) {
- if (cachedValue.isCompletedExceptionally()) {
- cache.invalidate(key);
- } else {
- return cachedValue;
- }
- }
- return cache.getUnchecked(key);
- }
-
- @Override
- protected void beforeUpdate(K key) {
- super.beforeUpdate(key);
- cache.invalidate(key);
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
deleted file mode 100644
index bbc8e6e0..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Result of a Transaction commit operation.
- */
-public final class CommitResponse {
-
- private boolean success;
- private List<UpdateResult<String, byte[]>> updates;
-
- public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) {
- return new CommitResponse(true, updates);
- }
-
- public static CommitResponse failure() {
- return new CommitResponse(false, Collections.emptyList());
- }
-
- private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) {
- this.success = success;
- this.updates = ImmutableList.copyOf(updates);
- }
-
- public boolean success() {
- return success;
- }
-
- public List<UpdateResult<String, byte[]>> updates() {
- return updates;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("success", success)
- .add("udpates", updates)
- .toString();
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java
deleted file mode 100644
index 5183924c..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Maps;
-
-/**
- * Standard java Map backed by a ConsistentMap.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> {
-
- private final ConsistentMap<K, V> backingMap;
-
- public ConsistentMapBackedJavaMap(ConsistentMap<K, V> backingMap) {
- this.backingMap = backingMap;
- }
-
- @Override
- public int size() {
- return backingMap.size();
- }
-
- @Override
- public boolean isEmpty() {
- return backingMap.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return backingMap.containsKey((K) key);
- }
-
- @Override
- public boolean containsValue(Object value) {
- return backingMap.containsValue((V) value);
- }
-
- @Override
- public V get(Object key) {
- return Versioned.valueOrElse(backingMap.get((K) key), null);
- }
-
- @Override
- public V getOrDefault(Object key, V defaultValue) {
- return Versioned.valueOrElse(backingMap.get((K) key), defaultValue);
- }
-
- @Override
- public V put(K key, V value) {
- return Versioned.valueOrElse(backingMap.put(key, value), null);
- }
-
- @Override
- public V putIfAbsent(K key, V value) {
- return Versioned.valueOrElse(backingMap.putIfAbsent(key, value), null);
- }
-
- @Override
- public V remove(Object key) {
- return Versioned.valueOrElse(backingMap.remove((K) key), null);
- }
-
- @Override
- public boolean remove(Object key, Object value) {
- return backingMap.remove((K) key, (V) value);
- }
-
- @Override
- public V replace(K key, V value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- return backingMap.replace(key, oldValue, newValue);
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> m) {
- m.forEach((k, v) -> {
- backingMap.put(k, v);
- });
- }
-
- @Override
- public void clear() {
- backingMap.clear();
- }
-
- @Override
- public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return Versioned.valueOrElse(backingMap.compute(key, remappingFunction), null);
- }
-
- @Override
- public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
- return Versioned.valueOrElse(backingMap.computeIfAbsent(key, mappingFunction), null);
- }
-
- @Override
- public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return Versioned.valueOrElse(backingMap.computeIfPresent(key, remappingFunction), null);
- }
-
- @Override
- public Set<K> keySet() {
- return backingMap.keySet();
- }
-
- @Override
- public Collection<V> values() {
- return Collections2.transform(backingMap.values(), v -> v.value());
- }
-
- @Override
- public Set<java.util.Map.Entry<K, V>> entrySet() {
- return backingMap.entrySet()
- .stream()
- .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue().value()))
- .collect(Collectors.toSet());
- }
-
- @Override
- public void forEach(BiConsumer<? super K, ? super V> action) {
- entrySet().forEach(e -> action.accept(e.getKey(), e.getValue()));
- }
-
- @Override
- public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
- return computeIfPresent(key, (k, v) -> v == null ? value : remappingFunction.apply(v, value));
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
deleted file mode 100644
index 88ddae62..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-import net.kuujo.copycat.protocol.AbstractProtocol;
-import net.kuujo.copycat.protocol.ProtocolClient;
-import net.kuujo.copycat.protocol.ProtocolHandler;
-import net.kuujo.copycat.protocol.ProtocolServer;
-import net.kuujo.copycat.util.Configurable;
-
-/**
- * Protocol for Copycat communication that employs
- * {@code ClusterCommunicationService}.
- */
-public class CopycatCommunicationProtocol extends AbstractProtocol {
-
- private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
- new MessageSubject("onos-copycat-message");
-
- protected ClusterService clusterService;
- protected ClusterCommunicationService clusterCommunicator;
-
- public CopycatCommunicationProtocol(ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator) {
- this.clusterService = clusterService;
- this.clusterCommunicator = clusterCommunicator;
- }
-
- @Override
- public Configurable copy() {
- return this;
- }
-
- @Override
- public ProtocolClient createClient(URI uri) {
- NodeId nodeId = uriToNodeId(uri);
- if (nodeId == null) {
- throw new IllegalStateException("Unknown peer " + uri);
- }
- return new Client(nodeId);
- }
-
- @Override
- public ProtocolServer createServer(URI uri) {
- return new Server();
- }
-
- private class Server implements ProtocolServer {
-
- @Override
- public void handler(ProtocolHandler handler) {
- if (handler == null) {
- clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
- } else {
- clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
- ByteBuffer::wrap,
- handler,
- Tools::byteBuffertoArray);
- // FIXME: Tools::byteBuffertoArray involves a array copy.
- }
- }
-
- @Override
- public CompletableFuture<Void> listen() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
- return CompletableFuture.completedFuture(null);
- }
- }
-
- private class Client implements ProtocolClient {
- private final NodeId peer;
-
- public Client(NodeId peer) {
- this.peer = peer;
- }
-
- @Override
- public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
- return clusterCommunicator.sendAndReceive(request,
- COPYCAT_MESSAGE_SUBJECT,
- Tools::byteBuffertoArray,
- ByteBuffer::wrap,
- peer);
- }
-
- @Override
- public CompletableFuture<Void> connect() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- return CompletableFuture.completedFuture(null);
- }
- }
-
- private NodeId uriToNodeId(URI uri) {
- return clusterService.getNodes()
- .stream()
- .filter(node -> uri.getHost().equals(node.ip().toString()))
- .map(ControllerNode::id)
- .findAny()
- .orElse(null);
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
deleted file mode 100644
index 52a999a4..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-
-import java.util.function.Consumer;
-
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.resource.Resource;
-
-/**
- * Database.
- */
-public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
-
- /**
- * Creates a new database with the default cluster configuration.<p>
- *
- * The database will be constructed with the default cluster configuration. The default cluster configuration
- * searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration
- * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
- *
- * Additionally, the database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
- * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
- * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
- * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
- *
- * @param name The database name.
- * @return The database.
- */
- static Database create(String name) {
- return create(name, new ClusterConfig(), new DatabaseConfig());
- }
-
- /**
- * Creates a new database.<p>
- *
- * The database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
- * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
- * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
- * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
- *
- * @param name The database name.
- * @param cluster The cluster configuration.
- * @return The database.
- */
- static Database create(String name, ClusterConfig cluster) {
- return create(name, cluster, new DatabaseConfig());
- }
-
- /**
- * Creates a new database.
- *
- * @param name The database name.
- * @param cluster The cluster configuration.
- * @param config The database configuration.
-
- * @return The database.
- */
- static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
- ClusterCoordinator coordinator =
- new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
- return coordinator.<Database>getResource(name, config.resolve(cluster))
- .addStartupTask(() -> coordinator.open().thenApply(v -> null))
- .addShutdownTask(coordinator::close);
- }
-
- /**
- * Tells whether the database supports change notifications.
- * @return true if notifications are supported; false otherwise
- */
- default boolean hasChangeNotificationSupport() {
- return true;
- }
-
- /**
- * Registers a new consumer of StateMachineUpdates.
- * @param consumer consumer to register
- */
- void registerConsumer(Consumer<StateMachineUpdate> consumer);
-
- /**
- * Unregisters a consumer of StateMachineUpdates.
- * @param consumer consumer to unregister
- */
- void unregisterConsumer(Consumer<StateMachineUpdate> consumer);
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
deleted file mode 100644
index bd774b99..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.typesafe.config.ConfigValueFactory;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
-import net.kuujo.copycat.protocol.Consistency;
-import net.kuujo.copycat.resource.ResourceConfig;
-import net.kuujo.copycat.state.StateLogConfig;
-import net.kuujo.copycat.util.internal.Assert;
-
-import java.util.Map;
-
-/**
- * Database configuration.
- *
- */
-public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
- private static final String DATABASE_CONSISTENCY = "consistency";
-
- private static final String DEFAULT_CONFIGURATION = "database-defaults";
- private static final String CONFIGURATION = "database";
-
- private String name;
-
- public DatabaseConfig() {
- super(CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- public DatabaseConfig(Map<String, Object> config) {
- super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- public DatabaseConfig(String resource) {
- super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- protected DatabaseConfig(DatabaseConfig config) {
- super(config);
- }
-
- @Override
- public DatabaseConfig copy() {
- return new DatabaseConfig(this);
- }
-
- /**
- * Sets the database read consistency.
- *
- * @param consistency The database read consistency.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public void setConsistency(String consistency) {
- this.config = config.withValue(DATABASE_CONSISTENCY,
- ConfigValueFactory.fromAnyRef(
- Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
- }
-
- /**
- * Sets the database read consistency.
- *
- * @param consistency The database read consistency.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public void setConsistency(Consistency consistency) {
- this.config = config.withValue(DATABASE_CONSISTENCY,
- ConfigValueFactory.fromAnyRef(
- Assert.isNotNull(consistency, "consistency").toString()));
- }
-
- /**
- * Returns the database read consistency.
- *
- * @return The database read consistency.
- */
- public Consistency getConsistency() {
- return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
- }
-
- /**
- * Sets the database read consistency, returning the configuration for method chaining.
- *
- * @param consistency The database read consistency.
- * @return The database configuration.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public DatabaseConfig withConsistency(String consistency) {
- setConsistency(consistency);
- return this;
- }
-
- /**
- * Sets the database read consistency, returning the configuration for method chaining.
- *
- * @param consistency The database read consistency.
- * @return The database configuration.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public DatabaseConfig withConsistency(Consistency consistency) {
- setConsistency(consistency);
- return this;
- }
-
- /**
- * Returns the database name.
- *
- * @return The database name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the database name, returning the configuration for method chaining.
- *
- * @param name The database name
- * @return The database configuration
- * @throws java.lang.NullPointerException If the name is {@code null}
- */
- public DatabaseConfig withName(String name) {
- setName(Assert.isNotNull(name, "name"));
- return this;
- }
-
- /**
- * Sets the database name.
- *
- * @param name The database name
- * @throws java.lang.NullPointerException If the name is {@code null}
- */
- public void setName(String name) {
- this.name = Assert.isNotNull(name, "name");
- }
-
- @Override
- public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
- return new StateLogConfig(toMap())
- .resolve(cluster)
- .withResourceType(DefaultDatabase.class);
- }
-
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
deleted file mode 100644
index 90d81ee7..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.Member.Type;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.log.BufferedLog;
-import net.kuujo.copycat.log.FileLog;
-import net.kuujo.copycat.log.Log;
-import net.kuujo.copycat.protocol.Consistency;
-import net.kuujo.copycat.protocol.Protocol;
-import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.app.ApplicationEvent;
-import org.onosproject.app.ApplicationListener;
-import org.onosproject.app.ApplicationService;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.IdGenerator;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
-import org.onosproject.store.service.AtomicCounterBuilder;
-import org.onosproject.store.service.AtomicValueBuilder;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-import org.onosproject.store.service.MapInfo;
-import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.StorageAdminService;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.TransactionContextBuilder;
-import org.slf4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
-import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
-
-/**
- * Database manager.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class DatabaseManager implements StorageService, StorageAdminService {
-
- private final Logger log = getLogger(getClass());
-
- public static final String BASE_PARTITION_NAME = "p0";
-
- private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
- private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
-
- private ClusterCoordinator coordinator;
- protected PartitionedDatabase partitionedDatabase;
- protected Database inMemoryDatabase;
- protected NodeId localNodeId;
-
- private TransactionManager transactionManager;
- private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
-
- private ApplicationListener appListener = new InternalApplicationListener();
-
- private final Multimap<String, DefaultAsyncConsistentMap> maps =
- Multimaps.synchronizedMultimap(ArrayListMultimap.create());
- private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication =
- Multimaps.synchronizedMultimap(ArrayListMultimap.create());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
- protected ApplicationService applicationService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PersistenceService persistenceService;
-
- protected String nodeIdToUri(NodeId nodeId) {
- ControllerNode node = clusterService.getNode(nodeId);
- return String.format("onos://%s:%d", node.ip(), node.tcpPort());
- }
-
- protected void bindApplicationService(ApplicationService service) {
- applicationService = service;
- applicationService.addListener(appListener);
- }
-
- protected void unbindApplicationService(ApplicationService service) {
- applicationService.removeListener(appListener);
- this.applicationService = null;
- }
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
-
- Map<String, Set<NodeId>> partitionMap = Maps.newHashMap();
- clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
- partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers()));
- });
-
-
- String[] activeNodeUris = partitionMap.values()
- .stream()
- .reduce((s1, s2) -> Sets.union(s1, s2))
- .get()
- .stream()
- .map(this::nodeIdToUri)
- .toArray(String[]::new);
-
- String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id());
- Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
-
- ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(protocol)
- .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
- .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
- .withMembers(activeNodeUris)
- .withLocalMember(localNodeUri);
-
- CopycatConfig copycatConfig = new CopycatConfig()
- .withName("onos")
- .withClusterConfig(clusterConfig)
- .withDefaultSerializer(new DatabaseSerializer())
- .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
-
- coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
-
- DatabaseConfig inMemoryDatabaseConfig =
- newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
- inMemoryDatabase = coordinator
- .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
- .withSerializer(copycatConfig.getDefaultSerializer())
- .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
-
- List<Database> partitions = partitionMap.entrySet()
- .stream()
- .map(entry -> {
- String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new);
- return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
- })
- .map(config -> {
- Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
- .withSerializer(copycatConfig.getDefaultSerializer())
- .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
- return db;
- })
- .collect(Collectors.toList());
-
- partitionedDatabase = new PartitionedDatabase("onos-store", partitions);
-
- CompletableFuture<Void> status = coordinator.open()
- .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
- .whenComplete((db, error) -> {
- if (error != null) {
- log.error("Failed to initialize database.", error);
- } else {
- log.info("Successfully initialized database.");
- }
- }));
-
- Futures.getUnchecked(status);
-
- transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
- partitionedDatabase.setTransactionManager(transactionManager);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
- .thenCompose(v -> coordinator.close())
- .whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to cleanly close databases.", error);
- } else {
- log.info("Successfully closed databases.");
- }
- });
- ImmutableList.copyOf(maps.values()).forEach(this::unregisterMap);
- if (applicationService != null) {
- applicationService.removeListener(appListener);
- }
- log.info("Stopped");
- }
-
- @Override
- public TransactionContextBuilder transactionContextBuilder() {
- return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
- }
-
- @Override
- public List<PartitionInfo> getPartitionInfo() {
- return Lists.asList(
- inMemoryDatabase,
- partitionedDatabase.getPartitions().toArray(new Database[]{}))
- .stream()
- .map(DatabaseManager::toPartitionInfo)
- .collect(Collectors.toList());
- }
-
- private Log newPersistentLog() {
- String logDir = System.getProperty("karaf.data", "./data");
- return new FileLog()
- .withDirectory(logDir)
- .withSegmentSize(1073741824) // 1GB
- .withFlushOnWrite(true)
- .withSegmentInterval(Long.MAX_VALUE);
- }
-
- private Log newInMemoryLog() {
- return new BufferedLog()
- .withFlushOnWrite(false)
- .withFlushInterval(Long.MAX_VALUE)
- .withSegmentSize(10485760) // 10MB
- .withSegmentInterval(Long.MAX_VALUE);
- }
-
- private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
- return new DatabaseConfig()
- .withName(name)
- .withElectionTimeout(electionTimeoutMillis(replicas))
- .withHeartbeatInterval(heartbeatTimeoutMillis(replicas))
- .withConsistency(Consistency.DEFAULT)
- .withLog(log)
- .withDefaultSerializer(new DatabaseSerializer())
- .withReplicas(replicas);
- }
-
- private long electionTimeoutMillis(String[] replicas) {
- return replicas.length == 1 ? 10L : RAFT_ELECTION_TIMEOUT_MILLIS;
- }
-
- private long heartbeatTimeoutMillis(String[] replicas) {
- return electionTimeoutMillis(replicas) / 2;
- }
-
- /**
- * Maps a Raft Database object to a PartitionInfo object.
- *
- * @param database database containing input data
- * @return PartitionInfo object
- */
- private static PartitionInfo toPartitionInfo(Database database) {
- return new PartitionInfo(database.name(),
- database.cluster().term(),
- database.cluster().members()
- .stream()
- .filter(member -> Type.ACTIVE.equals(member.type()))
- .map(Member::uri)
- .sorted()
- .collect(Collectors.toList()),
- database.cluster().leader() != null ?
- database.cluster().leader().uri() : null);
- }
-
-
- @Override
- public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
- return new EventuallyConsistentMapBuilderImpl<>(clusterService,
- clusterCommunicator,
- persistenceService);
- }
-
- @Override
- public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
- return new DefaultConsistentMapBuilder<>(this);
- }
-
- @Override
- public <E> DistributedSetBuilder<E> setBuilder() {
- return new DefaultDistributedSetBuilder<>(this);
- }
-
-
- @Override
- public <E> DistributedQueueBuilder<E> queueBuilder() {
- return new DefaultDistributedQueueBuilder<>(this);
- }
-
- @Override
- public AtomicCounterBuilder atomicCounterBuilder() {
- return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
- }
-
- @Override
- public <V> AtomicValueBuilder<V> atomicValueBuilder() {
- return new DefaultAtomicValueBuilder<>(this);
- }
-
- @Override
- public List<MapInfo> getMapInfo() {
- List<MapInfo> maps = Lists.newArrayList();
- maps.addAll(getMapInfo(inMemoryDatabase));
- maps.addAll(getMapInfo(partitionedDatabase));
- return maps;
- }
-
- private List<MapInfo> getMapInfo(Database database) {
- return complete(database.maps())
- .stream()
- .map(name -> new MapInfo(name, complete(database.mapSize(name))))
- .filter(info -> info.size() > 0)
- .collect(Collectors.toList());
- }
-
-
- @Override
- public Map<String, Long> getCounters() {
- Map<String, Long> counters = Maps.newHashMap();
- counters.putAll(complete(inMemoryDatabase.counters()));
- counters.putAll(complete(partitionedDatabase.counters()));
- return counters;
- }
-
- @Override
- public Map<String, Long> getPartitionedDatabaseCounters() {
- Map<String, Long> counters = Maps.newHashMap();
- counters.putAll(complete(partitionedDatabase.counters()));
- return counters;
- }
-
- @Override
- public Map<String, Long> getInMemoryDatabaseCounters() {
- Map<String, Long> counters = Maps.newHashMap();
- counters.putAll(complete(inMemoryDatabase.counters()));
- return counters;
- }
-
- @Override
- public Collection<Transaction> getTransactions() {
- return complete(transactionManager.getTransactions());
- }
-
- private static <T> T complete(CompletableFuture<T> future) {
- try {
- return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ConsistentMapException.Interrupted();
- } catch (TimeoutException e) {
- throw new ConsistentMapException.Timeout();
- } catch (ExecutionException e) {
- throw new ConsistentMapException(e.getCause());
- }
- }
-
- @Override
- public void redriveTransactions() {
- getTransactions().stream().forEach(transactionManager::execute);
- }
-
- protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
- maps.put(map.name(), map);
- if (map.applicationId() != null) {
- mapsByApplication.put(map.applicationId(), map);
- }
- return map;
- }
-
- protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
- maps.remove(map.name(), map);
- if (map.applicationId() != null) {
- mapsByApplication.remove(map.applicationId(), map);
- }
- }
-
- private class InternalApplicationListener implements ApplicationListener {
- @Override
- public void event(ApplicationEvent event) {
- if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
- ApplicationId appId = event.subject().id();
- List<DefaultAsyncConsistentMap> mapsToRemove;
- synchronized (mapsByApplication) {
- mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
- }
- mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
- if (event.type() == APP_UNINSTALLED) {
- mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
- }
- }
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
deleted file mode 100644
index 740f81ad..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.List;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-
-/**
- * Partitioner for mapping map entries to individual database partitions.
- * <p>
- * By default a md5 hash of the hash key (key or map name) is used to pick a
- * partition.
- */
-public abstract class DatabasePartitioner implements Partitioner<String> {
- // Database partitions sorted by their partition name.
- protected final List<Database> partitions;
-
- public DatabasePartitioner(List<Database> partitions) {
- checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
- this.partitions = ImmutableList.copyOf(partitions);
- }
-
- protected int hash(String key) {
- return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
- }
-
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
deleted file mode 100644
index 1d81f998..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Database proxy.
- */
-public interface DatabaseProxy<K, V> {
-
- /**
- * Returns a set of all map names.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Set<String>> maps();
-
- /**
- * Returns a mapping from counter name to next value.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Map<String, Long>> counters();
-
- /**
- * Returns the number of entries in map.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Integer> mapSize(String mapName);
-
- /**
- * Checks whether the map is empty.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Boolean> mapIsEmpty(String mapName);
-
- /**
- * Checks whether the map contains a key.
- *
- * @param mapName map name
- * @param key key to check.
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
-
- /**
- * Checks whether the map contains a value.
- *
- * @param mapName map name
- * @param value The value to check.
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
-
- /**
- * Gets a value from the map.
- *
- * @param mapName map name
- * @param key The key to get.
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
-
- /**
- * Updates the map.
- *
- * @param mapName map name
- * @param key The key to set
- * @param valueMatch match for checking existing value
- * @param versionMatch match for checking existing version
- * @param value new value
- * @return A completable future to be completed with the result once complete
- */
- CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
- String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
-
- /**
- * Clears the map.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Result<Void>> mapClear(String mapName);
-
- /**
- * Gets a set of keys in the map.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Set<K>> mapKeySet(String mapName);
-
- /**
- * Gets a collection of values in the map.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Collection<Versioned<V>>> mapValues(String mapName);
-
- /**
- * Gets a set of entries in the map.
- *
- * @param mapName map name
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
-
- /**
- * Atomically add the given value to current value of the specified counter.
- *
- * @param counterName counter name
- * @param delta value to add
- * @return updated value
- */
- CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
-
- /**
- * Atomically add the given value to current value of the specified counter.
- *
- * @param counterName counter name
- * @param delta value to add
- * @return previous value
- */
- CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
-
-
- /**
- * Atomically sets the given value to current value of the specified counter.
- *
- * @param counterName counter name
- * @param value value to set
- * @return void future
- */
- CompletableFuture<Void> counterSet(String counterName, long value);
-
- /**
- * Atomically sets the given counter to the specified update value if and only if the current value is equal to the
- * expected value.
- * @param counterName counter name
- * @param expectedValue value to use for equivalence check
- * @param update value to set if expected value is current value
- * @return true if an update occurred, false otherwise
- */
- CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
-
- /**
- * Returns the current value of the specified atomic counter.
- *
- * @param counterName counter name
- * @return current value
- */
- CompletableFuture<Long> counterGet(String counterName);
-
- /**
- * Returns the size of queue.
- *
- * @param queueName queue name
- * @return queue size
- */
- CompletableFuture<Long> queueSize(String queueName);
-
- /**
- * Inserts an entry into the queue.
- *
- * @param queueName queue name
- * @param entry queue entry
- * @return void future
- */
- CompletableFuture<Void> queuePush(String queueName, byte[] entry);
-
- /**
- * Removes an entry from the queue if the queue is non-empty.
- *
- * @param queueName queue name
- * @return entry future. Can be completed with null if queue is empty
- */
- CompletableFuture<byte[]> queuePop(String queueName);
-
- /**
- * Returns but does not remove an entry from the queue.
- *
- * @param queueName queue name
- * @return entry. Can be null if queue is empty
- */
- CompletableFuture<byte[]> queuePeek(String queueName);
-
- /**
- * Prepare and commit the specified transaction.
- *
- * @param transaction transaction to commit (after preparation)
- * @return A completable future to be completed with the result once complete
- */
- CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction);
-
- /**
- * Prepare the specified transaction for commit. A successful prepare implies
- * all the affected resources are locked thus ensuring no concurrent updates can interfere.
- *
- * @param transaction transaction to prepare (for commit)
- * @return A completable future to be completed with the result once complete. The future is completed
- * with true if the transaction is successfully prepared i.e. all pre-conditions are met and
- * applicable resources locked.
- */
- CompletableFuture<Boolean> prepare(Transaction transaction);
-
- /**
- * Commit the specified transaction. A successful commit implies
- * all the updates are applied, are now durable and are now visible externally.
- *
- * @param transaction transaction to commit
- * @return A completable future to be completed with the result once complete
- */
- CompletableFuture<CommitResponse> commit(Transaction transaction);
-
- /**
- * Rollback the specified transaction. A successful rollback implies
- * all previously acquired locks for the affected resources are released.
- *
- * @param transaction transaction to rollback
- * @return A completable future to be completed with the result once complete
- */
- CompletableFuture<Boolean> rollback(Transaction transaction);
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
deleted file mode 100644
index de734144..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.nio.ByteBuffer;
-
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import net.kuujo.copycat.cluster.internal.MemberInfo;
-import net.kuujo.copycat.raft.protocol.AppendRequest;
-import net.kuujo.copycat.raft.protocol.AppendResponse;
-import net.kuujo.copycat.raft.protocol.CommitRequest;
-import net.kuujo.copycat.raft.protocol.CommitResponse;
-import net.kuujo.copycat.raft.protocol.PollRequest;
-import net.kuujo.copycat.raft.protocol.PollResponse;
-import net.kuujo.copycat.raft.protocol.QueryRequest;
-import net.kuujo.copycat.raft.protocol.QueryResponse;
-import net.kuujo.copycat.raft.protocol.ReplicaInfo;
-import net.kuujo.copycat.raft.protocol.SyncRequest;
-import net.kuujo.copycat.raft.protocol.SyncResponse;
-import net.kuujo.copycat.raft.protocol.VoteRequest;
-import net.kuujo.copycat.raft.protocol.VoteResponse;
-import net.kuujo.copycat.util.serializer.SerializerConfig;
-
-/**
- * Serializer for DatabaseManager's interaction with Copycat.
- */
-public class DatabaseSerializer extends SerializerConfig {
-
- private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
- .nextId(KryoNamespace.FLOATING_ID)
- .register(AppendRequest.class)
- .register(AppendResponse.class)
- .register(SyncRequest.class)
- .register(SyncResponse.class)
- .register(VoteRequest.class)
- .register(VoteResponse.class)
- .register(PollRequest.class)
- .register(PollResponse.class)
- .register(QueryRequest.class)
- .register(QueryResponse.class)
- .register(CommitRequest.class)
- .register(CommitResponse.class)
- .register(ReplicaInfo.class)
- .register(MemberInfo.class)
- .build();
-
- private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
- .nextId(KryoNamespace.FLOATING_ID)
- .register(Versioned.class)
- .register(DatabaseUpdate.class)
- .register(DatabaseUpdate.Type.class)
- .register(Result.class)
- .register(UpdateResult.class)
- .register(Result.Status.class)
- .register(DefaultTransaction.class)
- .register(Transaction.State.class)
- .register(org.onosproject.store.consistent.impl.CommitResponse.class)
- .register(Match.class)
- .register(NodeId.class)
- .build();
-
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .register(COPYCAT)
- .register(ONOS_STORE)
- .build();
- }
- };
-
- @Override
- public ByteBuffer writeObject(Object object) {
- return ByteBuffer.wrap(SERIALIZER.encode(object));
- }
-
- @Override
- public <T> T readObject(ByteBuffer buffer) {
- return SERIALIZER.decode(buffer);
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
deleted file mode 100644
index 1136428b..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import net.kuujo.copycat.state.Command;
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.Query;
-import net.kuujo.copycat.state.StateContext;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * Database state.
- *
- */
-public interface DatabaseState<K, V> {
-
- /**
- * Initializes the database state.
- *
- * @param context The map state context.
- */
- @Initializer
- void init(StateContext<DatabaseState<K, V>> context);
-
- @Query
- Set<String> maps();
-
- @Query
- Map<String, Long> counters();
-
- @Query
- int mapSize(String mapName);
-
- @Query
- boolean mapIsEmpty(String mapName);
-
- @Query
- boolean mapContainsKey(String mapName, K key);
-
- @Query
- boolean mapContainsValue(String mapName, V value);
-
- @Query
- Versioned<V> mapGet(String mapName, K key);
-
- @Command
- Result<UpdateResult<K, V>> mapUpdate(String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
-
- @Command
- Result<Void> mapClear(String mapName);
-
- @Query
- Set<K> mapKeySet(String mapName);
-
- @Query
- Collection<Versioned<V>> mapValues(String mapName);
-
- @Query
- Set<Entry<K, Versioned<V>>> mapEntrySet(String mapName);
-
- @Command
- Long counterAddAndGet(String counterName, long delta);
-
- @Command
- Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
-
- @Command
- Long counterGetAndAdd(String counterName, long delta);
-
- @Query
- Long queueSize(String queueName);
-
- @Query
- byte[] queuePeek(String queueName);
-
- @Command
- byte[] queuePop(String queueName);
-
- @Command
- void queuePush(String queueName, byte[] entry);
-
- @Query
- Long counterGet(String counterName);
-
- @Command
- CommitResponse prepareAndCommit(Transaction transaction);
-
- @Command
- boolean prepare(Transaction transaction);
-
- @Command
- CommitResponse commit(Transaction transaction);
-
- @Command
- boolean rollback(Transaction transaction);
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
deleted file mode 100644
index d851eaa0..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.AsyncAtomicCounter;
-
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Default implementation for a distributed AsyncAtomicCounter backed by
- * partitioned Raft DB.
- * <p>
- * The initial value will be zero.
- */
-public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
-
- private final String name;
- private final Database database;
- private final MeteringAgent monitor;
-
- private static final String PRIMITIVE_NAME = "atomicCounter";
- private static final String INCREMENT_AND_GET = "incrementAndGet";
- private static final String GET_AND_INCREMENT = "getAndIncrement";
- private static final String GET_AND_ADD = "getAndAdd";
- private static final String ADD_AND_GET = "addAndGet";
- private static final String GET = "get";
- private static final String SET = "set";
- private static final String COMPARE_AND_SET = "compareAndSet";
-
- public DefaultAsyncAtomicCounter(String name,
- Database database,
- boolean meteringEnabled) {
- this.name = checkNotNull(name);
- this.database = checkNotNull(database);
- this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
- }
-
- @Override
- public CompletableFuture<Long> incrementAndGet() {
- final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
- return addAndGet(1L)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Long> get() {
- final MeteringAgent.Context timer = monitor.startTimer(GET);
- return database.counterGet(name)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Long> getAndIncrement() {
- final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
- return getAndAdd(1L)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Long> getAndAdd(long delta) {
- final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
- return database.counterGetAndAdd(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Long> addAndGet(long delta) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
- return database.counterAddAndGet(name, delta)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Void> set(long value) {
- final MeteringAgent.Context timer = monitor.startTimer(SET);
- return database.counterSet(name, value)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
- final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
- return database.counterCompareAndSet(name, expectedValue, updateValue)
- .whenComplete((r, e) -> timer.stop(e));
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
deleted file mode 100644
index af2bb74d..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Maps;
-
-import org.onlab.util.HexString;
-import org.onlab.util.SharedExecutors;
-import org.onlab.util.Tools;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE;
-import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * AsyncConsistentMap implementation that is backed by a Raft consensus
- * based database.
- *
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
-
- private final String name;
- private final ApplicationId applicationId;
- private final Database database;
- private final Serializer serializer;
- private final boolean readOnly;
- private final boolean purgeOnUninstall;
-
- private static final String PRIMITIVE_NAME = "consistentMap";
- private static final String SIZE = "size";
- private static final String IS_EMPTY = "isEmpty";
- private static final String CONTAINS_KEY = "containsKey";
- private static final String CONTAINS_VALUE = "containsValue";
- private static final String GET = "get";
- private static final String COMPUTE_IF = "computeIf";
- private static final String PUT = "put";
- private static final String PUT_AND_GET = "putAndGet";
- private static final String PUT_IF_ABSENT = "putIfAbsent";
- private static final String REMOVE = "remove";
- private static final String CLEAR = "clear";
- private static final String KEY_SET = "keySet";
- private static final String VALUES = "values";
- private static final String ENTRY_SET = "entrySet";
- private static final String REPLACE = "replace";
- private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
-
- private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
-
- private final Logger log = getLogger(getClass());
- private final MeteringAgent monitor;
-
- private static final String ERROR_NULL_KEY = "Key cannot be null";
- private static final String ERROR_NULL_VALUE = "Null values are not allowed";
-
- // String representation of serialized byte[] -> original key Object
- private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder()
- .softValues()
- .build(new CacheLoader<String, K>() {
-
- @Override
- public K load(String key) {
- return serializer.decode(HexString.fromHexString(key));
- }
- });
-
- protected String sK(K key) {
- String s = HexString.toHexString(serializer.encode(key));
- keyCache.put(s, key);
- return s;
- }
-
- protected K dK(String key) {
- return keyCache.getUnchecked(key);
- }
-
- public DefaultAsyncConsistentMap(String name,
- ApplicationId applicationId,
- Database database,
- Serializer serializer,
- boolean readOnly,
- boolean purgeOnUninstall,
- boolean meteringEnabled) {
- this.name = checkNotNull(name, "map name cannot be null");
- this.applicationId = applicationId;
- this.database = checkNotNull(database, "database cannot be null");
- this.serializer = checkNotNull(serializer, "serializer cannot be null");
- this.readOnly = readOnly;
- this.purgeOnUninstall = purgeOnUninstall;
- this.database.registerConsumer(update -> {
- SharedExecutors.getSingleThreadExecutor().execute(() -> {
- if (listeners.isEmpty()) {
- return;
- }
- try {
- if (update.target() == MAP_UPDATE) {
- Result<UpdateResult<String, byte[]>> result = update.output();
- if (result.success() && result.value().mapName().equals(name)) {
- MapEvent<K, V> mapEvent = result.value()
- .<K, V>map(this::dK,
- v -> serializer.decode(Tools.copyOf(v)))
- .toMapEvent();
- notifyListeners(mapEvent);
- }
- } else if (update.target() == TX_COMMIT) {
- CommitResponse response = update.output();
- if (response.success()) {
- response.updates().forEach(u -> {
- if (u.mapName().equals(name)) {
- MapEvent<K, V> mapEvent =
- u.<K, V>map(this::dK,
- v -> serializer.decode(Tools.copyOf(v)))
- .toMapEvent();
- notifyListeners(mapEvent);
- }
- });
- }
- }
- } catch (Exception e) {
- log.warn("Error notifying listeners", e);
- }
- });
- });
- this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
- }
-
- /**
- * Returns this map name.
- * @return map name
- */
- public String name() {
- return name;
- }
-
- /**
- * Returns the serializer for map entries.
- * @return map entry serializer
- */
- public Serializer serializer() {
- return serializer;
- }
-
- /**
- * Returns the applicationId owning this map.
- * @return application Id
- */
- public ApplicationId applicationId() {
- return applicationId;
- }
-
- /**
- * Returns whether the map entries should be purged when the application
- * owning it is uninstalled.
- * @return true is map needs to cleared on app uninstall; false otherwise
- */
- public boolean purgeOnUninstall() {
- return purgeOnUninstall;
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- return database.mapSize(name)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Boolean> isEmpty() {
- final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
- return database.mapIsEmpty(name)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Boolean> containsKey(K key) {
- checkNotNull(key, ERROR_NULL_KEY);
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
- return database.mapContainsKey(name, sK(key))
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Boolean> containsValue(V value) {
- checkNotNull(value, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE);
- return database.mapContainsValue(name, serializer.encode(value))
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> get(K key) {
- checkNotNull(key, ERROR_NULL_KEY);
- final MeteringAgent.Context timer = monitor.startTimer(GET);
- return database.mapGet(name, sK(key))
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v != null ? v.map(serializer::decode) : null);
- }
-
- @Override
- public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
- Function<? super K, ? extends V> mappingFunction) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(mappingFunction, "Mapping function cannot be null");
- final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT);
- return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.newValue());
- }
-
- @Override
- public CompletableFuture<Versioned<V>> computeIfPresent(K key,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return computeIf(key, Objects::nonNull, remappingFunction);
- }
-
- @Override
- public CompletableFuture<Versioned<V>> compute(K key,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return computeIf(key, v -> true, remappingFunction);
- }
-
- @Override
- public CompletableFuture<Versioned<V>> computeIf(K key,
- Predicate<? super V> condition,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(condition, "predicate function cannot be null");
- checkNotNull(remappingFunction, "Remapping function cannot be null");
- final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF);
- return get(key).thenCompose(r1 -> {
- V existingValue = r1 == null ? null : r1.value();
- // if the condition evaluates to false, return existing value.
- if (!condition.test(existingValue)) {
- return CompletableFuture.completedFuture(r1);
- }
-
- AtomicReference<V> computedValue = new AtomicReference<>();
- // if remappingFunction throws an exception, return the exception.
- try {
- computedValue.set(remappingFunction.apply(key, existingValue));
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- if (computedValue.get() == null && r1 == null) {
- return CompletableFuture.completedFuture(null);
- }
- Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
- Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
- return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> {
- if (v.updated()) {
- return v.newValue();
- } else {
- throw new ConsistentMapException.ConcurrentModification();
- }
- });
- });
- }
-
- @Override
- public CompletableFuture<Versioned<V>> put(K key, V value) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(PUT);
- return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET);
- return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> remove(K key) {
- checkNotNull(key, ERROR_NULL_KEY);
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- checkIfUnmodifiable();
- final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
- return database.mapClear(name).thenApply(this::unwrapResult)
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Set<K>> keySet() {
- final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
- return database.mapKeySet(name)
- .thenApply(s -> newMappingKeySet(s))
- .whenComplete((r, e) -> timer.stop(e));
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<V>>> values() {
- final MeteringAgent.Context timer = monitor.startTimer(VALUES);
- return database.mapValues(name)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(c -> c
- .stream()
- .map(v -> v.<V>map(serializer::decode))
- .collect(Collectors.toList()));
- }
-
- @Override
- public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
- final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
- return database.mapEntrySet(name)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(s -> newMappingEntrySet(s));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT);
- return updateAndGet(key, Match.ifNull(), Match.any(), value)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.oldValue());
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, V value) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- return updateAndGet(key, Match.ifValue(value), Match.any(), null)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.updated());
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, long version) {
- checkNotNull(key, ERROR_NULL_KEY);
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- return updateAndGet(key, Match.any(), Match.ifValue(version), null)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.updated());
- }
-
- @Override
- public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(oldValue, ERROR_NULL_VALUE);
- checkNotNull(newValue, ERROR_NULL_VALUE);
- final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
- return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.updated());
- }
-
- @Override
- public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
- final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
- return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
- .whenComplete((r, e) -> timer.stop(e))
- .thenApply(v -> v.updated());
- }
-
- /**
- * Pre-update hook for performing required checks/actions before going forward with an update operation.
- * @param key map key.
- */
- protected void beforeUpdate(K key) {
- checkIfUnmodifiable();
- }
-
- private Set<K> newMappingKeySet(Set<String> s) {
- return new MappingSet<>(s, Collections::unmodifiableSet,
- this::sK, this::dK);
- }
-
- private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) {
- return new MappingSet<>(s, Collections::unmodifiableSet,
- this::reverseMapRawEntry, this::mapRawEntry);
- }
-
- private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
- return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
- }
-
- private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) {
- return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode));
- }
-
- private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
- Match<V> oldValueMatch,
- Match<Long> oldVersionMatch,
- V value) {
- beforeUpdate(key);
- return database.mapUpdate(name,
- sK(key),
- oldValueMatch.map(serializer::encode),
- oldVersionMatch,
- value == null ? null : serializer.encode(value))
- .thenApply(this::unwrapResult)
- .thenApply(r -> r.<K, V>map(this::dK, serializer::decode));
- }
-
- private <T> T unwrapResult(Result<T> result) {
- if (result.status() == Result.Status.LOCKED) {
- throw new ConsistentMapException.ConcurrentModification();
- } else if (result.success()) {
- return result.value();
- } else {
- throw new IllegalStateException("Must not be here");
- }
- }
-
- private void checkIfUnmodifiable() {
- if (readOnly) {
- throw new UnsupportedOperationException();
- }
- }
-
- @Override
- public void addListener(MapEventListener<K, V> listener) {
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(MapEventListener<K, V> listener) {
- listeners.remove(listener);
- }
-
- protected void notifyListeners(MapEvent<K, V> event) {
- if (event == null) {
- return;
- }
- listeners.forEach(listener -> {
- try {
- listener.event(event);
- } catch (Exception e) {
- log.warn("Failure notifying listener about {}", event, e);
- }
- });
- }
-
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
deleted file mode 100644
index 2d6a956c..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.AsyncAtomicCounter;
-import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.StorageException;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Default implementation for a distributed AtomicCounter backed by
- * partitioned Raft DB.
- * <p>
- * The initial value will be zero.
- */
-public class DefaultAtomicCounter implements AtomicCounter {
-
- private static final int OPERATION_TIMEOUT_MILLIS = 5000;
-
- private final AsyncAtomicCounter asyncCounter;
-
- public DefaultAtomicCounter(String name,
- Database database,
- boolean meteringEnabled) {
- asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
- }
-
- @Override
- public long incrementAndGet() {
- return complete(asyncCounter.incrementAndGet());
- }
-
- @Override
- public long getAndIncrement() {
- return complete(asyncCounter.getAndIncrement());
- }
-
- @Override
- public long getAndAdd(long delta) {
- return complete(asyncCounter.getAndAdd(delta));
- }
-
- @Override
- public long addAndGet(long delta) {
- return complete(asyncCounter.getAndAdd(delta));
- }
-
- @Override
- public void set(long value) {
- complete(asyncCounter.set(value));
- }
-
- @Override
- public boolean compareAndSet(long expectedValue, long updateValue) {
- return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
- }
-
- @Override
- public long get() {
- return complete(asyncCounter.get());
- }
-
- private static <T> T complete(CompletableFuture<T> future) {
- try {
- return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new StorageException.Interrupted();
- } catch (TimeoutException e) {
- throw new StorageException.Timeout();
- } catch (ExecutionException e) {
- throw new StorageException(e.getCause());
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
deleted file mode 100644
index dba4443b..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.AsyncAtomicCounter;
-import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.AtomicCounterBuilder;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of AtomicCounterBuilder.
- */
-public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
-
- private String name;
- private boolean partitionsEnabled = true;
- private final Database partitionedDatabase;
- private final Database inMemoryDatabase;
- private boolean metering = true;
-
- public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
- this.inMemoryDatabase = inMemoryDatabase;
- this.partitionedDatabase = partitionedDatabase;
- }
-
- @Override
- public AtomicCounterBuilder withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public AtomicCounterBuilder withPartitionsDisabled() {
- partitionsEnabled = false;
- return this;
- }
-
- @Override
- public AtomicCounter build() {
- validateInputs();
- Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
- return new DefaultAtomicCounter(name, database, metering);
- }
-
- @Override
- public AsyncAtomicCounter buildAsyncCounter() {
- validateInputs();
- Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
- return new DefaultAsyncAtomicCounter(name, database, metering);
- }
-
- @Override
- public AtomicCounterBuilder withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- private void validateInputs() {
- checkState(name != null, "name must be specified");
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
deleted file mode 100644
index e8c93f31..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.AtomicValueEvent;
-import org.onosproject.store.service.AtomicValueEventListener;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * Default implementation of AtomicValue.
- *
- * @param <V> value type
- */
-public class DefaultAtomicValue<V> implements AtomicValue<V> {
-
- private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
- private final ConsistentMap<String, byte[]> valueMap;
- private final String name;
- private final Serializer serializer;
- private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener();
- private final MeteringAgent monitor;
-
- private static final String COMPONENT_NAME = "atomicValue";
- private static final String GET = "get";
- private static final String GET_AND_SET = "getAndSet";
- private static final String COMPARE_AND_SET = "compareAndSet";
-
- public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap,
- String name,
- boolean meteringEnabled,
- Serializer serializer) {
- this.valueMap = valueMap;
- this.name = name;
- this.serializer = serializer;
- this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
- }
-
- @Override
- public boolean compareAndSet(V expect, V update) {
- final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
- try {
- if (expect == null) {
- if (update == null) {
- return true;
- }
- return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
- } else {
- if (update == null) {
- return valueMap.remove(name, serializer.encode(expect));
- }
- return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
- }
- } finally {
- newTimer.stop(null);
- }
- }
-
- @Override
- public V get() {
- final MeteringAgent.Context newTimer = monitor.startTimer(GET);
- try {
- Versioned<byte[]> rawValue = valueMap.get(name);
- return rawValue == null ? null : serializer.decode(rawValue.value());
- } finally {
- newTimer.stop(null);
- }
- }
-
- @Override
- public V getAndSet(V value) {
- final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
- try {
- Versioned<byte[]> previousValue = value == null ?
- valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
- return previousValue == null ? null : serializer.decode(previousValue.value());
- } finally {
- newTimer.stop(null);
- }
- }
-
- @Override
- public void set(V value) {
- getAndSet(value);
- }
-
- @Override
- public void addListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.add(listener)) {
- if (listeners.size() == 1) {
- valueMap.addListener(mapEventListener);
- }
- }
- }
- }
-
- @Override
- public void removeListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.remove(listener)) {
- if (listeners.size() == 0) {
- valueMap.removeListener(mapEventListener);
- }
- }
- }
- }
-
- private class InternalMapEventListener implements MapEventListener<String, byte[]> {
-
- @Override
- public void event(MapEvent<String, byte[]> mapEvent) {
- V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value());
- AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
- listeners.forEach(l -> l.event(atomicValueEvent));
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java
deleted file mode 100644
index b39004b3..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.AtomicValueBuilder;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.Serializer;
-
-/**
- * Default implementation of AtomicValueBuilder.
- *
- * @param <V> value type
- */
-public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
-
- private Serializer serializer;
- private String name;
- private ConsistentMapBuilder<String, byte[]> mapBuilder;
- private boolean metering = true;
-
- public DefaultAtomicValueBuilder(DatabaseManager manager) {
- mapBuilder = manager.<String, byte[]>consistentMapBuilder()
- .withName("onos-atomic-values")
- .withMeteringDisabled()
- .withSerializer(Serializer.using(KryoNamespaces.BASIC));
- }
-
- @Override
- public AtomicValueBuilder<V> withName(String name) {
- this.name = name;
- return this;
- }
-
- @Override
- public AtomicValueBuilder<V> withSerializer(Serializer serializer) {
- this.serializer = serializer;
- return this;
- }
-
- @Override
- public AtomicValueBuilder<V> withPartitionsDisabled() {
- mapBuilder.withPartitionsDisabled();
- return this;
- }
-
- @Override
- public AtomicValueBuilder<V> withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- @Override
- public AtomicValue<V> build() {
- return new DefaultAtomicValue<>(mapBuilder.build(), name, metering, serializer);
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
deleted file mode 100644
index 6f7b5487..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.Set;
-
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Versioned;
-
-/**
- * ConsistentMap implementation that is backed by a Raft consensus
- * based database.
- *
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
-
- private static final int OPERATION_TIMEOUT_MILLIS = 5000;
-
- private final DefaultAsyncConsistentMap<K, V> asyncMap;
- private Map<K, V> javaMap;
-
- public String name() {
- return asyncMap.name();
- }
-
- public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
- this.asyncMap = asyncMap;
- }
-
- @Override
- public int size() {
- return complete(asyncMap.size());
- }
-
- @Override
- public boolean isEmpty() {
- return complete(asyncMap.isEmpty());
- }
-
- @Override
- public boolean containsKey(K key) {
- return complete(asyncMap.containsKey(key));
- }
-
- @Override
- public boolean containsValue(V value) {
- return complete(asyncMap.containsValue(value));
- }
-
- @Override
- public Versioned<V> get(K key) {
- return complete(asyncMap.get(key));
- }
-
- @Override
- public Versioned<V> computeIfAbsent(K key,
- Function<? super K, ? extends V> mappingFunction) {
- return complete(asyncMap.computeIfAbsent(key, mappingFunction));
- }
-
- @Override
- public Versioned<V> computeIfPresent(K key,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.computeIfPresent(key, remappingFunction));
- }
-
- @Override
- public Versioned<V> compute(K key,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.compute(key, remappingFunction));
- }
-
- @Override
- public Versioned<V> computeIf(K key,
- Predicate<? super V> condition,
- BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.computeIf(key, condition, remappingFunction));
- }
-
- @Override
- public Versioned<V> put(K key, V value) {
- return complete(asyncMap.put(key, value));
- }
-
- @Override
- public Versioned<V> putAndGet(K key, V value) {
- return complete(asyncMap.putAndGet(key, value));
- }
-
- @Override
- public Versioned<V> remove(K key) {
- return complete(asyncMap.remove(key));
- }
-
- @Override
- public void clear() {
- complete(asyncMap.clear());
- }
-
- @Override
- public Set<K> keySet() {
- return complete(asyncMap.keySet());
- }
-
- @Override
- public Collection<Versioned<V>> values() {
- return complete(asyncMap.values());
- }
-
- @Override
- public Set<Entry<K, Versioned<V>>> entrySet() {
- return complete(asyncMap.entrySet());
- }
-
- @Override
- public Versioned<V> putIfAbsent(K key, V value) {
- return complete(asyncMap.putIfAbsent(key, value));
- }
-
- @Override
- public boolean remove(K key, V value) {
- return complete(asyncMap.remove(key, value));
- }
-
- @Override
- public boolean remove(K key, long version) {
- return complete(asyncMap.remove(key, version));
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- return complete(asyncMap.replace(key, oldValue, newValue));
- }
-
- @Override
- public boolean replace(K key, long oldVersion, V newValue) {
- return complete(asyncMap.replace(key, oldVersion, newValue));
- }
-
- private static <T> T complete(CompletableFuture<T> future) {
- try {
- return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ConsistentMapException.Interrupted();
- } catch (TimeoutException e) {
- throw new ConsistentMapException.Timeout();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof ConsistentMapException) {
- throw (ConsistentMapException) e.getCause();
- } else {
- throw new ConsistentMapException(e.getCause());
- }
- }
- }
-
- @Override
- public void addListener(MapEventListener<K, V> listener) {
- asyncMap.addListener(listener);
- }
-
- @Override
- public void removeListener(MapEventListener<K, V> listener) {
- asyncMap.addListener(listener);
- }
-
- @Override
- public Map<K, V> asJavaMap() {
- synchronized (this) {
- if (javaMap == null) {
- javaMap = new ConsistentMapBackedJavaMap<>(this);
- }
- }
- return javaMap;
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
deleted file mode 100644
index 0e11794e..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.Serializer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default Consistent Map builder.
- *
- * @param <K> type for map key
- * @param <V> type for map value
- */
-public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> {
-
- private Serializer serializer;
- private String name;
- private ApplicationId applicationId;
- private boolean purgeOnUninstall = false;
- private boolean partitionsEnabled = true;
- private boolean readOnly = false;
- private boolean metering = true;
- private boolean relaxedReadConsistency = false;
- private final DatabaseManager manager;
-
- public DefaultConsistentMapBuilder(DatabaseManager manager) {
- this.manager = manager;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
- checkArgument(id != null);
- this.applicationId = id;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
- purgeOnUninstall = true;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
- partitionsEnabled = false;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withUpdatesDisabled() {
- readOnly = true;
- return this;
- }
-
- @Override
- public ConsistentMapBuilder<K, V> withRelaxedReadConsistency() {
- relaxedReadConsistency = true;
- return this;
- }
-
- private void validateInputs() {
- checkState(name != null, "name must be specified");
- checkState(serializer != null, "serializer must be specified");
- if (purgeOnUninstall) {
- checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
- }
- }
-
- @Override
- public ConsistentMap<K, V> build() {
- return new DefaultConsistentMap<>(buildAndRegisterMap());
- }
-
- @Override
- public AsyncConsistentMap<K, V> buildAsyncMap() {
- return buildAndRegisterMap();
- }
-
- private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
- validateInputs();
- Database database = partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase;
- if (relaxedReadConsistency) {
- return manager.registerMap(
- new AsyncCachingConsistentMap<>(name,
- applicationId,
- database,
- serializer,
- readOnly,
- purgeOnUninstall,
- metering));
- } else {
- return manager.registerMap(
- new DefaultAsyncConsistentMap<>(name,
- applicationId,
- database,
- serializer,
- readOnly,
- purgeOnUninstall,
- metering));
- }
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
deleted file mode 100644
index 2a50fbd6..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.Sets;
-import net.kuujo.copycat.resource.internal.AbstractResource;
-import net.kuujo.copycat.resource.internal.ResourceManager;
-import net.kuujo.copycat.state.StateMachine;
-import net.kuujo.copycat.state.internal.DefaultStateMachine;
-import net.kuujo.copycat.util.concurrent.Futures;
-import net.kuujo.copycat.util.function.TriConsumer;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/**
- * Default database.
- */
-public class DefaultDatabase extends AbstractResource<Database> implements Database {
- private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
- private DatabaseProxy<String, byte[]> proxy;
- private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
- private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- public DefaultDatabase(ResourceManager context) {
- super(context);
- this.stateMachine = new DefaultStateMachine(context,
- DatabaseState.class,
- DefaultDatabaseState.class,
- DefaultDatabase.class.getClassLoader());
- this.stateMachine.addStartupTask(() -> {
- stateMachine.registerWatcher(watcher);
- return CompletableFuture.completedFuture(null);
- });
- this.stateMachine.addShutdownTask(() -> {
- stateMachine.unregisterWatcher(watcher);
- return CompletableFuture.completedFuture(null);
- });
- }
-
- /**
- * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
- * return the completed future result.
- *
- * @param supplier The supplier to call if the database is open.
- * @param <T> The future result type.
- * @return A completable future that if this database is closed is immediately failed.
- */
- protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
- if (proxy == null) {
- return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
- }
- return supplier.get();
- }
-
- @Override
- public CompletableFuture<Set<String>> maps() {
- return checkOpen(() -> proxy.maps());
- }
-
- @Override
- public CompletableFuture<Map<String, Long>> counters() {
- return checkOpen(() -> proxy.counters());
- }
-
- @Override
- public CompletableFuture<Integer> mapSize(String mapName) {
- return checkOpen(() -> proxy.mapSize(mapName));
- }
-
- @Override
- public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
- return checkOpen(() -> proxy.mapIsEmpty(mapName));
- }
-
- @Override
- public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
- return checkOpen(() -> proxy.mapContainsKey(mapName, key));
- }
-
- @Override
- public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
- return checkOpen(() -> proxy.mapContainsValue(mapName, value));
- }
-
- @Override
- public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
- return checkOpen(() -> proxy.mapGet(mapName, key));
- }
-
- @Override
- public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
- String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
- return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
- }
-
- @Override
- public CompletableFuture<Result<Void>> mapClear(String mapName) {
- return checkOpen(() -> proxy.mapClear(mapName));
- }
-
- @Override
- public CompletableFuture<Set<String>> mapKeySet(String mapName) {
- return checkOpen(() -> proxy.mapKeySet(mapName));
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
- return checkOpen(() -> proxy.mapValues(mapName));
- }
-
- @Override
- public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
- return checkOpen(() -> proxy.mapEntrySet(mapName));
- }
-
- @Override
- public CompletableFuture<Long> counterGet(String counterName) {
- return checkOpen(() -> proxy.counterGet(counterName));
- }
-
- @Override
- public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
- return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
- }
-
- @Override
- public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
- return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
- }
-
- @Override
- public CompletableFuture<Void> counterSet(String counterName, long value) {
- return checkOpen(() -> proxy.counterSet(counterName, value));
- }
-
- @Override
- public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
- return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
- }
-
- @Override
- public CompletableFuture<Long> queueSize(String queueName) {
- return checkOpen(() -> proxy.queueSize(queueName));
- }
-
- @Override
- public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
- return checkOpen(() -> proxy.queuePush(queueName, entry));
- }
-
- @Override
- public CompletableFuture<byte[]> queuePop(String queueName) {
- return checkOpen(() -> proxy.queuePop(queueName));
- }
-
- @Override
- public CompletableFuture<byte[]> queuePeek(String queueName) {
- return checkOpen(() -> proxy.queuePeek(queueName));
- }
-
- @Override
- public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
- return checkOpen(() -> proxy.prepareAndCommit(transaction));
- }
-
- @Override
- public CompletableFuture<Boolean> prepare(Transaction transaction) {
- return checkOpen(() -> proxy.prepare(transaction));
- }
-
- @Override
- public CompletableFuture<CommitResponse> commit(Transaction transaction) {
- return checkOpen(() -> proxy.commit(transaction));
- }
-
- @Override
- public CompletableFuture<Boolean> rollback(Transaction transaction) {
- return checkOpen(() -> proxy.rollback(transaction));
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public synchronized CompletableFuture<Database> open() {
- return runStartupTasks()
- .thenCompose(v -> stateMachine.open())
- .thenRun(() -> {
- this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
- })
- .thenApply(v -> null);
- }
-
- @Override
- public synchronized CompletableFuture<Void> close() {
- proxy = null;
- return stateMachine.close()
- .thenCompose(v -> runShutdownTasks());
- }
-
- @Override
- public int hashCode() {
- return name().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof Database) {
- return name().equals(((Database) other).name());
- }
- return false;
- }
-
- @Override
- public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
- consumers.add(consumer);
- }
-
- @Override
- public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
- consumers.remove(consumer);
- }
-
- private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
- @Override
- public void accept(String name, Object input, Object output) {
- StateMachineUpdate update = new StateMachineUpdate(name, input, output);
- consumers.forEach(consumer -> consumer.accept(update));
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
deleted file mode 100644
index 8943fc87..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import net.kuujo.copycat.state.Initializer;
-import net.kuujo.copycat.state.StateContext;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-/**
- * Default database state.
- */
-public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
- private Long nextVersion;
- private Map<String, AtomicLong> counters;
- private Map<String, Map<String, Versioned<byte[]>>> maps;
- private Map<String, Queue<byte[]>> queues;
-
- /**
- * This locks map has a structure similar to the "tables" map above and
- * holds all the provisional updates made during a transaction's prepare phase.
- * The entry value is represented as the tuple: (transactionId, newValue)
- * If newValue == null that signifies this update is attempting to
- * delete the existing value.
- * This map also serves as a lock on the entries that are being updated.
- * The presence of a entry in this map indicates that element is
- * participating in a transaction and is currently locked for updates.
- */
- private Map<String, Map<String, Update>> locks;
-
- @Initializer
- @Override
- public void init(StateContext<DatabaseState<String, byte[]>> context) {
- counters = context.get("counters");
- if (counters == null) {
- counters = Maps.newConcurrentMap();
- context.put("counters", counters);
- }
- maps = context.get("maps");
- if (maps == null) {
- maps = Maps.newConcurrentMap();
- context.put("maps", maps);
- }
- locks = context.get("locks");
- if (locks == null) {
- locks = Maps.newConcurrentMap();
- context.put("locks", locks);
- }
- queues = context.get("queues");
- if (queues == null) {
- queues = Maps.newConcurrentMap();
- context.put("queues", queues);
- }
- nextVersion = context.get("nextVersion");
- if (nextVersion == null) {
- nextVersion = 0L;
- context.put("nextVersion", nextVersion);
- }
- }
-
- @Override
- public Set<String> maps() {
- return ImmutableSet.copyOf(maps.keySet());
- }
-
- @Override
- public Map<String, Long> counters() {
- Map<String, Long> counterMap = Maps.newHashMap();
- counters.forEach((k, v) -> counterMap.put(k, v.get()));
- return counterMap;
- }
-
- @Override
- public int mapSize(String mapName) {
- return getMap(mapName).size();
- }
-
- @Override
- public boolean mapIsEmpty(String mapName) {
- return getMap(mapName).isEmpty();
- }
-
- @Override
- public boolean mapContainsKey(String mapName, String key) {
- return getMap(mapName).containsKey(key);
- }
-
- @Override
- public boolean mapContainsValue(String mapName, byte[] value) {
- return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
- }
-
- @Override
- public Versioned<byte[]> mapGet(String mapName, String key) {
- return getMap(mapName).get(key);
- }
-
-
- @Override
- public Result<UpdateResult<String, byte[]>> mapUpdate(
- String mapName,
- String key,
- Match<byte[]> valueMatch,
- Match<Long> versionMatch,
- byte[] value) {
- if (isLockedForUpdates(mapName, key)) {
- return Result.locked();
- }
- Versioned<byte[]> currentValue = getMap(mapName).get(key);
- if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
- !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
- return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
- } else {
- if (value == null) {
- if (currentValue == null) {
- return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
- } else {
- getMap(mapName).remove(key);
- return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
- }
- }
- Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
- getMap(mapName).put(key, newValue);
- return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
- }
- }
-
- @Override
- public Result<Void> mapClear(String mapName) {
- if (areTransactionsInProgress(mapName)) {
- return Result.locked();
- }
- getMap(mapName).clear();
- return Result.ok(null);
- }
-
- @Override
- public Set<String> mapKeySet(String mapName) {
- return ImmutableSet.copyOf(getMap(mapName).keySet());
- }
-
- @Override
- public Collection<Versioned<byte[]>> mapValues(String mapName) {
- return ImmutableList.copyOf(getMap(mapName).values());
- }
-
- @Override
- public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
- return ImmutableSet.copyOf(getMap(mapName)
- .entrySet()
- .stream()
- .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
- .collect(Collectors.toSet()));
- }
-
- @Override
- public Long counterAddAndGet(String counterName, long delta) {
- return getCounter(counterName).addAndGet(delta);
- }
-
- @Override
- public Long counterGetAndAdd(String counterName, long delta) {
- return getCounter(counterName).getAndAdd(delta);
- }
-
- @Override
- public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
- return getCounter(counterName).compareAndSet(expectedValue, updateValue);
- }
-
- @Override
- public Long counterGet(String counterName) {
- return getCounter(counterName).get();
- }
-
- @Override
- public Long queueSize(String queueName) {
- return Long.valueOf(getQueue(queueName).size());
- }
-
- @Override
- public byte[] queuePeek(String queueName) {
- return getQueue(queueName).peek();
- }
-
- @Override
- public byte[] queuePop(String queueName) {
- return getQueue(queueName).poll();
- }
-
- @Override
- public void queuePush(String queueName, byte[] entry) {
- getQueue(queueName).offer(entry);
- }
-
- @Override
- public CommitResponse prepareAndCommit(Transaction transaction) {
- if (prepare(transaction)) {
- return commit(transaction);
- }
- return CommitResponse.failure();
- }
-
- @Override
- public boolean prepare(Transaction transaction) {
- if (transaction.updates().stream().anyMatch(update ->
- isLockedByAnotherTransaction(update.mapName(),
- update.key(),
- transaction.id()))) {
- return false;
- }
-
- if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
- transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
- return true;
- }
- return false;
- }
-
- @Override
- public CommitResponse commit(Transaction transaction) {
- return CommitResponse.success(Lists.transform(transaction.updates(),
- update -> commitProvisionalUpdate(update, transaction.id())));
- }
-
- @Override
- public boolean rollback(Transaction transaction) {
- transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
- return true;
- }
-
- private Map<String, Versioned<byte[]>> getMap(String mapName) {
- return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
- }
-
- private Map<String, Update> getLockMap(String mapName) {
- return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
- }
-
- private AtomicLong getCounter(String counterName) {
- return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
- }
-
- private Queue<byte[]> getQueue(String queueName) {
- return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
- }
-
- private boolean isUpdatePossible(DatabaseUpdate update) {
- Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
- switch (update.type()) {
- case PUT:
- case REMOVE:
- return true;
- case PUT_IF_ABSENT:
- return existingEntry == null;
- case PUT_IF_VERSION_MATCH:
- return existingEntry != null && existingEntry.version() == update.currentVersion();
- case PUT_IF_VALUE_MATCH:
- return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
- case REMOVE_IF_VERSION_MATCH:
- return existingEntry == null || existingEntry.version() == update.currentVersion();
- case REMOVE_IF_VALUE_MATCH:
- return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
- default:
- throw new IllegalStateException("Unsupported type: " + update.type());
- }
- }
-
- private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
- Map<String, Update> lockMap = getLockMap(update.mapName());
- switch (update.type()) {
- case PUT:
- case PUT_IF_ABSENT:
- case PUT_IF_VERSION_MATCH:
- case PUT_IF_VALUE_MATCH:
- lockMap.put(update.key(), new Update(transactionId, update.value()));
- break;
- case REMOVE:
- case REMOVE_IF_VERSION_MATCH:
- case REMOVE_IF_VALUE_MATCH:
- lockMap.put(update.key(), new Update(transactionId, null));
- break;
- default:
- throw new IllegalStateException("Unsupported type: " + update.type());
- }
- }
-
- private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
- String mapName = update.mapName();
- String key = update.key();
- Update provisionalUpdate = getLockMap(mapName).get(key);
- if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
- getLockMap(mapName).remove(key);
- } else {
- throw new IllegalStateException("Invalid transaction Id");
- }
- return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
- }
-
- private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
- String mapName = update.mapName();
- String key = update.key();
- Update provisionalUpdate = getLockMap(mapName).get(key);
- if (provisionalUpdate == null) {
- return;
- }
- if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
- getLockMap(mapName).remove(key);
- }
- }
-
- private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
- Update update = getLockMap(mapName).get(key);
- return update != null && !Objects.equal(transactionId, update.transactionId());
- }
-
- private boolean isLockedForUpdates(String mapName, String key) {
- return getLockMap(mapName).containsKey(key);
- }
-
- private boolean areTransactionsInProgress(String mapName) {
- return !getLockMap(mapName).isEmpty();
- }
-
- private class Update {
- private final long transactionId;
- private final byte[] value;
-
- public Update(long txId, byte[] value) {
- this.transactionId = txId;
- this.value = value;
- }
-
- public long transactionId() {
- return this.transactionId;
- }
-
- public byte[] value() {
- return this.value;
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
deleted file mode 100644
index 5f69fde8..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-import org.onlab.util.SharedExecutors;
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.Serializer;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
-
-/**
- * DistributedQueue implementation that provides FIFO ordering semantics.
- *
- * @param <E> queue entry type
- */
-public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
-
- private final String name;
- private final Database database;
- private final Serializer serializer;
- private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
-
- private static final String PRIMITIVE_NAME = "distributedQueue";
- private static final String SIZE = "size";
- private static final String PUSH = "push";
- private static final String POP = "pop";
- private static final String PEEK = "peek";
-
- private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
- private final MeteringAgent monitor;
-
- public DefaultDistributedQueue(String name,
- Database database,
- Serializer serializer,
- boolean meteringEnabled) {
- this.name = checkNotNull(name, "queue name cannot be null");
- this.database = checkNotNull(database, "database cannot be null");
- this.serializer = checkNotNull(serializer, "serializer cannot be null");
- this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
- this.database.registerConsumer(update -> {
- SharedExecutors.getSingleThreadExecutor().execute(() -> {
- if (update.target() == QUEUE_PUSH) {
- List<Object> input = update.input();
- String queueName = (String) input.get(0);
- if (queueName.equals(name)) {
- tryPoll();
- }
- }
- });
- });
- }
-
- @Override
- public long size() {
- final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
- }
-
- @Override
- public void push(E entry) {
- checkNotNull(entry, ERROR_NULL_ENTRY);
- final MeteringAgent.Context timer = monitor.startTimer(PUSH);
- Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
- .whenComplete((r, e) -> timer.stop(e)));
- }
-
- @Override
- public CompletableFuture<E> pop() {
- final MeteringAgent.Context timer = monitor.startTimer(POP);
- return database.queuePop(name)
- .whenComplete((r, e) -> timer.stop(e))
- .thenCompose(v -> {
- if (v != null) {
- return CompletableFuture.<E>completedFuture(serializer.decode(v));
- }
- CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
- pendingFutures.add(newPendingFuture);
- return newPendingFuture;
- });
-
- }
-
- @Override
- public E peek() {
- final MeteringAgent.Context timer = monitor.startTimer(PEEK);
- return Futures.getUnchecked(database.queuePeek(name)
- .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
- .whenComplete((r, e) -> timer.stop(e)));
- }
-
- public String name() {
- return name;
- }
-
- protected void tryPoll() {
- Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
- for (CompletableFuture<E> future : pendingFutures) {
- E entry = Futures.getUnchecked(database.queuePop(name)
- .thenApply(v -> v != null ? serializer.decode(v) : null));
- if (entry != null) {
- future.complete(entry);
- completedFutures.add(future);
- } else {
- break;
- }
- }
- pendingFutures.removeAll(completedFutures);
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
deleted file mode 100644
index d6654e27..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.DistributedQueue;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.Serializer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of a {@code DistributedQueueBuilder}.
- *
- * @param <E> queue entry type
- */
-public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
-
- private Serializer serializer;
- private String name;
- private boolean persistenceEnabled = true;
- private final DatabaseManager databaseManager;
- private boolean metering = true;
-
- public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
- this.databaseManager = databaseManager;
- }
-
- @Override
- public DistributedQueueBuilder<E> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- @Override
- public DistributedQueueBuilder<E> withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- @Override
- public DistributedQueueBuilder<E> withPersistenceDisabled() {
- persistenceEnabled = false;
- return this;
- }
-
- private boolean validInputs() {
- return name != null && serializer != null;
- }
-
- @Override
- public DistributedQueue<E> build() {
- checkState(validInputs());
- return new DefaultDistributedQueue<>(
- name,
- persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
- serializer,
- metering);
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
deleted file mode 100644
index 677724df..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.SetEvent;
-import org.onosproject.store.service.SetEventListener;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Implementation of distributed set that is backed by a ConsistentMap.
-
- * @param <E> set element type
- */
-public class DefaultDistributedSet<E> implements DistributedSet<E> {
-
- private static final String CONTAINS = "contains";
- private static final String PRIMITIVE_NAME = "distributedSet";
- private static final String SIZE = "size";
- private static final String IS_EMPTY = "isEmpty";
- private static final String ITERATOR = "iterator";
- private static final String TO_ARRAY = "toArray";
- private static final String ADD = "add";
- private static final String REMOVE = "remove";
- private static final String CONTAINS_ALL = "containsAll";
- private static final String ADD_ALL = "addAll";
- private static final String RETAIN_ALL = "retainAll";
- private static final String REMOVE_ALL = "removeAll";
- private static final String CLEAR = "clear";
-
- private final String name;
- private final ConsistentMap<E, Boolean> backingMap;
- private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
- private final MeteringAgent monitor;
-
- public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
- this.name = name;
- this.backingMap = backingMap;
- monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
- }
-
- @Override
- public int size() {
- final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- try {
- return backingMap.size();
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean isEmpty() {
- final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
- try {
- return backingMap.isEmpty();
- } finally {
- timer.stop(null);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean contains(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
- try {
- return backingMap.containsKey((E) o);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public Iterator<E> iterator() {
- final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
- //Do we have to measure this guy?
- try {
- return backingMap.keySet().iterator();
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public Object[] toArray() {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- return backingMap.keySet().stream().toArray();
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public <T> T[] toArray(T[] a) {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- return backingMap.keySet().stream().toArray(size -> a);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean add(E e) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD);
- try {
- return backingMap.putIfAbsent(e, true) == null;
- } finally {
- timer.stop(null);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean remove(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- try {
- return backingMap.remove((E) o) != null;
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean containsAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
- try {
- return c.stream()
- .allMatch(this::contains);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean addAll(Collection<? extends E> c) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
- try {
- return c.stream()
- .map(this::add)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean retainAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
- try {
- Set<?> retainSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(k -> !retainSet.contains(k))
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public boolean removeAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
- try {
- Set<?> removeSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(removeSet::contains)
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public void clear() {
- final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
- try {
- backingMap.clear();
- } finally {
- timer.stop(null);
- }
- }
-
- @Override
- public void addListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
- if (mapEvent.type() == MapEvent.Type.INSERT) {
- listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
- } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
- listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
- }
- };
- if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
- backingMap.addListener(mapEventListener);
- }
- }
-
- @Override
- public void removeListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
- if (mapEventListener != null) {
- backingMap.removeListener(mapEventListener);
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
deleted file mode 100644
index f7957f39..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.DistributedSetBuilder;
-
-/**
- * Default distributed set builder.
- *
- * @param <E> type for set elements
- */
-public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
-
- private String name;
- private ConsistentMapBuilder<E, Boolean> mapBuilder;
- private boolean metering = true;
-
- public DefaultDistributedSetBuilder(DatabaseManager manager) {
- this.mapBuilder = manager.consistentMapBuilder();
- mapBuilder.withMeteringDisabled();
- }
-
- @Override
- public DistributedSetBuilder<E> withName(String name) {
- mapBuilder.withName(name);
- this.name = name;
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withApplicationId(ApplicationId id) {
- mapBuilder.withApplicationId(id);
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withPurgeOnUninstall() {
- mapBuilder.withPurgeOnUninstall();
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
- mapBuilder.withSerializer(serializer);
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withUpdatesDisabled() {
- mapBuilder.withUpdatesDisabled();
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withRelaxedReadConsistency() {
- mapBuilder.withRelaxedReadConsistency();
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withPartitionsDisabled() {
- mapBuilder.withPartitionsDisabled();
- return this;
- }
-
- @Override
- public DistributedSetBuilder<E> withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- @Override
- public DistributedSet<E> build() {
- return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
deleted file mode 100644
index 2ff7a2dc..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.util.List;
-
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A Default transaction implementation.
- */
-public class DefaultTransaction implements Transaction {
-
- private final long transactionId;
- private final List<DatabaseUpdate> updates;
- private final State state;
- private final long lastUpdated;
-
- public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
- this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
- }
-
- private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
- this.transactionId = transactionId;
- this.updates = ImmutableList.copyOf(updates);
- this.state = state;
- this.lastUpdated = lastUpdated;
- }
-
- @Override
- public long id() {
- return transactionId;
- }
-
- @Override
- public List<DatabaseUpdate> updates() {
- return updates;
- }
-
- @Override
- public State state() {
- return state;
- }
-
- @Override
- public Transaction transition(State newState) {
- return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
- }
-
- @Override
- public long lastUpdated() {
- return lastUpdated;
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
deleted file mode 100644
index 73888221..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
-import static com.google.common.base.Preconditions.*;
-
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionalMap;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
-
-/**
- * Default TransactionContext implementation.
- */
-public class DefaultTransactionContext implements TransactionContext {
- private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
-
- @SuppressWarnings("rawtypes")
- private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
- private boolean isOpen = false;
- private final Database database;
- private final long transactionId;
- private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
-
- public DefaultTransactionContext(long transactionId,
- Database database,
- Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
- this.transactionId = transactionId;
- this.database = checkNotNull(database);
- this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
- }
-
- @Override
- public long transactionId() {
- return transactionId;
- }
-
- @Override
- public void begin() {
- checkState(!isOpen, "Transaction Context is already open");
- isOpen = true;
- }
-
- @Override
- public boolean isOpen() {
- return isOpen;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
- Serializer serializer) {
- checkState(isOpen, TX_NOT_OPEN_ERROR);
- checkNotNull(mapName);
- checkNotNull(serializer);
- return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
- name,
- mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
- this,
- serializer));
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean commit() {
- // TODO: rework commit implementation to be more intuitive
- checkState(isOpen, TX_NOT_OPEN_ERROR);
- CommitResponse response = null;
- try {
- List<DatabaseUpdate> updates = Lists.newLinkedList();
- txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
- Transaction transaction = new DefaultTransaction(transactionId, updates);
- response = Futures.getUnchecked(database.prepareAndCommit(transaction));
- return response.success();
- } catch (Exception e) {
- abort();
- return false;
- } finally {
- isOpen = false;
- }
- }
-
- @Override
- public void abort() {
- if (isOpen) {
- try {
- txMaps.values().forEach(m -> m.rollback());
- } finally {
- isOpen = false;
- }
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
deleted file mode 100644
index f20bfb80..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionContextBuilder;
-
-/**
- * The default implementation of a transaction context builder. This builder
- * generates a {@link DefaultTransactionContext}.
- */
-public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
-
- private boolean partitionsEnabled = true;
- private final DatabaseManager manager;
- private final long transactionId;
-
- public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
- this.manager = manager;
- this.transactionId = transactionId;
- }
-
- @Override
- public TransactionContextBuilder withPartitionsDisabled() {
- partitionsEnabled = false;
- return this;
- }
-
- @Override
- public TransactionContext build() {
- return new DefaultTransactionContext(
- transactionId,
- partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
- () -> partitionsEnabled ? manager.consistentMapBuilder()
- : manager.consistentMapBuilder().withPartitionsDisabled());
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
deleted file mode 100644
index ade70335..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.onlab.util.HexString;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.Versioned;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Default Transactional Map implementation that provides a repeatable reads
- * transaction isolation level.
- *
- * @param <K> key type
- * @param <V> value type.
- */
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
-
- private final TransactionContext txContext;
- private static final String TX_CLOSED_ERROR = "Transaction is closed";
- private final ConsistentMap<K, V> backingMap;
- private final String name;
- private final Serializer serializer;
- private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
- private final Map<K, V> writeCache = Maps.newConcurrentMap();
- private final Set<K> deleteSet = Sets.newConcurrentHashSet();
-
- private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- private static final String ERROR_NULL_KEY = "Null key is not allowed";
-
- private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
- .softValues()
- .build(new CacheLoader<K, String>() {
-
- @Override
- public String load(K key) {
- return HexString.toHexString(serializer.encode(key));
- }
- });
-
- protected K dK(String key) {
- return serializer.decode(HexString.fromHexString(key));
- }
-
- public DefaultTransactionalMap(
- String name,
- ConsistentMap<K, V> backingMap,
- TransactionContext txContext,
- Serializer serializer) {
- this.name = name;
- this.backingMap = backingMap;
- this.txContext = txContext;
- this.serializer = serializer;
- }
-
- @Override
- public V get(K key) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(key, ERROR_NULL_KEY);
- if (deleteSet.contains(key)) {
- return null;
- }
- V latest = writeCache.get(key);
- if (latest != null) {
- return latest;
- } else {
- Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
- return v != null ? v.value() : null;
- }
- }
-
- @Override
- public V put(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
-
- V latest = get(key);
- writeCache.put(key, value);
- deleteSet.remove(key);
- return latest;
- }
-
- @Override
- public V remove(K key) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- V latest = get(key);
- if (latest != null) {
- writeCache.remove(key);
- deleteSet.add(key);
- }
- return latest;
- }
-
- @Override
- public boolean remove(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
- V latest = get(key);
- if (Objects.equal(value, latest)) {
- remove(key);
- return true;
- }
- return false;
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(oldValue, ERROR_NULL_VALUE);
- checkNotNull(newValue, ERROR_NULL_VALUE);
- V latest = get(key);
- if (Objects.equal(oldValue, latest)) {
- put(key, newValue);
- return true;
- }
- return false;
- }
-
- @Override
- public V putIfAbsent(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
- V latest = get(key);
- if (latest == null) {
- put(key, value);
- }
- return latest;
- }
-
- protected List<DatabaseUpdate> prepareDatabaseUpdates() {
- List<DatabaseUpdate> updates = Lists.newLinkedList();
- deleteSet.forEach(key -> {
- Versioned<V> original = readCache.get(key);
- if (original != null) {
- updates.add(DatabaseUpdate.newBuilder()
- .withMapName(name)
- .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey(keyCache.getUnchecked(key))
- .withCurrentVersion(original.version())
- .build());
- }
- });
- writeCache.forEach((key, value) -> {
- Versioned<V> original = readCache.get(key);
- if (original == null) {
- updates.add(DatabaseUpdate.newBuilder()
- .withMapName(name)
- .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
- .withKey(keyCache.getUnchecked(key))
- .withValue(serializer.encode(value))
- .build());
- } else {
- updates.add(DatabaseUpdate.newBuilder()
- .withMapName(name)
- .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(keyCache.getUnchecked(key))
- .withCurrentVersion(original.version())
- .withValue(serializer.encode(value))
- .build());
- }
- });
- return updates;
- }
-
- /**
- * Discards all changes made to this transactional map.
- */
- protected void rollback() {
- readCache.clear();
- writeCache.clear();
- deleteSet.clear();
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
deleted file mode 100644
index 1882b1b5..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEvent.Type;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
-import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
-
-/**
- * Distributed Lock Manager implemented on top of ConsistentMap.
- * <p>
- * This implementation makes use of ClusterService's failure
- * detection capabilities to detect and purge stale locks.
- * TODO: Ensure lock safety and liveness.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class DistributedLeadershipManager implements LeadershipService {
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
-
- private final Logger log = getLogger(getClass());
- private ScheduledExecutorService electionRunner;
- private ScheduledExecutorService lockExecutor;
- private ScheduledExecutorService staleLeadershipPurgeExecutor;
- private ScheduledExecutorService leadershipRefresher;
-
- private ConsistentMap<String, NodeId> leaderMap;
- private ConsistentMap<String, List<NodeId>> candidateMap;
-
- private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
- private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
- private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
- private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
-
- private NodeId localNodeId;
- private Set<String> activeTopics = Sets.newConcurrentHashSet();
- private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
-
- // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
- private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
- private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
- private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
- private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
-
- private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
-
- private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
-
- @Activate
- public void activate() {
- leaderMap = storageService.<String, NodeId>consistentMapBuilder()
- .withName("onos-topic-leaders")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
- candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
- .withName("onos-topic-candidates")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
-
- leaderMap.addListener(event -> {
- log.debug("Received {}", event);
- LeadershipEvent.Type leadershipEventType = null;
- if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
- } else if (event.type() == MapEvent.Type.REMOVE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
- }
- onLeadershipEvent(new LeadershipEvent(
- leadershipEventType,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
-
- candidateMap.addListener(event -> {
- log.debug("Received {}", event);
- if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
- log.error("Entries must not be removed from candidate map");
- return;
- }
- onLeadershipEvent(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
-
- localNodeId = clusterService.getLocalNode().id();
-
- electionRunner = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "election-runner"));
- lockExecutor = Executors.newScheduledThreadPool(
- 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
- staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
- leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "refresh-thread"));
-
- clusterService.addListener(clusterEventListener);
-
- electionRunner.scheduleWithFixedDelay(
- this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
-
- leadershipRefresher.scheduleWithFixedDelay(
- this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
-
- listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- if (clusterService.getNodes().size() > 1) {
- // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
- leaderBoard.forEach((topic, leadership) -> {
- if (localNodeId.equals(leadership.leader())) {
- withdraw(topic);
- }
- });
- }
-
- clusterService.removeListener(clusterEventListener);
- eventDispatcher.removeSink(LeadershipEvent.class);
-
- electionRunner.shutdown();
- lockExecutor.shutdown();
- staleLeadershipPurgeExecutor.shutdown();
- leadershipRefresher.shutdown();
-
- log.info("Stopped");
- }
-
- @Override
- public Map<String, Leadership> getLeaderBoard() {
- return ImmutableMap.copyOf(leaderBoard);
- }
-
- @Override
- public Map<String, List<NodeId>> getCandidates() {
- return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
- }
-
- @Override
- public List<NodeId> getCandidates(String path) {
- Leadership current = candidateBoard.get(path);
- return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
- }
-
- @Override
- public NodeId getLeader(String path) {
- Leadership leadership = leaderBoard.get(path);
- return leadership != null ? leadership.leader() : null;
- }
-
- @Override
- public Leadership getLeadership(String path) {
- checkArgument(path != null);
- return leaderBoard.get(path);
- }
-
- @Override
- public Set<String> ownedTopics(NodeId nodeId) {
- checkArgument(nodeId != null);
- return leaderBoard.entrySet()
- .stream()
- .filter(entry -> nodeId.equals(entry.getValue().leader()))
- .map(Entry::getKey)
- .collect(Collectors.toSet());
- }
-
- @Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
- log.debug("Running for leadership for topic: {}", path);
- CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
- doRunForLeadership(path, resultFuture);
- return resultFuture;
- }
-
- private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
- try {
- Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
- currentList -> currentList == null || !currentList.contains(localNodeId),
- (topic, currentList) -> {
- if (currentList == null) {
- return ImmutableList.of(localNodeId);
- } else {
- List<NodeId> newList = Lists.newLinkedList();
- newList.addAll(currentList);
- newList.add(localNodeId);
- return newList;
- }
- });
- log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
- activeTopics.add(path);
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership == null) {
- pendingFutures.put(path, future);
- } else {
- future.complete(leadership);
- }
- } catch (ConsistentMapException e) {
- log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
- rerunForLeadership(path, future);
- }
- }
-
- @Override
- public CompletableFuture<Void> withdraw(String path) {
- activeTopics.remove(path);
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- doWithdraw(path, resultFuture);
- return resultFuture;
- }
-
-
- private void doWithdraw(String path, CompletableFuture<Void> future) {
- if (activeTopics.contains(path)) {
- future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
- }
- try {
- leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null);
- candidateMap.computeIf(path,
- candidates -> candidates != null && candidates.contains(localNodeId),
- (topic, candidates) -> candidates.stream()
- .filter(nodeId -> !localNodeId.equals(nodeId))
- .collect(Collectors.toList()));
- future.complete(null);
- } catch (Exception e) {
- log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
- retryWithdraw(path, future);
- }
- }
-
- @Override
- public boolean stepdown(String path) {
- if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
- return false;
- }
-
- try {
- return leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null) == null;
- } catch (Exception e) {
- log.warn("Error executing stepdown for {}", path, e);
- }
- return false;
- }
-
- @Override
- public void addListener(LeadershipEventListener listener) {
- listenerRegistry.addListener(listener);
- }
-
- @Override
- public void removeListener(LeadershipEventListener listener) {
- listenerRegistry.removeListener(listener);
- }
-
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
- candidates -> candidates != null &&
- candidates.contains(nodeId) &&
- !nodeId.equals(Iterables.getFirst(candidates, null)),
- (topic, candidates) -> {
- List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
- updatedCandidates.add(nodeId);
- candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
- return updatedCandidates;
- });
- List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
- return candidates.size() > 0 && nodeId.equals(candidates.get(0));
- }
-
- private Leadership electLeader(String path, List<NodeId> candidates) {
- Leadership currentLeadership = getLeadership(path);
- if (currentLeadership != null) {
- return currentLeadership;
- } else {
- NodeId topCandidate = candidates
- .stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .findFirst()
- .orElse(null);
- try {
- Versioned<NodeId> leader = localNodeId.equals(topCandidate)
- ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
- if (leader != null) {
- Leadership newLeadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- // Since reads only go through the local copy of leader board, we ought to update it
- // first before returning from this method.
- // This is to ensure a subsequent read will not read a stale value.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
- return newLeadership;
- }
- } catch (Exception e) {
- log.debug("Failed to elect leader for {}", path, e);
- }
- }
- return null;
- }
-
- private void electLeaders() {
- try {
- candidateMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- // for active topics, check if this node can become a leader (if it isn't already)
- if (activeTopics.contains(path)) {
- lockExecutor.submit(() -> {
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership != null) {
- CompletableFuture<Leadership> future = pendingFutures.remove(path);
- if (future != null) {
- future.complete(leadership);
- }
- }
- });
- }
- // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
- // and also to update local listeners.
- // Don't worry about duplicate events as they will be suppressed.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- candidates.value(),
- candidates.version(),
- candidates.creationTime())));
- });
- } catch (Exception e) {
- log.debug("Failure electing leaders", e);
- }
- }
-
- private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
-
- Leadership leadershipUpdate = leadershipEvent.subject();
- LeadershipEvent.Type eventType = leadershipEvent.type();
- String topic = leadershipUpdate.topic();
-
- AtomicBoolean updateAccepted = new AtomicBoolean(false);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- // FIXME: Removing entries from leaderboard is not safe and should be visited.
- return null;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- candidateBoard.compute(topic, (k, currentInfo) -> {
- if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentInfo;
- });
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
-
- if (updateAccepted.get()) {
- eventDispatcher.post(leadershipEvent);
- }
- }
-
- private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
- lockExecutor.schedule(
- () -> doRunForLeadership(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- TimeUnit.MILLISECONDS);
- }
-
- private void retryWithdraw(String path, CompletableFuture<Void> future) {
- lockExecutor.schedule(
- () -> doWithdraw(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- TimeUnit.MILLISECONDS);
- }
-
- private void scheduleStaleLeadershipPurge(int afterDelaySec) {
- if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
- staleLeadershipPurgeExecutor.schedule(
- this::purgeStaleLeadership,
- afterDelaySec,
- TimeUnit.SECONDS);
- }
- }
-
- /**
- * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
- */
- private void purgeStaleLeadership() {
- AtomicBoolean rerunPurge = new AtomicBoolean(false);
- try {
- staleLeadershipPurgeScheduled.set(false);
- leaderMap.entrySet()
- .stream()
- .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
- .forEach(entry -> {
- String path = entry.getKey();
- NodeId nodeId = entry.getValue().value();
- try {
- leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
- } catch (Exception e) {
- log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
- rerunPurge.set(true);
- }
- });
-
- candidateMap.entrySet()
- .forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- List<NodeId> candidatesList = candidates != null
- ? candidates.value() : Collections.emptyList();
- List<NodeId> activeCandidatesList =
- candidatesList.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
- .collect(Collectors.toList());
- if (activeCandidatesList.size() < candidatesList.size()) {
- Set<NodeId> removedCandidates =
- Sets.difference(Sets.newHashSet(candidatesList),
- Sets.newHashSet(activeCandidatesList));
- try {
- candidateMap.computeIf(path,
- c -> c.stream()
- .filter(n -> clusterService.getState(n) == INACTIVE)
- .count() > 0,
- (topic, c) -> c.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) ||
- activeTopics.contains(path))
- .collect(Collectors.toList()));
- } catch (Exception e) {
- log.debug("Failed to evict inactive candidates {} from "
- + "candidate list for {}", removedCandidates, path, e);
- rerunPurge.set(true);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failure purging state leadership.", e);
- rerunPurge.set(true);
- }
-
- if (rerunPurge.get()) {
- log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
- scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
- }
- }
-
- private void refreshLeaderBoard() {
- try {
- Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
- leaderMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<NodeId> leader = entry.getValue();
- Leadership leadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- newLeaderBoard.put(path, leadership);
- });
-
- // first take snapshot of current leader board.
- Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
-
- MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
-
- // evict stale leaders
- diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
- log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
- });
-
- // add missing leaders
- diff.entriesOnlyOnRight().forEach((path, leadership) -> {
- log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
- });
-
- // add updated leaders
- diff.entriesDiffering().forEach((path, difference) -> {
- Leadership current = difference.leftValue();
- Leadership updated = difference.rightValue();
- if (current.epoch() < updated.epoch()) {
- log.debug("Updated {} in leaderboard.", updated);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
- }
- });
- } catch (Exception e) {
- log.debug("Failed to refresh leader board", e);
- }
- }
-
- private class InternalClusterEventListener implements ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
- scheduleStaleLeadershipPurge(0);
- }
- }
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
deleted file mode 100644
index 9bf80a73..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.Iterators;
-
-/**
- * Set view backed by Set with element type {@code <BACK>} but returns
- * element as {@code <OUT>} for convenience.
- *
- * @param <BACK> Backing {@link Set} element type.
- * MappingSet will follow this type's equality behavior.
- * @param <OUT> external facing element type.
- * MappingSet will ignores equality defined by this type.
- */
-class MappingSet<BACK, OUT> implements Set<OUT> {
-
- private final Set<BACK> backedSet;
- private final Function<OUT, BACK> toBack;
- private final Function<BACK, OUT> toOut;
-
- public MappingSet(Set<BACK> backedSet,
- Function<Set<BACK>, Set<BACK>> supplier,
- Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) {
- this.backedSet = supplier.apply(backedSet);
- this.toBack = toBack;
- this.toOut = toOut;
- }
-
- @Override
- public int size() {
- return backedSet.size();
- }
-
- @Override
- public boolean isEmpty() {
- return backedSet.isEmpty();
- }
-
- @Override
- public boolean contains(Object o) {
- return backedSet.contains(toBack.apply((OUT) o));
- }
-
- @Override
- public Iterator<OUT> iterator() {
- return Iterators.transform(backedSet.iterator(), toOut::apply);
- }
-
- @Override
- public Object[] toArray() {
- return backedSet.stream()
- .map(toOut)
- .toArray();
- }
-
- @Override
- public <T> T[] toArray(T[] a) {
- return backedSet.stream()
- .map(toOut)
- .toArray(size -> {
- if (size < a.length) {
- return (T[]) new Object[size];
- } else {
- Arrays.fill(a, null);
- return a;
- }
- });
- }
-
- @Override
- public boolean add(OUT e) {
- return backedSet.add(toBack.apply(e));
- }
-
- @Override
- public boolean remove(Object o) {
- return backedSet.remove(toBack.apply((OUT) o));
- }
-
- @Override
- public boolean containsAll(Collection<?> c) {
- return c.stream()
- .map(e -> toBack.apply((OUT) e))
- .allMatch(backedSet::contains);
- }
-
- @Override
- public boolean addAll(Collection<? extends OUT> c) {
- return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList()));
- }
-
- @Override
- public boolean retainAll(Collection<?> c) {
- return backedSet.retainAll(c.stream()
- .map(x -> toBack.apply((OUT) x))
- .collect(Collectors.toList()));
- }
-
- @Override
- public boolean removeAll(Collection<?> c) {
- return backedSet.removeAll(c.stream()
- .map(x -> toBack.apply((OUT) x))
- .collect(Collectors.toList()));
- }
-
- @Override
- public void clear() {
- backedSet.clear();
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java
deleted file mode 100644
index 5f707d62..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.function.Function;
-
-/**
- * Utility class for checking matching values.
- *
- * @param <T> type of value
- */
-public final class Match<T> {
-
- private final boolean matchAny;
- private final T value;
-
- /**
- * Returns a Match that matches any value.
- * @param <T> match type
- * @return new instance
- */
- public static <T> Match<T> any() {
- return new Match<>();
- }
-
- /**
- * Returns a Match that matches null values.
- * @param <T> match type
- * @return new instance
- */
- public static <T> Match<T> ifNull() {
- return ifValue(null);
- }
-
- /**
- * Returns a Match that matches only specified value.
- * @param value value to match
- * @param <T> match type
- * @return new instance
- */
- public static <T> Match<T> ifValue(T value) {
- return new Match<>(value);
- }
-
- private Match() {
- matchAny = true;
- value = null;
- }
-
- private Match(T value) {
- matchAny = false;
- this.value = value;
- }
-
- /**
- * Maps this instance to a Match of another type.
- * @param mapper transformation function
- * @param <V> new match type
- * @return new instance
- */
- public <V> Match<V> map(Function<T, V> mapper) {
- if (matchAny) {
- return any();
- } else if (value == null) {
- return ifNull();
- } else {
- return ifValue(mapper.apply(value));
- }
- }
-
- /**
- * Checks if this instance matches specified value.
- * @param other other value
- * @return true if matches; false otherwise
- */
- public boolean matches(T other) {
- if (matchAny) {
- return true;
- } else if (other == null) {
- return value == null;
- } else {
- if (value instanceof byte[]) {
- return Arrays.equals((byte[]) value, (byte[]) other);
- }
- return Objects.equals(value, other);
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(matchAny, value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Match)) {
- return false;
- }
- Match<T> that = (Match<T>) other;
- return Objects.equals(this.matchAny, that.matchAny) &&
- Objects.equals(this.value, that.value);
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("matchAny", matchAny)
- .add("value", value)
- .toString();
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java
deleted file mode 100644
index 6475bf7b..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.google.common.collect.Maps;
-import org.onlab.metrics.MetricsComponent;
-import org.onlab.metrics.MetricsFeature;
-import org.onlab.metrics.MetricsService;
-import org.onlab.osgi.DefaultServiceDirectory;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Agent that implements usage and performance monitoring via the metrics service.
- */
-public class MeteringAgent {
-
- private Counter exceptionCounter;
- private Counter perObjExceptionCounter;
- private MetricsService metricsService;
- private MetricsComponent metricsComponent;
- private MetricsFeature metricsFeature;
- private final Map<String, Timer> perObjOpTimers = Maps.newConcurrentMap();
- private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
- private Timer perPrimitiveTimer;
- private Timer perObjTimer;
- private MetricsFeature wildcard;
- private final boolean activated;
- private Context nullTimer;
-
- /**
- * Constructs a new MeteringAgent for a given distributed primitive.
- * Instantiates the metrics service
- * Initializes all the general metrics for that object
- *
- * @param primitiveName Type of primitive to be metered
- * @param objName Global name of the primitive
- * @param activated boolean flag for whether metering is enabled or not
- */
- public MeteringAgent(String primitiveName, String objName, boolean activated) {
- checkNotNull(objName, "Object name cannot be null");
- this.activated = activated;
- nullTimer = new Context(null, "");
- if (this.activated) {
- this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
- this.metricsComponent = metricsService.registerComponent(primitiveName);
- this.metricsFeature = metricsComponent.registerFeature(objName);
- this.wildcard = metricsComponent.registerFeature("*");
- this.perObjTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
- this.perPrimitiveTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
- this.perObjExceptionCounter = metricsService.createCounter(metricsComponent, metricsFeature, "exceptions");
- this.exceptionCounter = metricsService.createCounter(metricsComponent, wildcard, "exceptions");
- }
- }
-
- /**
- * Initializes a specific timer for a given operation.
- *
- * @param op Specific operation being metered
- * @return timer context
- */
- public Context startTimer(String op) {
- if (!activated) {
- return nullTimer;
- }
- // Check if timer exists, if it doesn't creates it
- final Timer currTimer = perObjOpTimers.computeIfAbsent(op, timer ->
- metricsService.createTimer(metricsComponent, metricsFeature, op));
- perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
- // Starts timer
- return new Context(currTimer.time(), op);
- }
-
- /**
- * Timer.Context with a specific operation.
- */
- public class Context {
- private final Timer.Context context;
- private final String operation;
-
- /**
- * Constructs Context.
- *
- * @param context context
- * @param operation operation name
- */
- public Context(Timer.Context context, String operation) {
- this.context = context;
- this.operation = operation;
- }
-
- /**
- * Stops timer given a specific context and updates all related metrics.
- * @param e throwable
- */
- public void stop(Throwable e) {
- if (!activated) {
- return;
- }
- if (e == null) {
- //Stop and updates timer with specific measurements per map, per operation
- final long time = context.stop();
- //updates timer with aggregated measurements per map
- perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
- //updates timer with aggregated measurements per map
- perObjTimer.update(time, TimeUnit.NANOSECONDS);
- //updates timer with aggregated measurements per all Consistent Maps
- perPrimitiveTimer.update(time, TimeUnit.NANOSECONDS);
- } else {
- exceptionCounter.inc();
- perObjExceptionCounter.inc();
- }
- }
- }
-
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
deleted file mode 100644
index d8593e37..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode.State;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MutexExecutionService;
-import org.onosproject.store.service.MutexTask;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Implementation of a MutexExecutionService.
- */
-@Component(immediate = true)
-@Service
-public class MutexExecutionManager implements MutexExecutionService {
-
- private final Logger log = getLogger(getClass());
-
- protected ConsistentMap<String, MutexState> lockMap;
- protected NodeId localNodeId;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
- private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
-
- private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
- private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- lockMap = storageService.<String, MutexState>consistentMapBuilder()
- .withName("onos-mutexes")
- .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
- .withPartitionsDisabled()
- .build();
- lockMap.addListener(mapEventListener);
- clusterService.addListener(clusterEventListener);
- releaseOldLocks();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- lockMap.removeListener(mapEventListener);
- pending.values().forEach(future -> future.cancel(true));
- activeTasks.forEach((k, v) -> {
- v.stop();
- unlock(k);
- });
- clusterService.removeListener(clusterEventListener);
- log.info("Stopped");
- }
-
- @Override
- public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
- return lock(exclusionPath)
- .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
- k -> new InnerMutexTask(exclusionPath,
- task,
- state.term())))
- .thenAcceptAsync(t -> t.start(), executor)
- .whenComplete((r, e) -> unlock(exclusionPath));
- }
-
- protected CompletableFuture<MutexState> lock(String exclusionPath) {
- CompletableFuture<MutexState> future =
- pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
- tryLock(exclusionPath);
- return future;
- }
-
- /**
- * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
- * the wait list.
- * @param exclusionPath exclusion path
- */
- protected void tryLock(String exclusionPath) {
- Tools.retryable(() -> lockMap.asJavaMap()
- .compute(exclusionPath,
- (k, v) -> MutexState.admit(v, localNodeId)),
- ConsistentMapException.ConcurrentModification.class,
- Integer.MAX_VALUE,
- 100).get();
- }
-
- /**
- * Releases lock for the specific path. This operation is idempotent.
- * @param exclusionPath exclusion path
- */
- protected void unlock(String exclusionPath) {
- Tools.retryable(() -> lockMap.asJavaMap()
- .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
- ConsistentMapException.ConcurrentModification.class,
- Integer.MAX_VALUE,
- 100).get();
- }
-
- /**
- * Detects and releases all locks held by this node.
- */
- private void releaseOldLocks() {
- Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
- .keySet()
- .forEach(path -> {
- log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
- unlock(path);
- });
- }
-
- private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
-
- @Override
- public void event(MapEvent<String, MutexState> event) {
- log.debug("Received {}", event);
- if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
- pending.computeIfPresent(event.key(), (k, future) -> {
- MutexState state = Versioned.valueOrElse(event.value(), null);
- if (state != null && localNodeId.equals(state.holder())) {
- log.debug("Local node is now owner for {}", event.key());
- future.complete(state);
- return null;
- } else {
- return future;
- }
- });
- InnerMutexTask task = activeTasks.get(event.key());
- if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
- task.stop();
- }
- }
- }
- }
-
- private class InternalClusterEventListener implements ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
- event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
- NodeId nodeId = event.subject().id();
- log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
- lockMap.asJavaMap().forEach((k, v) -> {
- if (v.contains(nodeId)) {
- lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
- }
- });
- }
- long activeNodes = clusterService.getNodes()
- .stream()
- .map(node -> clusterService.getState(node.id()))
- .filter(State.ACTIVE::equals)
- .count();
- if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
- log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
- activeTasks.forEach((k, v) -> {
- v.stop();
- });
- }
- }
- }
-
- private static final class MutexState {
-
- private final NodeId holder;
- private final List<NodeId> waitList;
- private final long term;
-
- public static MutexState admit(MutexState state, NodeId nodeId) {
- if (state == null) {
- return new MutexState(nodeId, 1L, Lists.newArrayList());
- } else if (state.holder() == null) {
- return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
- } else {
- if (!state.contains(nodeId)) {
- NodeId newHolder = state.holder();
- List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
- newWaitList.add(nodeId);
- return new MutexState(newHolder, state.term(), newWaitList);
- } else {
- return state;
- }
- }
- }
-
- public static MutexState evict(MutexState state, NodeId nodeId) {
- return state.evict(nodeId);
- }
-
- public MutexState evict(NodeId nodeId) {
- if (nodeId.equals(holder)) {
- if (waitList.isEmpty()) {
- return new MutexState(null, term, waitList);
- }
- List<NodeId> newWaitList = Lists.newArrayList(waitList);
- NodeId newHolder = newWaitList.remove(0);
- return new MutexState(newHolder, term + 1, newWaitList);
- } else {
- NodeId newHolder = holder;
- List<NodeId> newWaitList = Lists.newArrayList(waitList);
- newWaitList.remove(nodeId);
- return new MutexState(newHolder, term, newWaitList);
- }
- }
-
- public NodeId holder() {
- return holder;
- }
-
- public List<NodeId> waitList() {
- return waitList;
- }
-
- public long term() {
- return term;
- }
-
- private boolean contains(NodeId nodeId) {
- return (nodeId.equals(holder) || waitList.contains(nodeId));
- }
-
- private MutexState(NodeId holder, long term, List<NodeId> waitList) {
- this.holder = holder;
- this.term = term;
- this.waitList = Lists.newArrayList(waitList);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("holder", holder)
- .add("term", term)
- .add("waitList", waitList)
- .toString();
- }
- }
-
- private class InnerMutexTask implements MutexTask {
- private final MutexTask task;
- private final String mutexPath;
- private final long term;
-
- public InnerMutexTask(String mutexPath, MutexTask task, long term) {
- this.mutexPath = mutexPath;
- this.term = term;
- this.task = task;
- }
-
- public long term() {
- return term;
- }
-
- @Override
- public void start() {
- log.debug("Starting execution for mutex task guarded by {}", mutexPath);
- task.start();
- log.debug("Finished execution for mutex task guarded by {}", mutexPath);
- }
-
- @Override
- public void stop() {
- log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
- task.stop();
- }
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
deleted file mode 100644
index f741b367..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import net.kuujo.copycat.Task;
-import net.kuujo.copycat.cluster.Cluster;
-import net.kuujo.copycat.resource.ResourceState;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * A database that partitions the keys across one or more database partitions.
- */
-public class PartitionedDatabase implements Database {
-
- private final String name;
- private final Partitioner<String> partitioner;
- private final List<Database> partitions;
- private final AtomicBoolean isOpen = new AtomicBoolean(false);
- private static final String DB_NOT_OPEN = "Partitioned Database is not open";
- private TransactionManager transactionManager;
-
- public PartitionedDatabase(
- String name,
- Collection<Database> partitions) {
- this.name = name;
- this.partitions = partitions
- .stream()
- .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
- .collect(Collectors.toList());
- this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
- }
-
- /**
- * Returns the databases for individual partitions.
- * @return list of database partitions
- */
- public List<Database> getPartitions() {
- return partitions;
- }
-
- /**
- * Returns true if the database is open.
- * @return true if open, false otherwise
- */
- @Override
- public boolean isOpen() {
- return isOpen.get();
- }
-
- @Override
- public CompletableFuture<Set<String>> maps() {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Set<String> mapNames = Sets.newConcurrentHashSet();
- return CompletableFuture.allOf(partitions
- .stream()
- .map(db -> db.maps().thenApply(mapNames::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> mapNames);
- }
-
- @Override
- public CompletableFuture<Map<String, Long>> counters() {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Map<String, Long> counters = Maps.newConcurrentMap();
- return CompletableFuture.allOf(partitions
- .stream()
- .map(db -> db.counters()
- .thenApply(m -> {
- counters.putAll(m);
- return null;
- }))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> counters);
- }
-
- @Override
- public CompletableFuture<Integer> mapSize(String mapName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- AtomicInteger totalSize = new AtomicInteger(0);
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> totalSize.get());
- }
-
- @Override
- public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return mapSize(mapName).thenApply(size -> size == 0);
- }
-
- @Override
- public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
- }
-
- @Override
- public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- AtomicBoolean containsValue = new AtomicBoolean(false);
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapContainsValue(mapName, value)
- .thenApply(v -> containsValue.compareAndSet(false, v)))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> containsValue.get());
- }
-
- @Override
- public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(mapName, key).mapGet(mapName, key);
- }
-
- @Override
- public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
- String mapName, String key, Match<byte[]> valueMatch,
- Match<Long> versionMatch, byte[] value) {
- return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
-
- }
-
- @Override
- public CompletableFuture<Result<Void>> mapClear(String mapName) {
- AtomicBoolean isLocked = new AtomicBoolean(false);
- checkState(isOpen.get(), DB_NOT_OPEN);
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapClear(mapName)
- .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
- }
-
- @Override
- public CompletableFuture<Set<String>> mapKeySet(String mapName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Set<String> keySet = Sets.newConcurrentHashSet();
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> keySet);
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapValues(mapName).thenApply(values::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> values);
- }
-
- @Override
- public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
- return CompletableFuture.allOf(partitions
- .stream()
- .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> entrySet);
- }
-
- @Override
- public CompletableFuture<Long> counterGet(String counterName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(counterName, counterName).counterGet(counterName);
- }
-
- @Override
- public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
- }
-
- @Override
- public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
- }
-
- @Override
- public CompletableFuture<Void> counterSet(String counterName, long value) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
- }
-
- @Override
- public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(counterName, counterName).
- counterCompareAndSet(counterName, expectedValue, updateValue);
-
- }
-
- @Override
- public CompletableFuture<Long> queueSize(String queueName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(queueName, queueName).queueSize(queueName);
- }
-
- @Override
- public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
- }
-
- @Override
- public CompletableFuture<byte[]> queuePop(String queueName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(queueName, queueName).queuePop(queueName);
- }
-
- @Override
- public CompletableFuture<byte[]> queuePeek(String queueName) {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
- }
-
- @Override
- public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
- Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
- if (subTransactions.isEmpty()) {
- return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
- } else if (subTransactions.size() == 1) {
- Entry<Database, Transaction> entry =
- subTransactions.entrySet().iterator().next();
- return entry.getKey().prepareAndCommit(entry.getValue());
- } else {
- if (transactionManager == null) {
- throw new IllegalStateException("TransactionManager is not initialized");
- }
- return transactionManager.execute(transaction);
- }
- }
-
- @Override
- public CompletableFuture<Boolean> prepare(Transaction transaction) {
- Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
- AtomicBoolean status = new AtomicBoolean(true);
- return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry
- .getKey()
- .prepare(entry.getValue())
- .thenApply(v -> status.compareAndSet(true, v)))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> status.get());
- }
-
- @Override
- public CompletableFuture<CommitResponse> commit(Transaction transaction) {
- Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
- AtomicBoolean success = new AtomicBoolean(true);
- List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
- return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().commit(entry.getValue())
- .thenAccept(response -> {
- success.set(success.get() && response.success());
- if (success.get()) {
- allUpdates.addAll(response.updates());
- }
- }))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> success.get() ?
- CommitResponse.success(allUpdates) : CommitResponse.failure());
- }
-
- @Override
- public CompletableFuture<Boolean> rollback(Transaction transaction) {
- Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
- return CompletableFuture.allOf(subTransactions.entrySet()
- .stream()
- .map(entry -> entry.getKey().rollback(entry.getValue()))
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> true);
- }
-
- @Override
- public CompletableFuture<Database> open() {
- return CompletableFuture.allOf(partitions
- .stream()
- .map(Database::open)
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> {
- isOpen.set(true);
- return this;
- });
- }
-
- @Override
- public CompletableFuture<Void> close() {
- checkState(isOpen.get(), DB_NOT_OPEN);
- return CompletableFuture.allOf(partitions
- .stream()
- .map(database -> database.close())
- .toArray(CompletableFuture[]::new));
- }
-
- @Override
- public boolean isClosed() {
- return !isOpen.get();
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Cluster cluster() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Database addStartupTask(Task<CompletableFuture<Void>> task) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ResourceState state() {
- throw new UnsupportedOperationException();
- }
-
- private Map<Database, Transaction> createSubTransactions(
- Transaction transaction) {
- Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
- for (DatabaseUpdate update : transaction.updates()) {
- Database partition = partitioner.getPartition(update.mapName(), update.key());
- List<DatabaseUpdate> partitionUpdates =
- perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
- partitionUpdates.add(update);
- }
- Map<Database, Transaction> subTransactions = Maps.newHashMap();
- perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
- return subTransactions;
- }
-
- protected void setTransactionManager(TransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- }
-
- @Override
- public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
- partitions.forEach(p -> p.registerConsumer(consumer));
- }
-
- @Override
- public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
- partitions.forEach(p -> p.unregisterConsumer(consumer));
- }
-}
-
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
deleted file mode 100644
index de630b90..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-/**
- * Partitioner is responsible for mapping keys to individual database partitions.
- *
- * @param <K> key type.
- */
-public interface Partitioner<K> {
-
- /**
- * Returns the database partition.
- * @param mapName map name
- * @param key key
- * @return Database partition
- */
- Database getPartition(String mapName, K key);
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
deleted file mode 100644
index 856f706d..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import java.util.Objects;
-
-/**
- * Result of a database update operation.
- *
- * @param <V> return value type
- */
-public final class Result<V> {
-
- public enum Status {
- /**
- * Indicates a successful update.
- */
- OK,
-
- /**
- * Indicates a failure due to underlying state being locked by another transaction.
- */
- LOCKED
- }
-
- private final Status status;
- private final V value;
-
- /**
- * Creates a new Result instance with the specified value with status set to Status.OK.
- *
- * @param <V> result value type
- * @param value result value
- * @return Result instance
- */
- public static <V> Result<V> ok(V value) {
- return new Result<>(value, Status.OK);
- }
-
- /**
- * Creates a new Result instance with status set to Status.LOCKED.
- *
- * @param <V> result value type
- * @return Result instance
- */
- public static <V> Result<V> locked() {
- return new Result<>(null, Status.LOCKED);
- }
-
- private Result(V value, Status status) {
- this.value = value;
- this.status = status;
- }
-
- /**
- * Returns true if this result indicates a successful execution i.e status is Status.OK.
- *
- * @return true if successful, false otherwise
- */
- public boolean success() {
- return status == Status.OK;
- }
-
- /**
- * Returns the status of database update operation.
- *
- * @return database update status
- */
- public Status status() {
- return status;
- }
-
- /**
- * Returns the return value for the update.
- *
- * @return value returned by database update. If the status is another
- * other than Status.OK, this returns a null
- */
- public V value() {
- return value;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(value, status);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Result)) {
- return false;
- }
- Result<V> that = (Result<V>) other;
- return Objects.equals(this.value, that.value) &&
- Objects.equals(this.status, that.status);
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("status", status)
- .add("value", value)
- .toString();
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
deleted file mode 100644
index 40864286..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.List;
-
-/**
- * A simple Partitioner for mapping keys to database partitions.
- * <p>
- * This class uses a md5 hash based hashing scheme for hashing the key to
- * a partition.
- *
- */
-public class SimpleKeyHashPartitioner extends DatabasePartitioner {
-
- public SimpleKeyHashPartitioner(List<Database> partitions) {
- super(partitions);
- }
-
- @Override
- public Database getPartition(String mapName, String key) {
- return partitions.get(hash(key) % partitions.size());
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
deleted file mode 100644
index 8dc26e0f..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.List;
-
-/**
- * A simple Partitioner that uses the map name hash to
- * pick a partition.
- * <p>
- * This class uses a md5 hash based hashing scheme for hashing the map name to
- * a partition. This partitioner maps all keys for a map to the same database
- * partition.
- */
-public class SimpleTableHashPartitioner extends DatabasePartitioner {
-
- public SimpleTableHashPartitioner(List<Database> partitions) {
- super(partitions);
- }
-
- @Override
- public Database getPartition(String mapName, String key) {
- return partitions.get(hash(mapName) % partitions.size());
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
deleted file mode 100644
index 72356d0b..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Representation of a state machine update.
- */
-public class StateMachineUpdate {
-
- /**
- * Target data structure type this update is for.
- */
- enum Target {
- /**
- * Update is for a map.
- */
- MAP_UPDATE,
-
- /**
- * Update is a transaction commit.
- */
- TX_COMMIT,
-
- /**
- * Update is a queue push.
- */
- QUEUE_PUSH,
-
- /**
- * Update is for some other operation.
- */
- OTHER
- }
-
- private final String operationName;
- private final Object input;
- private final Object output;
-
- public StateMachineUpdate(String operationName, Object input, Object output) {
- this.operationName = operationName;
- this.input = input;
- this.output = output;
- }
-
- public Target target() {
- // FIXME: This check is brittle
- if (operationName.contains("mapUpdate")) {
- return Target.MAP_UPDATE;
- } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
- return Target.TX_COMMIT;
- } else if (operationName.contains("queuePush")) {
- return Target.QUEUE_PUSH;
- } else {
- return Target.OTHER;
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> T input() {
- return (T) input;
- }
-
- @SuppressWarnings("unchecked")
- public <T> T output() {
- return (T) output;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("name", operationName)
- .add("input", input)
- .add("output", output)
- .toString();
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
deleted file mode 100644
index fc6e58d0..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.onlab.util.KryoNamespace;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.DatabaseUpdate;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Transaction;
-import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.Transaction.State;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Agent that runs the two phase commit protocol.
- */
-public class TransactionManager {
-
- private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
- .register(KryoNamespaces.BASIC)
- .nextId(KryoNamespace.FLOATING_ID)
- .register(Versioned.class)
- .register(DatabaseUpdate.class)
- .register(DatabaseUpdate.Type.class)
- .register(DefaultTransaction.class)
- .register(Transaction.State.class)
- .build();
-
- private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
- private final Database database;
- private final AsyncConsistentMap<Long, Transaction> transactions;
-
- /**
- * Constructs a new TransactionManager for the specified database instance.
- *
- * @param database database
- * @param mapBuilder builder for ConsistentMap instances
- */
- public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
- this.database = checkNotNull(database, "database cannot be null");
- this.transactions = mapBuilder.withName("onos-transactions")
- .withSerializer(serializer)
- .buildAsyncMap();
- }
-
- /**
- * Executes the specified transaction by employing a two phase commit protocol.
- *
- * @param transaction transaction to commit
- * @return transaction result. Result value true indicates a successful commit, false
- * indicates abort
- */
- public CompletableFuture<CommitResponse> execute(Transaction transaction) {
- // clean up if this transaction in already in a terminal state.
- if (transaction.state() == Transaction.State.COMMITTED ||
- transaction.state() == Transaction.State.ROLLEDBACK) {
- return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
- } else if (transaction.state() == Transaction.State.COMMITTING) {
- return commit(transaction);
- } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
- return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
- } else {
- return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
- }
- }
-
-
- /**
- * Returns all transactions in the system.
- *
- * @return future for a collection of transactions
- */
- public CompletableFuture<Collection<Transaction>> getTransactions() {
- return transactions.values().thenApply(c -> {
- Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
- return txns;
- });
- }
-
- private CompletableFuture<Boolean> prepare(Transaction transaction) {
- return transactions.put(transaction.id(), transaction)
- .thenCompose(v -> database.prepare(transaction))
- .thenCompose(status -> transactions.put(
- transaction.id(),
- transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
- .thenApply(v -> status));
- }
-
- private CompletableFuture<CommitResponse> commit(Transaction transaction) {
- return database.commit(transaction)
- .whenComplete((r, e) -> transactions.put(
- transaction.id(),
- transaction.transition(Transaction.State.COMMITTED)));
- }
-
- private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
- return database.rollback(transaction)
- .thenCompose(v -> transactions.put(
- transaction.id(),
- transaction.transition(Transaction.State.ROLLEDBACK)))
- .thenApply(v -> CommitResponse.failure());
- }
-}
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
deleted file mode 100644
index 50b78dd4..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import java.util.function.Function;
-
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Versioned;
-
-/**
- * Result of a update operation.
- * <p>
- * Both old and new values are accessible along with a flag that indicates if the
- * the value was updated. If flag is false, oldValue and newValue both
- * point to the same unmodified value.
- * @param <V> result type
- */
-public class UpdateResult<K, V> {
-
- private final boolean updated;
- private final String mapName;
- private final K key;
- private final Versioned<V> oldValue;
- private final Versioned<V> newValue;
-
- public UpdateResult(boolean updated, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
- this.updated = updated;
- this.mapName = mapName;
- this.key = key;
- this.oldValue = oldValue;
- this.newValue = newValue;
- }
-
- public boolean updated() {
- return updated;
- }
-
- public String mapName() {
- return mapName;
- }
-
- public K key() {
- return key;
- }
-
- public Versioned<V> oldValue() {
- return oldValue;
- }
-
- public Versioned<V> newValue() {
- return newValue;
- }
-
- public <K1, V1> UpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
- return new UpdateResult<>(updated,
- mapName,
- keyTransform.apply(key),
- oldValue == null ? null : oldValue.map(valueMapper),
- newValue == null ? null : newValue.map(valueMapper));
- }
-
- public MapEvent<K, V> toMapEvent() {
- if (!updated) {
- return null;
- } else {
- MapEvent.Type eventType = oldValue == null ?
- MapEvent.Type.INSERT : newValue == null ? MapEvent.Type.REMOVE : MapEvent.Type.UPDATE;
- Versioned<V> eventValue = eventType == MapEvent.Type.REMOVE ? oldValue : newValue;
- return new MapEvent<>(mapName(), eventType, key(), eventValue);
- }
- }
-} \ No newline at end of file
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java
deleted file mode 100644
index 3dae86b5..00000000
--- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of partitioned and distributed store facility capable of
- * providing consistent update semantics.
- */
-package org.onosproject.store.consistent.impl; \ No newline at end of file