diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl')
43 files changed, 0 insertions, 6836 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java deleted file mode 100644 index 92db5b44..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/AsyncCachingConsistentMap.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.util.concurrent.CompletableFuture; - -import org.onosproject.core.ApplicationId; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -/** - * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency - * guarantee in return for better read performance. - * <p> - * For read/write operations that are local to a node this map implementation provides - * guarantees similar to a ConsistentMap. However for read/write operations executed - * across multiple nodes this implementation only provides eventual consistency. - * - * @param <K> key type - * @param <V> value type - */ -public class AsyncCachingConsistentMap<K, V> extends DefaultAsyncConsistentMap<K, V> { - - private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache = - CacheBuilder.newBuilder() - .maximumSize(10000) // TODO: make configurable - .build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() { - @Override - public CompletableFuture<Versioned<V>> load(K key) - throws Exception { - return AsyncCachingConsistentMap.super.get(key); - } - }); - - public AsyncCachingConsistentMap(String name, - ApplicationId applicationId, - Database database, - Serializer serializer, - boolean readOnly, - boolean purgeOnUninstall, - boolean meteringEnabled) { - super(name, applicationId, database, serializer, readOnly, purgeOnUninstall, meteringEnabled); - addListener(event -> cache.invalidate(event.key())); - } - - @Override - public CompletableFuture<Versioned<V>> get(K key) { - CompletableFuture<Versioned<V>> cachedValue = cache.getIfPresent(key); - if (cachedValue != null) { - if (cachedValue.isCompletedExceptionally()) { - cache.invalidate(key); - } else { - return cachedValue; - } - } - return cache.getUnchecked(key); - } - - @Override - protected void beforeUpdate(K key) { - super.beforeUpdate(key); - cache.invalidate(key); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java deleted file mode 100644 index bbc8e6e0..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CommitResponse.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Collections; -import java.util.List; - -import com.google.common.collect.ImmutableList; - -/** - * Result of a Transaction commit operation. - */ -public final class CommitResponse { - - private boolean success; - private List<UpdateResult<String, byte[]>> updates; - - public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) { - return new CommitResponse(true, updates); - } - - public static CommitResponse failure() { - return new CommitResponse(false, Collections.emptyList()); - } - - private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) { - this.success = success; - this.updates = ImmutableList.copyOf(updates); - } - - public boolean success() { - return success; - } - - public List<UpdateResult<String, byte[]>> updates() { - return updates; - } - - @Override - public String toString() { - return toStringHelper(this) - .add("success", success) - .add("udpates", updates) - .toString(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java deleted file mode 100644 index 5183924c..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapBackedJavaMap.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.Versioned; - -import com.google.common.collect.Collections2; -import com.google.common.collect.Maps; - -/** - * Standard java Map backed by a ConsistentMap. - * - * @param <K> key type - * @param <V> value type - */ -public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> { - - private final ConsistentMap<K, V> backingMap; - - public ConsistentMapBackedJavaMap(ConsistentMap<K, V> backingMap) { - this.backingMap = backingMap; - } - - @Override - public int size() { - return backingMap.size(); - } - - @Override - public boolean isEmpty() { - return backingMap.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return backingMap.containsKey((K) key); - } - - @Override - public boolean containsValue(Object value) { - return backingMap.containsValue((V) value); - } - - @Override - public V get(Object key) { - return Versioned.valueOrElse(backingMap.get((K) key), null); - } - - @Override - public V getOrDefault(Object key, V defaultValue) { - return Versioned.valueOrElse(backingMap.get((K) key), defaultValue); - } - - @Override - public V put(K key, V value) { - return Versioned.valueOrElse(backingMap.put(key, value), null); - } - - @Override - public V putIfAbsent(K key, V value) { - return Versioned.valueOrElse(backingMap.putIfAbsent(key, value), null); - } - - @Override - public V remove(Object key) { - return Versioned.valueOrElse(backingMap.remove((K) key), null); - } - - @Override - public boolean remove(Object key, Object value) { - return backingMap.remove((K) key, (V) value); - } - - @Override - public V replace(K key, V value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean replace(K key, V oldValue, V newValue) { - return backingMap.replace(key, oldValue, newValue); - } - - @Override - public void putAll(Map<? extends K, ? extends V> m) { - m.forEach((k, v) -> { - backingMap.put(k, v); - }); - } - - @Override - public void clear() { - backingMap.clear(); - } - - @Override - public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return Versioned.valueOrElse(backingMap.compute(key, remappingFunction), null); - } - - @Override - public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { - return Versioned.valueOrElse(backingMap.computeIfAbsent(key, mappingFunction), null); - } - - @Override - public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return Versioned.valueOrElse(backingMap.computeIfPresent(key, remappingFunction), null); - } - - @Override - public Set<K> keySet() { - return backingMap.keySet(); - } - - @Override - public Collection<V> values() { - return Collections2.transform(backingMap.values(), v -> v.value()); - } - - @Override - public Set<java.util.Map.Entry<K, V>> entrySet() { - return backingMap.entrySet() - .stream() - .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue().value())) - .collect(Collectors.toSet()); - } - - @Override - public void forEach(BiConsumer<? super K, ? super V> action) { - entrySet().forEach(e -> action.accept(e.getKey(), e.getValue())); - } - - @Override - public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { - return computeIfPresent(key, (k, v) -> v == null ? value : remappingFunction.apply(v, value)); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java deleted file mode 100644 index 88ddae62..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/CopycatCommunicationProtocol.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; - -import org.onlab.util.Tools; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; - -import net.kuujo.copycat.protocol.AbstractProtocol; -import net.kuujo.copycat.protocol.ProtocolClient; -import net.kuujo.copycat.protocol.ProtocolHandler; -import net.kuujo.copycat.protocol.ProtocolServer; -import net.kuujo.copycat.util.Configurable; - -/** - * Protocol for Copycat communication that employs - * {@code ClusterCommunicationService}. - */ -public class CopycatCommunicationProtocol extends AbstractProtocol { - - private static final MessageSubject COPYCAT_MESSAGE_SUBJECT = - new MessageSubject("onos-copycat-message"); - - protected ClusterService clusterService; - protected ClusterCommunicationService clusterCommunicator; - - public CopycatCommunicationProtocol(ClusterService clusterService, - ClusterCommunicationService clusterCommunicator) { - this.clusterService = clusterService; - this.clusterCommunicator = clusterCommunicator; - } - - @Override - public Configurable copy() { - return this; - } - - @Override - public ProtocolClient createClient(URI uri) { - NodeId nodeId = uriToNodeId(uri); - if (nodeId == null) { - throw new IllegalStateException("Unknown peer " + uri); - } - return new Client(nodeId); - } - - @Override - public ProtocolServer createServer(URI uri) { - return new Server(); - } - - private class Server implements ProtocolServer { - - @Override - public void handler(ProtocolHandler handler) { - if (handler == null) { - clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT); - } else { - clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT, - ByteBuffer::wrap, - handler, - Tools::byteBuffertoArray); - // FIXME: Tools::byteBuffertoArray involves a array copy. - } - } - - @Override - public CompletableFuture<Void> listen() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> close() { - clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT); - return CompletableFuture.completedFuture(null); - } - } - - private class Client implements ProtocolClient { - private final NodeId peer; - - public Client(NodeId peer) { - this.peer = peer; - } - - @Override - public CompletableFuture<ByteBuffer> write(ByteBuffer request) { - return clusterCommunicator.sendAndReceive(request, - COPYCAT_MESSAGE_SUBJECT, - Tools::byteBuffertoArray, - ByteBuffer::wrap, - peer); - } - - @Override - public CompletableFuture<Void> connect() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> close() { - return CompletableFuture.completedFuture(null); - } - } - - private NodeId uriToNodeId(URI uri) { - return clusterService.getNodes() - .stream() - .filter(node -> uri.getHost().equals(node.ip().toString())) - .map(ControllerNode::id) - .findAny() - .orElse(null); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java deleted file mode 100644 index 52a999a4..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - - -import java.util.function.Consumer; - -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; -import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig; -import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; -import net.kuujo.copycat.resource.Resource; - -/** - * Database. - */ -public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> { - - /** - * Creates a new database with the default cluster configuration.<p> - * - * The database will be constructed with the default cluster configuration. The default cluster configuration - * searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration - * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p> - * - * Additionally, the database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and - * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name - * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource - * configurations will be loaded according to namespaces as well; for example, `databases.conf`. - * - * @param name The database name. - * @return The database. - */ - static Database create(String name) { - return create(name, new ClusterConfig(), new DatabaseConfig()); - } - - /** - * Creates a new database.<p> - * - * The database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and - * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name - * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource - * configurations will be loaded according to namespaces as well; for example, `databases.conf`. - * - * @param name The database name. - * @param cluster The cluster configuration. - * @return The database. - */ - static Database create(String name, ClusterConfig cluster) { - return create(name, cluster, new DatabaseConfig()); - } - - /** - * Creates a new database. - * - * @param name The database name. - * @param cluster The cluster configuration. - * @param config The database configuration. - - * @return The database. - */ - static Database create(String name, ClusterConfig cluster, DatabaseConfig config) { - ClusterCoordinator coordinator = - new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); - return coordinator.<Database>getResource(name, config.resolve(cluster)) - .addStartupTask(() -> coordinator.open().thenApply(v -> null)) - .addShutdownTask(coordinator::close); - } - - /** - * Tells whether the database supports change notifications. - * @return true if notifications are supported; false otherwise - */ - default boolean hasChangeNotificationSupport() { - return true; - } - - /** - * Registers a new consumer of StateMachineUpdates. - * @param consumer consumer to register - */ - void registerConsumer(Consumer<StateMachineUpdate> consumer); - - /** - * Unregisters a consumer of StateMachineUpdates. - * @param consumer consumer to unregister - */ - void unregisterConsumer(Consumer<StateMachineUpdate> consumer); -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java deleted file mode 100644 index bd774b99..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.typesafe.config.ConfigValueFactory; -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; -import net.kuujo.copycat.protocol.Consistency; -import net.kuujo.copycat.resource.ResourceConfig; -import net.kuujo.copycat.state.StateLogConfig; -import net.kuujo.copycat.util.internal.Assert; - -import java.util.Map; - -/** - * Database configuration. - * - */ -public class DatabaseConfig extends ResourceConfig<DatabaseConfig> { - private static final String DATABASE_CONSISTENCY = "consistency"; - - private static final String DEFAULT_CONFIGURATION = "database-defaults"; - private static final String CONFIGURATION = "database"; - - private String name; - - public DatabaseConfig() { - super(CONFIGURATION, DEFAULT_CONFIGURATION); - } - - public DatabaseConfig(Map<String, Object> config) { - super(config, CONFIGURATION, DEFAULT_CONFIGURATION); - } - - public DatabaseConfig(String resource) { - super(resource, CONFIGURATION, DEFAULT_CONFIGURATION); - } - - protected DatabaseConfig(DatabaseConfig config) { - super(config); - } - - @Override - public DatabaseConfig copy() { - return new DatabaseConfig(this); - } - - /** - * Sets the database read consistency. - * - * @param consistency The database read consistency. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public void setConsistency(String consistency) { - this.config = config.withValue(DATABASE_CONSISTENCY, - ConfigValueFactory.fromAnyRef( - Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString())); - } - - /** - * Sets the database read consistency. - * - * @param consistency The database read consistency. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public void setConsistency(Consistency consistency) { - this.config = config.withValue(DATABASE_CONSISTENCY, - ConfigValueFactory.fromAnyRef( - Assert.isNotNull(consistency, "consistency").toString())); - } - - /** - * Returns the database read consistency. - * - * @return The database read consistency. - */ - public Consistency getConsistency() { - return Consistency.parse(config.getString(DATABASE_CONSISTENCY)); - } - - /** - * Sets the database read consistency, returning the configuration for method chaining. - * - * @param consistency The database read consistency. - * @return The database configuration. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public DatabaseConfig withConsistency(String consistency) { - setConsistency(consistency); - return this; - } - - /** - * Sets the database read consistency, returning the configuration for method chaining. - * - * @param consistency The database read consistency. - * @return The database configuration. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public DatabaseConfig withConsistency(Consistency consistency) { - setConsistency(consistency); - return this; - } - - /** - * Returns the database name. - * - * @return The database name - */ - public String getName() { - return name; - } - - /** - * Sets the database name, returning the configuration for method chaining. - * - * @param name The database name - * @return The database configuration - * @throws java.lang.NullPointerException If the name is {@code null} - */ - public DatabaseConfig withName(String name) { - setName(Assert.isNotNull(name, "name")); - return this; - } - - /** - * Sets the database name. - * - * @param name The database name - * @throws java.lang.NullPointerException If the name is {@code null} - */ - public void setName(String name) { - this.name = Assert.isNotNull(name, "name"); - } - - @Override - public CoordinatedResourceConfig resolve(ClusterConfig cluster) { - return new StateLogConfig(toMap()) - .resolve(cluster) - .withResourceType(DefaultDatabase.class); - } - -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java deleted file mode 100644 index 90d81ee7..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -import net.kuujo.copycat.CopycatConfig; -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.Member; -import net.kuujo.copycat.cluster.Member.Type; -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; -import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; -import net.kuujo.copycat.log.BufferedLog; -import net.kuujo.copycat.log.FileLog; -import net.kuujo.copycat.log.Log; -import net.kuujo.copycat.protocol.Consistency; -import net.kuujo.copycat.protocol.Protocol; -import net.kuujo.copycat.util.concurrent.NamedThreadFactory; - -import org.apache.commons.lang.math.RandomUtils; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.ReferencePolicy; -import org.apache.felix.scr.annotations.Service; -import org.onosproject.app.ApplicationEvent; -import org.onosproject.app.ApplicationListener; -import org.onosproject.app.ApplicationService; -import org.onosproject.cluster.ClusterMetadataService; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.core.ApplicationId; -import org.onosproject.core.IdGenerator; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; -import org.onosproject.store.service.AtomicCounterBuilder; -import org.onosproject.store.service.AtomicValueBuilder; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.DistributedQueueBuilder; -import org.onosproject.store.service.EventuallyConsistentMapBuilder; -import org.onosproject.store.service.MapInfo; -import org.onosproject.store.service.PartitionInfo; -import org.onosproject.store.service.DistributedSetBuilder; -import org.onosproject.store.service.StorageAdminService; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.TransactionContextBuilder; -import org.slf4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -import static org.slf4j.LoggerFactory.getLogger; -import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED; -import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED; - -/** - * Database manager. - */ -@Component(immediate = true, enabled = true) -@Service -public class DatabaseManager implements StorageService, StorageAdminService { - - private final Logger log = getLogger(getClass()); - - public static final String BASE_PARTITION_NAME = "p0"; - - private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000; - private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000; - - private ClusterCoordinator coordinator; - protected PartitionedDatabase partitionedDatabase; - protected Database inMemoryDatabase; - protected NodeId localNodeId; - - private TransactionManager transactionManager; - private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong(); - - private ApplicationListener appListener = new InternalApplicationListener(); - - private final Multimap<String, DefaultAsyncConsistentMap> maps = - Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = - Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterMetadataService clusterMetadataService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC) - protected ApplicationService applicationService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected PersistenceService persistenceService; - - protected String nodeIdToUri(NodeId nodeId) { - ControllerNode node = clusterService.getNode(nodeId); - return String.format("onos://%s:%d", node.ip(), node.tcpPort()); - } - - protected void bindApplicationService(ApplicationService service) { - applicationService = service; - applicationService.addListener(appListener); - } - - protected void unbindApplicationService(ApplicationService service) { - applicationService.removeListener(appListener); - this.applicationService = null; - } - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - - Map<String, Set<NodeId>> partitionMap = Maps.newHashMap(); - clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> { - partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers())); - }); - - - String[] activeNodeUris = partitionMap.values() - .stream() - .reduce((s1, s2) -> Sets.union(s1, s2)) - .get() - .stream() - .map(this::nodeIdToUri) - .toArray(String[]::new); - - String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id()); - Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator); - - ClusterConfig clusterConfig = new ClusterConfig() - .withProtocol(protocol) - .withElectionTimeout(electionTimeoutMillis(activeNodeUris)) - .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris)) - .withMembers(activeNodeUris) - .withLocalMember(localNodeUri); - - CopycatConfig copycatConfig = new CopycatConfig() - .withName("onos") - .withClusterConfig(clusterConfig) - .withDefaultSerializer(new DatabaseSerializer()) - .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d"))); - - coordinator = new DefaultClusterCoordinator(copycatConfig.resolve()); - - DatabaseConfig inMemoryDatabaseConfig = - newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris); - inMemoryDatabase = coordinator - .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig) - .withSerializer(copycatConfig.getDefaultSerializer()) - .withDefaultExecutor(copycatConfig.getDefaultExecutor())); - - List<Database> partitions = partitionMap.entrySet() - .stream() - .map(entry -> { - String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new); - return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas); - }) - .map(config -> { - Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig) - .withSerializer(copycatConfig.getDefaultSerializer()) - .withDefaultExecutor(copycatConfig.getDefaultExecutor())); - return db; - }) - .collect(Collectors.toList()); - - partitionedDatabase = new PartitionedDatabase("onos-store", partitions); - - CompletableFuture<Void> status = coordinator.open() - .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open()) - .whenComplete((db, error) -> { - if (error != null) { - log.error("Failed to initialize database.", error); - } else { - log.info("Successfully initialized database."); - } - })); - - Futures.getUnchecked(status); - - transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder()); - partitionedDatabase.setTransactionManager(transactionManager); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close()) - .thenCompose(v -> coordinator.close()) - .whenComplete((result, error) -> { - if (error != null) { - log.warn("Failed to cleanly close databases.", error); - } else { - log.info("Successfully closed databases."); - } - }); - ImmutableList.copyOf(maps.values()).forEach(this::unregisterMap); - if (applicationService != null) { - applicationService.removeListener(appListener); - } - log.info("Stopped"); - } - - @Override - public TransactionContextBuilder transactionContextBuilder() { - return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId()); - } - - @Override - public List<PartitionInfo> getPartitionInfo() { - return Lists.asList( - inMemoryDatabase, - partitionedDatabase.getPartitions().toArray(new Database[]{})) - .stream() - .map(DatabaseManager::toPartitionInfo) - .collect(Collectors.toList()); - } - - private Log newPersistentLog() { - String logDir = System.getProperty("karaf.data", "./data"); - return new FileLog() - .withDirectory(logDir) - .withSegmentSize(1073741824) // 1GB - .withFlushOnWrite(true) - .withSegmentInterval(Long.MAX_VALUE); - } - - private Log newInMemoryLog() { - return new BufferedLog() - .withFlushOnWrite(false) - .withFlushInterval(Long.MAX_VALUE) - .withSegmentSize(10485760) // 10MB - .withSegmentInterval(Long.MAX_VALUE); - } - - private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) { - return new DatabaseConfig() - .withName(name) - .withElectionTimeout(electionTimeoutMillis(replicas)) - .withHeartbeatInterval(heartbeatTimeoutMillis(replicas)) - .withConsistency(Consistency.DEFAULT) - .withLog(log) - .withDefaultSerializer(new DatabaseSerializer()) - .withReplicas(replicas); - } - - private long electionTimeoutMillis(String[] replicas) { - return replicas.length == 1 ? 10L : RAFT_ELECTION_TIMEOUT_MILLIS; - } - - private long heartbeatTimeoutMillis(String[] replicas) { - return electionTimeoutMillis(replicas) / 2; - } - - /** - * Maps a Raft Database object to a PartitionInfo object. - * - * @param database database containing input data - * @return PartitionInfo object - */ - private static PartitionInfo toPartitionInfo(Database database) { - return new PartitionInfo(database.name(), - database.cluster().term(), - database.cluster().members() - .stream() - .filter(member -> Type.ACTIVE.equals(member.type())) - .map(Member::uri) - .sorted() - .collect(Collectors.toList()), - database.cluster().leader() != null ? - database.cluster().leader().uri() : null); - } - - - @Override - public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() { - return new EventuallyConsistentMapBuilderImpl<>(clusterService, - clusterCommunicator, - persistenceService); - } - - @Override - public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() { - return new DefaultConsistentMapBuilder<>(this); - } - - @Override - public <E> DistributedSetBuilder<E> setBuilder() { - return new DefaultDistributedSetBuilder<>(this); - } - - - @Override - public <E> DistributedQueueBuilder<E> queueBuilder() { - return new DefaultDistributedQueueBuilder<>(this); - } - - @Override - public AtomicCounterBuilder atomicCounterBuilder() { - return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase); - } - - @Override - public <V> AtomicValueBuilder<V> atomicValueBuilder() { - return new DefaultAtomicValueBuilder<>(this); - } - - @Override - public List<MapInfo> getMapInfo() { - List<MapInfo> maps = Lists.newArrayList(); - maps.addAll(getMapInfo(inMemoryDatabase)); - maps.addAll(getMapInfo(partitionedDatabase)); - return maps; - } - - private List<MapInfo> getMapInfo(Database database) { - return complete(database.maps()) - .stream() - .map(name -> new MapInfo(name, complete(database.mapSize(name)))) - .filter(info -> info.size() > 0) - .collect(Collectors.toList()); - } - - - @Override - public Map<String, Long> getCounters() { - Map<String, Long> counters = Maps.newHashMap(); - counters.putAll(complete(inMemoryDatabase.counters())); - counters.putAll(complete(partitionedDatabase.counters())); - return counters; - } - - @Override - public Map<String, Long> getPartitionedDatabaseCounters() { - Map<String, Long> counters = Maps.newHashMap(); - counters.putAll(complete(partitionedDatabase.counters())); - return counters; - } - - @Override - public Map<String, Long> getInMemoryDatabaseCounters() { - Map<String, Long> counters = Maps.newHashMap(); - counters.putAll(complete(inMemoryDatabase.counters())); - return counters; - } - - @Override - public Collection<Transaction> getTransactions() { - return complete(transactionManager.getTransactions()); - } - - private static <T> T complete(CompletableFuture<T> future) { - try { - return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ConsistentMapException.Interrupted(); - } catch (TimeoutException e) { - throw new ConsistentMapException.Timeout(); - } catch (ExecutionException e) { - throw new ConsistentMapException(e.getCause()); - } - } - - @Override - public void redriveTransactions() { - getTransactions().stream().forEach(transactionManager::execute); - } - - protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) { - maps.put(map.name(), map); - if (map.applicationId() != null) { - mapsByApplication.put(map.applicationId(), map); - } - return map; - } - - protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) { - maps.remove(map.name(), map); - if (map.applicationId() != null) { - mapsByApplication.remove(map.applicationId(), map); - } - } - - private class InternalApplicationListener implements ApplicationListener { - @Override - public void event(ApplicationEvent event) { - if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) { - ApplicationId appId = event.subject().id(); - List<DefaultAsyncConsistentMap> mapsToRemove; - synchronized (mapsByApplication) { - mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); - } - mapsToRemove.forEach(DatabaseManager.this::unregisterMap); - if (event.type() == APP_UNINSTALLED) { - mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear()); - } - } - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java deleted file mode 100644 index 740f81ad..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.List; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; - -/** - * Partitioner for mapping map entries to individual database partitions. - * <p> - * By default a md5 hash of the hash key (key or map name) is used to pick a - * partition. - */ -public abstract class DatabasePartitioner implements Partitioner<String> { - // Database partitions sorted by their partition name. - protected final List<Database> partitions; - - public DatabasePartitioner(List<Database> partitions) { - checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty"); - this.partitions = ImmutableList.copyOf(partitions); - } - - protected int hash(String key) { - return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt()); - } - -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java deleted file mode 100644 index 1d81f998..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -/** - * Database proxy. - */ -public interface DatabaseProxy<K, V> { - - /** - * Returns a set of all map names. - * - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Set<String>> maps(); - - /** - * Returns a mapping from counter name to next value. - * - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Map<String, Long>> counters(); - - /** - * Returns the number of entries in map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Integer> mapSize(String mapName); - - /** - * Checks whether the map is empty. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Boolean> mapIsEmpty(String mapName); - - /** - * Checks whether the map contains a key. - * - * @param mapName map name - * @param key key to check. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Boolean> mapContainsKey(String mapName, K key); - - /** - * Checks whether the map contains a value. - * - * @param mapName map name - * @param value The value to check. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Boolean> mapContainsValue(String mapName, V value); - - /** - * Gets a value from the map. - * - * @param mapName map name - * @param key The key to get. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Versioned<V>> mapGet(String mapName, K key); - - /** - * Updates the map. - * - * @param mapName map name - * @param key The key to set - * @param valueMatch match for checking existing value - * @param versionMatch match for checking existing version - * @param value new value - * @return A completable future to be completed with the result once complete - */ - CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate( - String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value); - - /** - * Clears the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Result<Void>> mapClear(String mapName); - - /** - * Gets a set of keys in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Set<K>> mapKeySet(String mapName); - - /** - * Gets a collection of values in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Collection<Versioned<V>>> mapValues(String mapName); - - /** - * Gets a set of entries in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName); - - /** - * Atomically add the given value to current value of the specified counter. - * - * @param counterName counter name - * @param delta value to add - * @return updated value - */ - CompletableFuture<Long> counterAddAndGet(String counterName, long delta); - - /** - * Atomically add the given value to current value of the specified counter. - * - * @param counterName counter name - * @param delta value to add - * @return previous value - */ - CompletableFuture<Long> counterGetAndAdd(String counterName, long delta); - - - /** - * Atomically sets the given value to current value of the specified counter. - * - * @param counterName counter name - * @param value value to set - * @return void future - */ - CompletableFuture<Void> counterSet(String counterName, long value); - - /** - * Atomically sets the given counter to the specified update value if and only if the current value is equal to the - * expected value. - * @param counterName counter name - * @param expectedValue value to use for equivalence check - * @param update value to set if expected value is current value - * @return true if an update occurred, false otherwise - */ - CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update); - - /** - * Returns the current value of the specified atomic counter. - * - * @param counterName counter name - * @return current value - */ - CompletableFuture<Long> counterGet(String counterName); - - /** - * Returns the size of queue. - * - * @param queueName queue name - * @return queue size - */ - CompletableFuture<Long> queueSize(String queueName); - - /** - * Inserts an entry into the queue. - * - * @param queueName queue name - * @param entry queue entry - * @return void future - */ - CompletableFuture<Void> queuePush(String queueName, byte[] entry); - - /** - * Removes an entry from the queue if the queue is non-empty. - * - * @param queueName queue name - * @return entry future. Can be completed with null if queue is empty - */ - CompletableFuture<byte[]> queuePop(String queueName); - - /** - * Returns but does not remove an entry from the queue. - * - * @param queueName queue name - * @return entry. Can be null if queue is empty - */ - CompletableFuture<byte[]> queuePeek(String queueName); - - /** - * Prepare and commit the specified transaction. - * - * @param transaction transaction to commit (after preparation) - * @return A completable future to be completed with the result once complete - */ - CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction); - - /** - * Prepare the specified transaction for commit. A successful prepare implies - * all the affected resources are locked thus ensuring no concurrent updates can interfere. - * - * @param transaction transaction to prepare (for commit) - * @return A completable future to be completed with the result once complete. The future is completed - * with true if the transaction is successfully prepared i.e. all pre-conditions are met and - * applicable resources locked. - */ - CompletableFuture<Boolean> prepare(Transaction transaction); - - /** - * Commit the specified transaction. A successful commit implies - * all the updates are applied, are now durable and are now visible externally. - * - * @param transaction transaction to commit - * @return A completable future to be completed with the result once complete - */ - CompletableFuture<CommitResponse> commit(Transaction transaction); - - /** - * Rollback the specified transaction. A successful rollback implies - * all previously acquired locks for the affected resources are released. - * - * @param transaction transaction to rollback - * @return A completable future to be completed with the result once complete - */ - CompletableFuture<Boolean> rollback(Transaction transaction); -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java deleted file mode 100644 index de734144..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.nio.ByteBuffer; - -import org.onlab.util.KryoNamespace; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import net.kuujo.copycat.cluster.internal.MemberInfo; -import net.kuujo.copycat.raft.protocol.AppendRequest; -import net.kuujo.copycat.raft.protocol.AppendResponse; -import net.kuujo.copycat.raft.protocol.CommitRequest; -import net.kuujo.copycat.raft.protocol.CommitResponse; -import net.kuujo.copycat.raft.protocol.PollRequest; -import net.kuujo.copycat.raft.protocol.PollResponse; -import net.kuujo.copycat.raft.protocol.QueryRequest; -import net.kuujo.copycat.raft.protocol.QueryResponse; -import net.kuujo.copycat.raft.protocol.ReplicaInfo; -import net.kuujo.copycat.raft.protocol.SyncRequest; -import net.kuujo.copycat.raft.protocol.SyncResponse; -import net.kuujo.copycat.raft.protocol.VoteRequest; -import net.kuujo.copycat.raft.protocol.VoteResponse; -import net.kuujo.copycat.util.serializer.SerializerConfig; - -/** - * Serializer for DatabaseManager's interaction with Copycat. - */ -public class DatabaseSerializer extends SerializerConfig { - - private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder() - .nextId(KryoNamespace.FLOATING_ID) - .register(AppendRequest.class) - .register(AppendResponse.class) - .register(SyncRequest.class) - .register(SyncResponse.class) - .register(VoteRequest.class) - .register(VoteResponse.class) - .register(PollRequest.class) - .register(PollResponse.class) - .register(QueryRequest.class) - .register(QueryResponse.class) - .register(CommitRequest.class) - .register(CommitResponse.class) - .register(ReplicaInfo.class) - .register(MemberInfo.class) - .build(); - - private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder() - .nextId(KryoNamespace.FLOATING_ID) - .register(Versioned.class) - .register(DatabaseUpdate.class) - .register(DatabaseUpdate.Type.class) - .register(Result.class) - .register(UpdateResult.class) - .register(Result.Status.class) - .register(DefaultTransaction.class) - .register(Transaction.State.class) - .register(org.onosproject.store.consistent.impl.CommitResponse.class) - .register(Match.class) - .register(NodeId.class) - .build(); - - private static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(KryoNamespaces.BASIC) - .register(COPYCAT) - .register(ONOS_STORE) - .build(); - } - }; - - @Override - public ByteBuffer writeObject(Object object) { - return ByteBuffer.wrap(SERIALIZER.encode(object)); - } - - @Override - public <T> T readObject(ByteBuffer buffer) { - return SERIALIZER.decode(buffer); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java deleted file mode 100644 index 1136428b..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import net.kuujo.copycat.state.Command; -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.Query; -import net.kuujo.copycat.state.StateContext; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -/** - * Database state. - * - */ -public interface DatabaseState<K, V> { - - /** - * Initializes the database state. - * - * @param context The map state context. - */ - @Initializer - void init(StateContext<DatabaseState<K, V>> context); - - @Query - Set<String> maps(); - - @Query - Map<String, Long> counters(); - - @Query - int mapSize(String mapName); - - @Query - boolean mapIsEmpty(String mapName); - - @Query - boolean mapContainsKey(String mapName, K key); - - @Query - boolean mapContainsValue(String mapName, V value); - - @Query - Versioned<V> mapGet(String mapName, K key); - - @Command - Result<UpdateResult<K, V>> mapUpdate(String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value); - - @Command - Result<Void> mapClear(String mapName); - - @Query - Set<K> mapKeySet(String mapName); - - @Query - Collection<Versioned<V>> mapValues(String mapName); - - @Query - Set<Entry<K, Versioned<V>>> mapEntrySet(String mapName); - - @Command - Long counterAddAndGet(String counterName, long delta); - - @Command - Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue); - - @Command - Long counterGetAndAdd(String counterName, long delta); - - @Query - Long queueSize(String queueName); - - @Query - byte[] queuePeek(String queueName); - - @Command - byte[] queuePop(String queueName); - - @Command - void queuePush(String queueName, byte[] entry); - - @Query - Long counterGet(String counterName); - - @Command - CommitResponse prepareAndCommit(Transaction transaction); - - @Command - boolean prepare(Transaction transaction); - - @Command - CommitResponse commit(Transaction transaction); - - @Command - boolean rollback(Transaction transaction); -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java deleted file mode 100644 index d851eaa0..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.AsyncAtomicCounter; - -import java.util.concurrent.CompletableFuture; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Default implementation for a distributed AsyncAtomicCounter backed by - * partitioned Raft DB. - * <p> - * The initial value will be zero. - */ -public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { - - private final String name; - private final Database database; - private final MeteringAgent monitor; - - private static final String PRIMITIVE_NAME = "atomicCounter"; - private static final String INCREMENT_AND_GET = "incrementAndGet"; - private static final String GET_AND_INCREMENT = "getAndIncrement"; - private static final String GET_AND_ADD = "getAndAdd"; - private static final String ADD_AND_GET = "addAndGet"; - private static final String GET = "get"; - private static final String SET = "set"; - private static final String COMPARE_AND_SET = "compareAndSet"; - - public DefaultAsyncAtomicCounter(String name, - Database database, - boolean meteringEnabled) { - this.name = checkNotNull(name); - this.database = checkNotNull(database); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - } - - @Override - public CompletableFuture<Long> incrementAndGet() { - final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET); - return addAndGet(1L) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Long> get() { - final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.counterGet(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Long> getAndIncrement() { - final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT); - return getAndAdd(1L) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Long> getAndAdd(long delta) { - final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); - return database.counterGetAndAdd(name, delta) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Long> addAndGet(long delta) { - final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); - return database.counterAddAndGet(name, delta) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Void> set(long value) { - final MeteringAgent.Context timer = monitor.startTimer(SET); - return database.counterSet(name, value) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) { - final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET); - return database.counterCompareAndSet(name, expectedValue, updateValue) - .whenComplete((r, e) -> timer.stop(e)); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java deleted file mode 100644 index af2bb74d..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Maps; - -import org.onlab.util.HexString; -import org.onlab.util.SharedExecutors; -import org.onlab.util.Tools; -import org.onosproject.core.ApplicationId; -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; -import org.slf4j.Logger; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE; -import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * AsyncConsistentMap implementation that is backed by a Raft consensus - * based database. - * - * @param <K> type of key. - * @param <V> type of value. - */ -public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> { - - private final String name; - private final ApplicationId applicationId; - private final Database database; - private final Serializer serializer; - private final boolean readOnly; - private final boolean purgeOnUninstall; - - private static final String PRIMITIVE_NAME = "consistentMap"; - private static final String SIZE = "size"; - private static final String IS_EMPTY = "isEmpty"; - private static final String CONTAINS_KEY = "containsKey"; - private static final String CONTAINS_VALUE = "containsValue"; - private static final String GET = "get"; - private static final String COMPUTE_IF = "computeIf"; - private static final String PUT = "put"; - private static final String PUT_AND_GET = "putAndGet"; - private static final String PUT_IF_ABSENT = "putIfAbsent"; - private static final String REMOVE = "remove"; - private static final String CLEAR = "clear"; - private static final String KEY_SET = "keySet"; - private static final String VALUES = "values"; - private static final String ENTRY_SET = "entrySet"; - private static final String REPLACE = "replace"; - private static final String COMPUTE_IF_ABSENT = "computeIfAbsent"; - - private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>(); - - private final Logger log = getLogger(getClass()); - private final MeteringAgent monitor; - - private static final String ERROR_NULL_KEY = "Key cannot be null"; - private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - - // String representation of serialized byte[] -> original key Object - private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder() - .softValues() - .build(new CacheLoader<String, K>() { - - @Override - public K load(String key) { - return serializer.decode(HexString.fromHexString(key)); - } - }); - - protected String sK(K key) { - String s = HexString.toHexString(serializer.encode(key)); - keyCache.put(s, key); - return s; - } - - protected K dK(String key) { - return keyCache.getUnchecked(key); - } - - public DefaultAsyncConsistentMap(String name, - ApplicationId applicationId, - Database database, - Serializer serializer, - boolean readOnly, - boolean purgeOnUninstall, - boolean meteringEnabled) { - this.name = checkNotNull(name, "map name cannot be null"); - this.applicationId = applicationId; - this.database = checkNotNull(database, "database cannot be null"); - this.serializer = checkNotNull(serializer, "serializer cannot be null"); - this.readOnly = readOnly; - this.purgeOnUninstall = purgeOnUninstall; - this.database.registerConsumer(update -> { - SharedExecutors.getSingleThreadExecutor().execute(() -> { - if (listeners.isEmpty()) { - return; - } - try { - if (update.target() == MAP_UPDATE) { - Result<UpdateResult<String, byte[]>> result = update.output(); - if (result.success() && result.value().mapName().equals(name)) { - MapEvent<K, V> mapEvent = result.value() - .<K, V>map(this::dK, - v -> serializer.decode(Tools.copyOf(v))) - .toMapEvent(); - notifyListeners(mapEvent); - } - } else if (update.target() == TX_COMMIT) { - CommitResponse response = update.output(); - if (response.success()) { - response.updates().forEach(u -> { - if (u.mapName().equals(name)) { - MapEvent<K, V> mapEvent = - u.<K, V>map(this::dK, - v -> serializer.decode(Tools.copyOf(v))) - .toMapEvent(); - notifyListeners(mapEvent); - } - }); - } - } - } catch (Exception e) { - log.warn("Error notifying listeners", e); - } - }); - }); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - } - - /** - * Returns this map name. - * @return map name - */ - public String name() { - return name; - } - - /** - * Returns the serializer for map entries. - * @return map entry serializer - */ - public Serializer serializer() { - return serializer; - } - - /** - * Returns the applicationId owning this map. - * @return application Id - */ - public ApplicationId applicationId() { - return applicationId; - } - - /** - * Returns whether the map entries should be purged when the application - * owning it is uninstalled. - * @return true is map needs to cleared on app uninstall; false otherwise - */ - public boolean purgeOnUninstall() { - return purgeOnUninstall; - } - - @Override - public CompletableFuture<Integer> size() { - final MeteringAgent.Context timer = monitor.startTimer(SIZE); - return database.mapSize(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Boolean> isEmpty() { - final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY); - return database.mapIsEmpty(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Boolean> containsKey(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); - return database.mapContainsKey(name, sK(key)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Boolean> containsValue(V value) { - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE); - return database.mapContainsValue(name, serializer.encode(value)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Versioned<V>> get(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.mapGet(name, sK(key)) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v != null ? v.map(serializer::decode) : null); - } - - @Override - public CompletableFuture<Versioned<V>> computeIfAbsent(K key, - Function<? super K, ? extends V> mappingFunction) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(mappingFunction, "Mapping function cannot be null"); - final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT); - return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.newValue()); - } - - @Override - public CompletableFuture<Versioned<V>> computeIfPresent(K key, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return computeIf(key, Objects::nonNull, remappingFunction); - } - - @Override - public CompletableFuture<Versioned<V>> compute(K key, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return computeIf(key, v -> true, remappingFunction); - } - - @Override - public CompletableFuture<Versioned<V>> computeIf(K key, - Predicate<? super V> condition, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(condition, "predicate function cannot be null"); - checkNotNull(remappingFunction, "Remapping function cannot be null"); - final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF); - return get(key).thenCompose(r1 -> { - V existingValue = r1 == null ? null : r1.value(); - // if the condition evaluates to false, return existing value. - if (!condition.test(existingValue)) { - return CompletableFuture.completedFuture(r1); - } - - AtomicReference<V> computedValue = new AtomicReference<>(); - // if remappingFunction throws an exception, return the exception. - try { - computedValue.set(remappingFunction.apply(key, existingValue)); - } catch (Exception e) { - return Tools.exceptionalFuture(e); - } - if (computedValue.get() == null && r1 == null) { - return CompletableFuture.completedFuture(null); - } - Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any(); - Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version()); - return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get()) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> { - if (v.updated()) { - return v.newValue(); - } else { - throw new ConsistentMapException.ConcurrentModification(); - } - }); - }); - } - - @Override - public CompletableFuture<Versioned<V>> put(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT); - return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Versioned<V>> putAndGet(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET); - return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Versioned<V>> remove(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Void> clear() { - checkIfUnmodifiable(); - final MeteringAgent.Context timer = monitor.startTimer(CLEAR); - return database.mapClear(name).thenApply(this::unwrapResult) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Set<K>> keySet() { - final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); - return database.mapKeySet(name) - .thenApply(s -> newMappingKeySet(s)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture<Collection<Versioned<V>>> values() { - final MeteringAgent.Context timer = monitor.startTimer(VALUES); - return database.mapValues(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(c -> c - .stream() - .map(v -> v.<V>map(serializer::decode)) - .collect(Collectors.toList())); - } - - @Override - public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() { - final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); - return database.mapEntrySet(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(s -> newMappingEntrySet(s)); - } - - @Override - public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT); - return updateAndGet(key, Match.ifNull(), Match.any(), value) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.oldValue()); - } - - @Override - public CompletableFuture<Boolean> remove(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.ifValue(value), Match.any(), null) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture<Boolean> remove(K key, long version) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.any(), Match.ifValue(version), null) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(oldValue, ERROR_NULL_VALUE); - checkNotNull(newValue, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(REPLACE); - return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { - final MeteringAgent.Context timer = monitor.startTimer(REPLACE); - return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - /** - * Pre-update hook for performing required checks/actions before going forward with an update operation. - * @param key map key. - */ - protected void beforeUpdate(K key) { - checkIfUnmodifiable(); - } - - private Set<K> newMappingKeySet(Set<String> s) { - return new MappingSet<>(s, Collections::unmodifiableSet, - this::sK, this::dK); - } - - private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) { - return new MappingSet<>(s, Collections::unmodifiableSet, - this::reverseMapRawEntry, this::mapRawEntry); - } - - private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { - return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); - } - - private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) { - return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode)); - } - - private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key, - Match<V> oldValueMatch, - Match<Long> oldVersionMatch, - V value) { - beforeUpdate(key); - return database.mapUpdate(name, - sK(key), - oldValueMatch.map(serializer::encode), - oldVersionMatch, - value == null ? null : serializer.encode(value)) - .thenApply(this::unwrapResult) - .thenApply(r -> r.<K, V>map(this::dK, serializer::decode)); - } - - private <T> T unwrapResult(Result<T> result) { - if (result.status() == Result.Status.LOCKED) { - throw new ConsistentMapException.ConcurrentModification(); - } else if (result.success()) { - return result.value(); - } else { - throw new IllegalStateException("Must not be here"); - } - } - - private void checkIfUnmodifiable() { - if (readOnly) { - throw new UnsupportedOperationException(); - } - } - - @Override - public void addListener(MapEventListener<K, V> listener) { - listeners.add(listener); - } - - @Override - public void removeListener(MapEventListener<K, V> listener) { - listeners.remove(listener); - } - - protected void notifyListeners(MapEvent<K, V> event) { - if (event == null) { - return; - } - listeners.forEach(listener -> { - try { - listener.event(event); - } catch (Exception e) { - log.warn("Failure notifying listener about {}", event, e); - } - }); - } - -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java deleted file mode 100644 index 2d6a956c..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.AsyncAtomicCounter; -import org.onosproject.store.service.AtomicCounter; -import org.onosproject.store.service.StorageException; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Default implementation for a distributed AtomicCounter backed by - * partitioned Raft DB. - * <p> - * The initial value will be zero. - */ -public class DefaultAtomicCounter implements AtomicCounter { - - private static final int OPERATION_TIMEOUT_MILLIS = 5000; - - private final AsyncAtomicCounter asyncCounter; - - public DefaultAtomicCounter(String name, - Database database, - boolean meteringEnabled) { - asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled); - } - - @Override - public long incrementAndGet() { - return complete(asyncCounter.incrementAndGet()); - } - - @Override - public long getAndIncrement() { - return complete(asyncCounter.getAndIncrement()); - } - - @Override - public long getAndAdd(long delta) { - return complete(asyncCounter.getAndAdd(delta)); - } - - @Override - public long addAndGet(long delta) { - return complete(asyncCounter.getAndAdd(delta)); - } - - @Override - public void set(long value) { - complete(asyncCounter.set(value)); - } - - @Override - public boolean compareAndSet(long expectedValue, long updateValue) { - return complete(asyncCounter.compareAndSet(expectedValue, updateValue)); - } - - @Override - public long get() { - return complete(asyncCounter.get()); - } - - private static <T> T complete(CompletableFuture<T> future) { - try { - return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new StorageException.Interrupted(); - } catch (TimeoutException e) { - throw new StorageException.Timeout(); - } catch (ExecutionException e) { - throw new StorageException(e.getCause()); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java deleted file mode 100644 index dba4443b..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.AsyncAtomicCounter; -import org.onosproject.store.service.AtomicCounter; -import org.onosproject.store.service.AtomicCounterBuilder; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Default implementation of AtomicCounterBuilder. - */ -public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { - - private String name; - private boolean partitionsEnabled = true; - private final Database partitionedDatabase; - private final Database inMemoryDatabase; - private boolean metering = true; - - public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) { - this.inMemoryDatabase = inMemoryDatabase; - this.partitionedDatabase = partitionedDatabase; - } - - @Override - public AtomicCounterBuilder withName(String name) { - checkArgument(name != null && !name.isEmpty()); - this.name = name; - return this; - } - - @Override - public AtomicCounterBuilder withPartitionsDisabled() { - partitionsEnabled = false; - return this; - } - - @Override - public AtomicCounter build() { - validateInputs(); - Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; - return new DefaultAtomicCounter(name, database, metering); - } - - @Override - public AsyncAtomicCounter buildAsyncCounter() { - validateInputs(); - Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; - return new DefaultAsyncAtomicCounter(name, database, metering); - } - - @Override - public AtomicCounterBuilder withMeteringDisabled() { - metering = false; - return this; - } - - private void validateInputs() { - checkState(name != null, "name must be specified"); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java deleted file mode 100644 index e8c93f31..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.AtomicValue; -import org.onosproject.store.service.AtomicValueEvent; -import org.onosproject.store.service.AtomicValueEventListener; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; - -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -/** - * Default implementation of AtomicValue. - * - * @param <V> value type - */ -public class DefaultAtomicValue<V> implements AtomicValue<V> { - - private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>(); - private final ConsistentMap<String, byte[]> valueMap; - private final String name; - private final Serializer serializer; - private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener(); - private final MeteringAgent monitor; - - private static final String COMPONENT_NAME = "atomicValue"; - private static final String GET = "get"; - private static final String GET_AND_SET = "getAndSet"; - private static final String COMPARE_AND_SET = "compareAndSet"; - - public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap, - String name, - boolean meteringEnabled, - Serializer serializer) { - this.valueMap = valueMap; - this.name = name; - this.serializer = serializer; - this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled); - } - - @Override - public boolean compareAndSet(V expect, V update) { - final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET); - try { - if (expect == null) { - if (update == null) { - return true; - } - return valueMap.putIfAbsent(name, serializer.encode(update)) == null; - } else { - if (update == null) { - return valueMap.remove(name, serializer.encode(expect)); - } - return valueMap.replace(name, serializer.encode(expect), serializer.encode(update)); - } - } finally { - newTimer.stop(null); - } - } - - @Override - public V get() { - final MeteringAgent.Context newTimer = monitor.startTimer(GET); - try { - Versioned<byte[]> rawValue = valueMap.get(name); - return rawValue == null ? null : serializer.decode(rawValue.value()); - } finally { - newTimer.stop(null); - } - } - - @Override - public V getAndSet(V value) { - final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET); - try { - Versioned<byte[]> previousValue = value == null ? - valueMap.remove(name) : valueMap.put(name, serializer.encode(value)); - return previousValue == null ? null : serializer.decode(previousValue.value()); - } finally { - newTimer.stop(null); - } - } - - @Override - public void set(V value) { - getAndSet(value); - } - - @Override - public void addListener(AtomicValueEventListener<V> listener) { - synchronized (listeners) { - if (listeners.add(listener)) { - if (listeners.size() == 1) { - valueMap.addListener(mapEventListener); - } - } - } - } - - @Override - public void removeListener(AtomicValueEventListener<V> listener) { - synchronized (listeners) { - if (listeners.remove(listener)) { - if (listeners.size() == 0) { - valueMap.removeListener(mapEventListener); - } - } - } - } - - private class InternalMapEventListener implements MapEventListener<String, byte[]> { - - @Override - public void event(MapEvent<String, byte[]> mapEvent) { - V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value()); - AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue); - listeners.forEach(l -> l.event(atomicValueEvent)); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java deleted file mode 100644 index b39004b3..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.AtomicValue; -import org.onosproject.store.service.AtomicValueBuilder; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.Serializer; - -/** - * Default implementation of AtomicValueBuilder. - * - * @param <V> value type - */ -public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { - - private Serializer serializer; - private String name; - private ConsistentMapBuilder<String, byte[]> mapBuilder; - private boolean metering = true; - - public DefaultAtomicValueBuilder(DatabaseManager manager) { - mapBuilder = manager.<String, byte[]>consistentMapBuilder() - .withName("onos-atomic-values") - .withMeteringDisabled() - .withSerializer(Serializer.using(KryoNamespaces.BASIC)); - } - - @Override - public AtomicValueBuilder<V> withName(String name) { - this.name = name; - return this; - } - - @Override - public AtomicValueBuilder<V> withSerializer(Serializer serializer) { - this.serializer = serializer; - return this; - } - - @Override - public AtomicValueBuilder<V> withPartitionsDisabled() { - mapBuilder.withPartitionsDisabled(); - return this; - } - - @Override - public AtomicValueBuilder<V> withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public AtomicValue<V> build() { - return new DefaultAtomicValue<>(mapBuilder.build(), name, metering, serializer); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java deleted file mode 100644 index 6f7b5487..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.Set; - -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.Versioned; - -/** - * ConsistentMap implementation that is backed by a Raft consensus - * based database. - * - * @param <K> type of key. - * @param <V> type of value. - */ -public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { - - private static final int OPERATION_TIMEOUT_MILLIS = 5000; - - private final DefaultAsyncConsistentMap<K, V> asyncMap; - private Map<K, V> javaMap; - - public String name() { - return asyncMap.name(); - } - - public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) { - this.asyncMap = asyncMap; - } - - @Override - public int size() { - return complete(asyncMap.size()); - } - - @Override - public boolean isEmpty() { - return complete(asyncMap.isEmpty()); - } - - @Override - public boolean containsKey(K key) { - return complete(asyncMap.containsKey(key)); - } - - @Override - public boolean containsValue(V value) { - return complete(asyncMap.containsValue(value)); - } - - @Override - public Versioned<V> get(K key) { - return complete(asyncMap.get(key)); - } - - @Override - public Versioned<V> computeIfAbsent(K key, - Function<? super K, ? extends V> mappingFunction) { - return complete(asyncMap.computeIfAbsent(key, mappingFunction)); - } - - @Override - public Versioned<V> computeIfPresent(K key, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return complete(asyncMap.computeIfPresent(key, remappingFunction)); - } - - @Override - public Versioned<V> compute(K key, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return complete(asyncMap.compute(key, remappingFunction)); - } - - @Override - public Versioned<V> computeIf(K key, - Predicate<? super V> condition, - BiFunction<? super K, ? super V, ? extends V> remappingFunction) { - return complete(asyncMap.computeIf(key, condition, remappingFunction)); - } - - @Override - public Versioned<V> put(K key, V value) { - return complete(asyncMap.put(key, value)); - } - - @Override - public Versioned<V> putAndGet(K key, V value) { - return complete(asyncMap.putAndGet(key, value)); - } - - @Override - public Versioned<V> remove(K key) { - return complete(asyncMap.remove(key)); - } - - @Override - public void clear() { - complete(asyncMap.clear()); - } - - @Override - public Set<K> keySet() { - return complete(asyncMap.keySet()); - } - - @Override - public Collection<Versioned<V>> values() { - return complete(asyncMap.values()); - } - - @Override - public Set<Entry<K, Versioned<V>>> entrySet() { - return complete(asyncMap.entrySet()); - } - - @Override - public Versioned<V> putIfAbsent(K key, V value) { - return complete(asyncMap.putIfAbsent(key, value)); - } - - @Override - public boolean remove(K key, V value) { - return complete(asyncMap.remove(key, value)); - } - - @Override - public boolean remove(K key, long version) { - return complete(asyncMap.remove(key, version)); - } - - @Override - public boolean replace(K key, V oldValue, V newValue) { - return complete(asyncMap.replace(key, oldValue, newValue)); - } - - @Override - public boolean replace(K key, long oldVersion, V newValue) { - return complete(asyncMap.replace(key, oldVersion, newValue)); - } - - private static <T> T complete(CompletableFuture<T> future) { - try { - return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ConsistentMapException.Interrupted(); - } catch (TimeoutException e) { - throw new ConsistentMapException.Timeout(); - } catch (ExecutionException e) { - if (e.getCause() instanceof ConsistentMapException) { - throw (ConsistentMapException) e.getCause(); - } else { - throw new ConsistentMapException(e.getCause()); - } - } - } - - @Override - public void addListener(MapEventListener<K, V> listener) { - asyncMap.addListener(listener); - } - - @Override - public void removeListener(MapEventListener<K, V> listener) { - asyncMap.addListener(listener); - } - - @Override - public Map<K, V> asJavaMap() { - synchronized (this) { - if (javaMap == null) { - javaMap = new ConsistentMapBackedJavaMap<>(this); - } - } - return javaMap; - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java deleted file mode 100644 index 0e11794e..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.core.ApplicationId; -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.Serializer; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Default Consistent Map builder. - * - * @param <K> type for map key - * @param <V> type for map value - */ -public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> { - - private Serializer serializer; - private String name; - private ApplicationId applicationId; - private boolean purgeOnUninstall = false; - private boolean partitionsEnabled = true; - private boolean readOnly = false; - private boolean metering = true; - private boolean relaxedReadConsistency = false; - private final DatabaseManager manager; - - public DefaultConsistentMapBuilder(DatabaseManager manager) { - this.manager = manager; - } - - @Override - public ConsistentMapBuilder<K, V> withName(String name) { - checkArgument(name != null && !name.isEmpty()); - this.name = name; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) { - checkArgument(id != null); - this.applicationId = id; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withPurgeOnUninstall() { - purgeOnUninstall = true; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) { - checkArgument(serializer != null); - this.serializer = serializer; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withPartitionsDisabled() { - partitionsEnabled = false; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withUpdatesDisabled() { - readOnly = true; - return this; - } - - @Override - public ConsistentMapBuilder<K, V> withRelaxedReadConsistency() { - relaxedReadConsistency = true; - return this; - } - - private void validateInputs() { - checkState(name != null, "name must be specified"); - checkState(serializer != null, "serializer must be specified"); - if (purgeOnUninstall) { - checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled"); - } - } - - @Override - public ConsistentMap<K, V> build() { - return new DefaultConsistentMap<>(buildAndRegisterMap()); - } - - @Override - public AsyncConsistentMap<K, V> buildAsyncMap() { - return buildAndRegisterMap(); - } - - private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() { - validateInputs(); - Database database = partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase; - if (relaxedReadConsistency) { - return manager.registerMap( - new AsyncCachingConsistentMap<>(name, - applicationId, - database, - serializer, - readOnly, - purgeOnUninstall, - metering)); - } else { - return manager.registerMap( - new DefaultAsyncConsistentMap<>(name, - applicationId, - database, - serializer, - readOnly, - purgeOnUninstall, - metering)); - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java deleted file mode 100644 index 2a50fbd6..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.Sets; -import net.kuujo.copycat.resource.internal.AbstractResource; -import net.kuujo.copycat.resource.internal.ResourceManager; -import net.kuujo.copycat.state.StateMachine; -import net.kuujo.copycat.state.internal.DefaultStateMachine; -import net.kuujo.copycat.util.concurrent.Futures; -import net.kuujo.copycat.util.function.TriConsumer; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** - * Default database. - */ -public class DefaultDatabase extends AbstractResource<Database> implements Database { - private final StateMachine<DatabaseState<String, byte[]>> stateMachine; - private DatabaseProxy<String, byte[]> proxy; - private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet(); - private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - public DefaultDatabase(ResourceManager context) { - super(context); - this.stateMachine = new DefaultStateMachine(context, - DatabaseState.class, - DefaultDatabaseState.class, - DefaultDatabase.class.getClassLoader()); - this.stateMachine.addStartupTask(() -> { - stateMachine.registerWatcher(watcher); - return CompletableFuture.completedFuture(null); - }); - this.stateMachine.addShutdownTask(() -> { - stateMachine.unregisterWatcher(watcher); - return CompletableFuture.completedFuture(null); - }); - } - - /** - * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to - * return the completed future result. - * - * @param supplier The supplier to call if the database is open. - * @param <T> The future result type. - * @return A completable future that if this database is closed is immediately failed. - */ - protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) { - if (proxy == null) { - return Futures.exceptionalFuture(new IllegalStateException("Database closed")); - } - return supplier.get(); - } - - @Override - public CompletableFuture<Set<String>> maps() { - return checkOpen(() -> proxy.maps()); - } - - @Override - public CompletableFuture<Map<String, Long>> counters() { - return checkOpen(() -> proxy.counters()); - } - - @Override - public CompletableFuture<Integer> mapSize(String mapName) { - return checkOpen(() -> proxy.mapSize(mapName)); - } - - @Override - public CompletableFuture<Boolean> mapIsEmpty(String mapName) { - return checkOpen(() -> proxy.mapIsEmpty(mapName)); - } - - @Override - public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) { - return checkOpen(() -> proxy.mapContainsKey(mapName, key)); - } - - @Override - public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) { - return checkOpen(() -> proxy.mapContainsValue(mapName, value)); - } - - @Override - public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) { - return checkOpen(() -> proxy.mapGet(mapName, key)); - } - - @Override - public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate( - String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) { - return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value)); - } - - @Override - public CompletableFuture<Result<Void>> mapClear(String mapName) { - return checkOpen(() -> proxy.mapClear(mapName)); - } - - @Override - public CompletableFuture<Set<String>> mapKeySet(String mapName) { - return checkOpen(() -> proxy.mapKeySet(mapName)); - } - - @Override - public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) { - return checkOpen(() -> proxy.mapValues(mapName)); - } - - @Override - public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) { - return checkOpen(() -> proxy.mapEntrySet(mapName)); - } - - @Override - public CompletableFuture<Long> counterGet(String counterName) { - return checkOpen(() -> proxy.counterGet(counterName)); - } - - @Override - public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) { - return checkOpen(() -> proxy.counterAddAndGet(counterName, delta)); - } - - @Override - public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) { - return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta)); - } - - @Override - public CompletableFuture<Void> counterSet(String counterName, long value) { - return checkOpen(() -> proxy.counterSet(counterName, value)); - } - - @Override - public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) { - return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update)); - } - - @Override - public CompletableFuture<Long> queueSize(String queueName) { - return checkOpen(() -> proxy.queueSize(queueName)); - } - - @Override - public CompletableFuture<Void> queuePush(String queueName, byte[] entry) { - return checkOpen(() -> proxy.queuePush(queueName, entry)); - } - - @Override - public CompletableFuture<byte[]> queuePop(String queueName) { - return checkOpen(() -> proxy.queuePop(queueName)); - } - - @Override - public CompletableFuture<byte[]> queuePeek(String queueName) { - return checkOpen(() -> proxy.queuePeek(queueName)); - } - - @Override - public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) { - return checkOpen(() -> proxy.prepareAndCommit(transaction)); - } - - @Override - public CompletableFuture<Boolean> prepare(Transaction transaction) { - return checkOpen(() -> proxy.prepare(transaction)); - } - - @Override - public CompletableFuture<CommitResponse> commit(Transaction transaction) { - return checkOpen(() -> proxy.commit(transaction)); - } - - @Override - public CompletableFuture<Boolean> rollback(Transaction transaction) { - return checkOpen(() -> proxy.rollback(transaction)); - } - - @Override - @SuppressWarnings("unchecked") - public synchronized CompletableFuture<Database> open() { - return runStartupTasks() - .thenCompose(v -> stateMachine.open()) - .thenRun(() -> { - this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader()); - }) - .thenApply(v -> null); - } - - @Override - public synchronized CompletableFuture<Void> close() { - proxy = null; - return stateMachine.close() - .thenCompose(v -> runShutdownTasks()); - } - - @Override - public int hashCode() { - return name().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof Database) { - return name().equals(((Database) other).name()); - } - return false; - } - - @Override - public void registerConsumer(Consumer<StateMachineUpdate> consumer) { - consumers.add(consumer); - } - - @Override - public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) { - consumers.remove(consumer); - } - - private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> { - @Override - public void accept(String name, Object input, Object output) { - StateMachineUpdate update = new StateMachineUpdate(name, input, output); - consumers.forEach(consumer -> consumer.accept(update)); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java deleted file mode 100644 index 8943fc87..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java +++ /dev/null @@ -1,372 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.StateContext; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -/** - * Default database state. - */ -public class DefaultDatabaseState implements DatabaseState<String, byte[]> { - private Long nextVersion; - private Map<String, AtomicLong> counters; - private Map<String, Map<String, Versioned<byte[]>>> maps; - private Map<String, Queue<byte[]>> queues; - - /** - * This locks map has a structure similar to the "tables" map above and - * holds all the provisional updates made during a transaction's prepare phase. - * The entry value is represented as the tuple: (transactionId, newValue) - * If newValue == null that signifies this update is attempting to - * delete the existing value. - * This map also serves as a lock on the entries that are being updated. - * The presence of a entry in this map indicates that element is - * participating in a transaction and is currently locked for updates. - */ - private Map<String, Map<String, Update>> locks; - - @Initializer - @Override - public void init(StateContext<DatabaseState<String, byte[]>> context) { - counters = context.get("counters"); - if (counters == null) { - counters = Maps.newConcurrentMap(); - context.put("counters", counters); - } - maps = context.get("maps"); - if (maps == null) { - maps = Maps.newConcurrentMap(); - context.put("maps", maps); - } - locks = context.get("locks"); - if (locks == null) { - locks = Maps.newConcurrentMap(); - context.put("locks", locks); - } - queues = context.get("queues"); - if (queues == null) { - queues = Maps.newConcurrentMap(); - context.put("queues", queues); - } - nextVersion = context.get("nextVersion"); - if (nextVersion == null) { - nextVersion = 0L; - context.put("nextVersion", nextVersion); - } - } - - @Override - public Set<String> maps() { - return ImmutableSet.copyOf(maps.keySet()); - } - - @Override - public Map<String, Long> counters() { - Map<String, Long> counterMap = Maps.newHashMap(); - counters.forEach((k, v) -> counterMap.put(k, v.get())); - return counterMap; - } - - @Override - public int mapSize(String mapName) { - return getMap(mapName).size(); - } - - @Override - public boolean mapIsEmpty(String mapName) { - return getMap(mapName).isEmpty(); - } - - @Override - public boolean mapContainsKey(String mapName, String key) { - return getMap(mapName).containsKey(key); - } - - @Override - public boolean mapContainsValue(String mapName, byte[] value) { - return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value)); - } - - @Override - public Versioned<byte[]> mapGet(String mapName, String key) { - return getMap(mapName).get(key); - } - - - @Override - public Result<UpdateResult<String, byte[]>> mapUpdate( - String mapName, - String key, - Match<byte[]> valueMatch, - Match<Long> versionMatch, - byte[] value) { - if (isLockedForUpdates(mapName, key)) { - return Result.locked(); - } - Versioned<byte[]> currentValue = getMap(mapName).get(key); - if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) || - !versionMatch.matches(currentValue == null ? null : currentValue.version())) { - return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue)); - } else { - if (value == null) { - if (currentValue == null) { - return Result.ok(new UpdateResult<>(false, mapName, key, null, null)); - } else { - getMap(mapName).remove(key); - return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null)); - } - } - Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion); - getMap(mapName).put(key, newValue); - return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue)); - } - } - - @Override - public Result<Void> mapClear(String mapName) { - if (areTransactionsInProgress(mapName)) { - return Result.locked(); - } - getMap(mapName).clear(); - return Result.ok(null); - } - - @Override - public Set<String> mapKeySet(String mapName) { - return ImmutableSet.copyOf(getMap(mapName).keySet()); - } - - @Override - public Collection<Versioned<byte[]>> mapValues(String mapName) { - return ImmutableList.copyOf(getMap(mapName).values()); - } - - @Override - public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) { - return ImmutableSet.copyOf(getMap(mapName) - .entrySet() - .stream() - .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue())) - .collect(Collectors.toSet())); - } - - @Override - public Long counterAddAndGet(String counterName, long delta) { - return getCounter(counterName).addAndGet(delta); - } - - @Override - public Long counterGetAndAdd(String counterName, long delta) { - return getCounter(counterName).getAndAdd(delta); - } - - @Override - public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) { - return getCounter(counterName).compareAndSet(expectedValue, updateValue); - } - - @Override - public Long counterGet(String counterName) { - return getCounter(counterName).get(); - } - - @Override - public Long queueSize(String queueName) { - return Long.valueOf(getQueue(queueName).size()); - } - - @Override - public byte[] queuePeek(String queueName) { - return getQueue(queueName).peek(); - } - - @Override - public byte[] queuePop(String queueName) { - return getQueue(queueName).poll(); - } - - @Override - public void queuePush(String queueName, byte[] entry) { - getQueue(queueName).offer(entry); - } - - @Override - public CommitResponse prepareAndCommit(Transaction transaction) { - if (prepare(transaction)) { - return commit(transaction); - } - return CommitResponse.failure(); - } - - @Override - public boolean prepare(Transaction transaction) { - if (transaction.updates().stream().anyMatch(update -> - isLockedByAnotherTransaction(update.mapName(), - update.key(), - transaction.id()))) { - return false; - } - - if (transaction.updates().stream().allMatch(this::isUpdatePossible)) { - transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id())); - return true; - } - return false; - } - - @Override - public CommitResponse commit(Transaction transaction) { - return CommitResponse.success(Lists.transform(transaction.updates(), - update -> commitProvisionalUpdate(update, transaction.id()))); - } - - @Override - public boolean rollback(Transaction transaction) { - transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id())); - return true; - } - - private Map<String, Versioned<byte[]>> getMap(String mapName) { - return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); - } - - private Map<String, Update> getLockMap(String mapName) { - return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); - } - - private AtomicLong getCounter(String counterName) { - return counters.computeIfAbsent(counterName, name -> new AtomicLong(0)); - } - - private Queue<byte[]> getQueue(String queueName) { - return queues.computeIfAbsent(queueName, name -> new LinkedList<>()); - } - - private boolean isUpdatePossible(DatabaseUpdate update) { - Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key()); - switch (update.type()) { - case PUT: - case REMOVE: - return true; - case PUT_IF_ABSENT: - return existingEntry == null; - case PUT_IF_VERSION_MATCH: - return existingEntry != null && existingEntry.version() == update.currentVersion(); - case PUT_IF_VALUE_MATCH: - return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue()); - case REMOVE_IF_VERSION_MATCH: - return existingEntry == null || existingEntry.version() == update.currentVersion(); - case REMOVE_IF_VALUE_MATCH: - return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue()); - default: - throw new IllegalStateException("Unsupported type: " + update.type()); - } - } - - private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) { - Map<String, Update> lockMap = getLockMap(update.mapName()); - switch (update.type()) { - case PUT: - case PUT_IF_ABSENT: - case PUT_IF_VERSION_MATCH: - case PUT_IF_VALUE_MATCH: - lockMap.put(update.key(), new Update(transactionId, update.value())); - break; - case REMOVE: - case REMOVE_IF_VERSION_MATCH: - case REMOVE_IF_VALUE_MATCH: - lockMap.put(update.key(), new Update(transactionId, null)); - break; - default: - throw new IllegalStateException("Unsupported type: " + update.type()); - } - } - - private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) { - String mapName = update.mapName(); - String key = update.key(); - Update provisionalUpdate = getLockMap(mapName).get(key); - if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { - getLockMap(mapName).remove(key); - } else { - throw new IllegalStateException("Invalid transaction Id"); - } - return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value(); - } - - private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) { - String mapName = update.mapName(); - String key = update.key(); - Update provisionalUpdate = getLockMap(mapName).get(key); - if (provisionalUpdate == null) { - return; - } - if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { - getLockMap(mapName).remove(key); - } - } - - private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) { - Update update = getLockMap(mapName).get(key); - return update != null && !Objects.equal(transactionId, update.transactionId()); - } - - private boolean isLockedForUpdates(String mapName, String key) { - return getLockMap(mapName).containsKey(key); - } - - private boolean areTransactionsInProgress(String mapName) { - return !getLockMap(mapName).isEmpty(); - } - - private class Update { - private final long transactionId; - private final byte[] value; - - public Update(long txId, byte[] value) { - this.transactionId = txId; - this.value = value; - } - - public long transactionId() { - return this.transactionId; - } - - public byte[] value() { - return this.value; - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java deleted file mode 100644 index 5f69fde8..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -import org.onlab.util.SharedExecutors; -import org.onosproject.store.service.DistributedQueue; -import org.onosproject.store.service.Serializer; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH; - -/** - * DistributedQueue implementation that provides FIFO ordering semantics. - * - * @param <E> queue entry type - */ -public class DefaultDistributedQueue<E> implements DistributedQueue<E> { - - private final String name; - private final Database database; - private final Serializer serializer; - private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet(); - - private static final String PRIMITIVE_NAME = "distributedQueue"; - private static final String SIZE = "size"; - private static final String PUSH = "push"; - private static final String POP = "pop"; - private static final String PEEK = "peek"; - - private static final String ERROR_NULL_ENTRY = "Null entries are not allowed"; - private final MeteringAgent monitor; - - public DefaultDistributedQueue(String name, - Database database, - Serializer serializer, - boolean meteringEnabled) { - this.name = checkNotNull(name, "queue name cannot be null"); - this.database = checkNotNull(database, "database cannot be null"); - this.serializer = checkNotNull(serializer, "serializer cannot be null"); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - this.database.registerConsumer(update -> { - SharedExecutors.getSingleThreadExecutor().execute(() -> { - if (update.target() == QUEUE_PUSH) { - List<Object> input = update.input(); - String queueName = (String) input.get(0); - if (queueName.equals(name)) { - tryPoll(); - } - } - }); - }); - } - - @Override - public long size() { - final MeteringAgent.Context timer = monitor.startTimer(SIZE); - return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e))); - } - - @Override - public void push(E entry) { - checkNotNull(entry, ERROR_NULL_ENTRY); - final MeteringAgent.Context timer = monitor.startTimer(PUSH); - Futures.getUnchecked(database.queuePush(name, serializer.encode(entry)) - .whenComplete((r, e) -> timer.stop(e))); - } - - @Override - public CompletableFuture<E> pop() { - final MeteringAgent.Context timer = monitor.startTimer(POP); - return database.queuePop(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenCompose(v -> { - if (v != null) { - return CompletableFuture.<E>completedFuture(serializer.decode(v)); - } - CompletableFuture<E> newPendingFuture = new CompletableFuture<>(); - pendingFutures.add(newPendingFuture); - return newPendingFuture; - }); - - } - - @Override - public E peek() { - final MeteringAgent.Context timer = monitor.startTimer(PEEK); - return Futures.getUnchecked(database.queuePeek(name) - .thenApply(v -> v != null ? serializer.<E>decode(v) : null) - .whenComplete((r, e) -> timer.stop(e))); - } - - public String name() { - return name; - } - - protected void tryPoll() { - Set<CompletableFuture<E>> completedFutures = Sets.newHashSet(); - for (CompletableFuture<E> future : pendingFutures) { - E entry = Futures.getUnchecked(database.queuePop(name) - .thenApply(v -> v != null ? serializer.decode(v) : null)); - if (entry != null) { - future.complete(entry); - completedFutures.add(future); - } else { - break; - } - } - pendingFutures.removeAll(completedFutures); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java deleted file mode 100644 index d6654e27..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.DistributedQueue; -import org.onosproject.store.service.DistributedQueueBuilder; -import org.onosproject.store.service.Serializer; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Default implementation of a {@code DistributedQueueBuilder}. - * - * @param <E> queue entry type - */ -public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> { - - private Serializer serializer; - private String name; - private boolean persistenceEnabled = true; - private final DatabaseManager databaseManager; - private boolean metering = true; - - public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) { - this.databaseManager = databaseManager; - } - - @Override - public DistributedQueueBuilder<E> withName(String name) { - checkArgument(name != null && !name.isEmpty()); - this.name = name; - return this; - } - - @Override - public DistributedQueueBuilder<E> withSerializer(Serializer serializer) { - checkArgument(serializer != null); - this.serializer = serializer; - return this; - } - - @Override - public DistributedQueueBuilder<E> withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public DistributedQueueBuilder<E> withPersistenceDisabled() { - persistenceEnabled = false; - return this; - } - - private boolean validInputs() { - return name != null && serializer != null; - } - - @Override - public DistributedQueue<E> build() { - checkState(validInputs()); - return new DefaultDistributedQueue<>( - name, - persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase, - serializer, - metering); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java deleted file mode 100644 index 677724df..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.DistributedSet; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.SetEvent; -import org.onosproject.store.service.SetEventListener; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * Implementation of distributed set that is backed by a ConsistentMap. - - * @param <E> set element type - */ -public class DefaultDistributedSet<E> implements DistributedSet<E> { - - private static final String CONTAINS = "contains"; - private static final String PRIMITIVE_NAME = "distributedSet"; - private static final String SIZE = "size"; - private static final String IS_EMPTY = "isEmpty"; - private static final String ITERATOR = "iterator"; - private static final String TO_ARRAY = "toArray"; - private static final String ADD = "add"; - private static final String REMOVE = "remove"; - private static final String CONTAINS_ALL = "containsAll"; - private static final String ADD_ALL = "addAll"; - private static final String RETAIN_ALL = "retainAll"; - private static final String REMOVE_ALL = "removeAll"; - private static final String CLEAR = "clear"; - - private final String name; - private final ConsistentMap<E, Boolean> backingMap; - private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap(); - private final MeteringAgent monitor; - - public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) { - this.name = name; - this.backingMap = backingMap; - monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - } - - @Override - public int size() { - final MeteringAgent.Context timer = monitor.startTimer(SIZE); - try { - return backingMap.size(); - } finally { - timer.stop(null); - } - } - - @Override - public boolean isEmpty() { - final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY); - try { - return backingMap.isEmpty(); - } finally { - timer.stop(null); - } - } - - @SuppressWarnings("unchecked") - @Override - public boolean contains(Object o) { - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS); - try { - return backingMap.containsKey((E) o); - } finally { - timer.stop(null); - } - } - - @Override - public Iterator<E> iterator() { - final MeteringAgent.Context timer = monitor.startTimer(ITERATOR); - //Do we have to measure this guy? - try { - return backingMap.keySet().iterator(); - } finally { - timer.stop(null); - } - } - - @Override - public Object[] toArray() { - final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY); - try { - return backingMap.keySet().stream().toArray(); - } finally { - timer.stop(null); - } - } - - @Override - public <T> T[] toArray(T[] a) { - final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY); - try { - return backingMap.keySet().stream().toArray(size -> a); - } finally { - timer.stop(null); - } - } - - @Override - public boolean add(E e) { - final MeteringAgent.Context timer = monitor.startTimer(ADD); - try { - return backingMap.putIfAbsent(e, true) == null; - } finally { - timer.stop(null); - } - } - - @SuppressWarnings("unchecked") - @Override - public boolean remove(Object o) { - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - try { - return backingMap.remove((E) o) != null; - } finally { - timer.stop(null); - } - } - - @Override - public boolean containsAll(Collection<?> c) { - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL); - try { - return c.stream() - .allMatch(this::contains); - } finally { - timer.stop(null); - } - } - - @Override - public boolean addAll(Collection<? extends E> c) { - final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL); - try { - return c.stream() - .map(this::add) - .reduce(Boolean::logicalOr) - .orElse(false); - } finally { - timer.stop(null); - } - } - - @Override - public boolean retainAll(Collection<?> c) { - final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL); - try { - Set<?> retainSet = Sets.newHashSet(c); - return backingMap.keySet() - .stream() - .filter(k -> !retainSet.contains(k)) - .map(this::remove) - .reduce(Boolean::logicalOr) - .orElse(false); - } finally { - timer.stop(null); - } - } - - @Override - public boolean removeAll(Collection<?> c) { - final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL); - try { - Set<?> removeSet = Sets.newHashSet(c); - return backingMap.keySet() - .stream() - .filter(removeSet::contains) - .map(this::remove) - .reduce(Boolean::logicalOr) - .orElse(false); - } finally { - timer.stop(null); - } - } - - @Override - public void clear() { - final MeteringAgent.Context timer = monitor.startTimer(CLEAR); - try { - backingMap.clear(); - } finally { - timer.stop(null); - } - } - - @Override - public void addListener(SetEventListener<E> listener) { - MapEventListener<E, Boolean> mapEventListener = mapEvent -> { - if (mapEvent.type() == MapEvent.Type.INSERT) { - listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key())); - } else if (mapEvent.type() == MapEvent.Type.REMOVE) { - listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key())); - } - }; - if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) { - backingMap.addListener(mapEventListener); - } - } - - @Override - public void removeListener(SetEventListener<E> listener) { - MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener); - if (mapEventListener != null) { - backingMap.removeListener(mapEventListener); - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java deleted file mode 100644 index f7957f39..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.core.ApplicationId; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.DistributedSet; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.DistributedSetBuilder; - -/** - * Default distributed set builder. - * - * @param <E> type for set elements - */ -public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> { - - private String name; - private ConsistentMapBuilder<E, Boolean> mapBuilder; - private boolean metering = true; - - public DefaultDistributedSetBuilder(DatabaseManager manager) { - this.mapBuilder = manager.consistentMapBuilder(); - mapBuilder.withMeteringDisabled(); - } - - @Override - public DistributedSetBuilder<E> withName(String name) { - mapBuilder.withName(name); - this.name = name; - return this; - } - - @Override - public DistributedSetBuilder<E> withApplicationId(ApplicationId id) { - mapBuilder.withApplicationId(id); - return this; - } - - @Override - public DistributedSetBuilder<E> withPurgeOnUninstall() { - mapBuilder.withPurgeOnUninstall(); - return this; - } - - @Override - public DistributedSetBuilder<E> withSerializer(Serializer serializer) { - mapBuilder.withSerializer(serializer); - return this; - } - - @Override - public DistributedSetBuilder<E> withUpdatesDisabled() { - mapBuilder.withUpdatesDisabled(); - return this; - } - - @Override - public DistributedSetBuilder<E> withRelaxedReadConsistency() { - mapBuilder.withRelaxedReadConsistency(); - return this; - } - - @Override - public DistributedSetBuilder<E> withPartitionsDisabled() { - mapBuilder.withPartitionsDisabled(); - return this; - } - - @Override - public DistributedSetBuilder<E> withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public DistributedSet<E> build() { - return new DefaultDistributedSet<E>(name, metering, mapBuilder.build()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java deleted file mode 100644 index 2ff7a2dc..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransaction.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.util.List; - -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; - -import com.google.common.collect.ImmutableList; - -/** - * A Default transaction implementation. - */ -public class DefaultTransaction implements Transaction { - - private final long transactionId; - private final List<DatabaseUpdate> updates; - private final State state; - private final long lastUpdated; - - public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) { - this(transactionId, updates, State.PREPARING, System.currentTimeMillis()); - } - - private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) { - this.transactionId = transactionId; - this.updates = ImmutableList.copyOf(updates); - this.state = state; - this.lastUpdated = lastUpdated; - } - - @Override - public long id() { - return transactionId; - } - - @Override - public List<DatabaseUpdate> updates() { - return updates; - } - - @Override - public State state() { - return state; - } - - @Override - public Transaction transition(State newState) { - return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis()); - } - - @Override - public long lastUpdated() { - return lastUpdated; - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java deleted file mode 100644 index 73888221..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - -import static com.google.common.base.Preconditions.*; - -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionalMap; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Futures; - -/** - * Default TransactionContext implementation. - */ -public class DefaultTransactionContext implements TransactionContext { - private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open"; - - @SuppressWarnings("rawtypes") - private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap(); - private boolean isOpen = false; - private final Database database; - private final long transactionId; - private final Supplier<ConsistentMapBuilder> mapBuilderSupplier; - - public DefaultTransactionContext(long transactionId, - Database database, - Supplier<ConsistentMapBuilder> mapBuilderSupplier) { - this.transactionId = transactionId; - this.database = checkNotNull(database); - this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier); - } - - @Override - public long transactionId() { - return transactionId; - } - - @Override - public void begin() { - checkState(!isOpen, "Transaction Context is already open"); - isOpen = true; - } - - @Override - public boolean isOpen() { - return isOpen; - } - - @Override - @SuppressWarnings("unchecked") - public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, - Serializer serializer) { - checkState(isOpen, TX_NOT_OPEN_ERROR); - checkNotNull(mapName); - checkNotNull(serializer); - return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>( - name, - mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(), - this, - serializer)); - } - - @SuppressWarnings("unchecked") - @Override - public boolean commit() { - // TODO: rework commit implementation to be more intuitive - checkState(isOpen, TX_NOT_OPEN_ERROR); - CommitResponse response = null; - try { - List<DatabaseUpdate> updates = Lists.newLinkedList(); - txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates())); - Transaction transaction = new DefaultTransaction(transactionId, updates); - response = Futures.getUnchecked(database.prepareAndCommit(transaction)); - return response.success(); - } catch (Exception e) { - abort(); - return false; - } finally { - isOpen = false; - } - } - - @Override - public void abort() { - if (isOpen) { - try { - txMaps.values().forEach(m -> m.rollback()); - } finally { - isOpen = false; - } - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java deleted file mode 100644 index f20bfb80..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionContextBuilder; - -/** - * The default implementation of a transaction context builder. This builder - * generates a {@link DefaultTransactionContext}. - */ -public class DefaultTransactionContextBuilder implements TransactionContextBuilder { - - private boolean partitionsEnabled = true; - private final DatabaseManager manager; - private final long transactionId; - - public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) { - this.manager = manager; - this.transactionId = transactionId; - } - - @Override - public TransactionContextBuilder withPartitionsDisabled() { - partitionsEnabled = false; - return this; - } - - @Override - public TransactionContext build() { - return new DefaultTransactionContext( - transactionId, - partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase, - () -> partitionsEnabled ? manager.consistentMapBuilder() - : manager.consistentMapBuilder().withPartitionsDisabled()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java deleted file mode 100644 index ade70335..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionalMap.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.onlab.util.HexString; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionalMap; -import org.onosproject.store.service.Versioned; - -import static com.google.common.base.Preconditions.*; - -import com.google.common.base.Objects; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Default Transactional Map implementation that provides a repeatable reads - * transaction isolation level. - * - * @param <K> key type - * @param <V> value type. - */ -public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> { - - private final TransactionContext txContext; - private static final String TX_CLOSED_ERROR = "Transaction is closed"; - private final ConsistentMap<K, V> backingMap; - private final String name; - private final Serializer serializer; - private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap(); - private final Map<K, V> writeCache = Maps.newConcurrentMap(); - private final Set<K> deleteSet = Sets.newConcurrentHashSet(); - - private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - private static final String ERROR_NULL_KEY = "Null key is not allowed"; - - private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder() - .softValues() - .build(new CacheLoader<K, String>() { - - @Override - public String load(K key) { - return HexString.toHexString(serializer.encode(key)); - } - }); - - protected K dK(String key) { - return serializer.decode(HexString.fromHexString(key)); - } - - public DefaultTransactionalMap( - String name, - ConsistentMap<K, V> backingMap, - TransactionContext txContext, - Serializer serializer) { - this.name = name; - this.backingMap = backingMap; - this.txContext = txContext; - this.serializer = serializer; - } - - @Override - public V get(K key) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - checkNotNull(key, ERROR_NULL_KEY); - if (deleteSet.contains(key)) { - return null; - } - V latest = writeCache.get(key); - if (latest != null) { - return latest; - } else { - Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k)); - return v != null ? v.value() : null; - } - } - - @Override - public V put(K key, V value) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - checkNotNull(value, ERROR_NULL_VALUE); - - V latest = get(key); - writeCache.put(key, value); - deleteSet.remove(key); - return latest; - } - - @Override - public V remove(K key) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - V latest = get(key); - if (latest != null) { - writeCache.remove(key); - deleteSet.add(key); - } - return latest; - } - - @Override - public boolean remove(K key, V value) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - checkNotNull(value, ERROR_NULL_VALUE); - V latest = get(key); - if (Objects.equal(value, latest)) { - remove(key); - return true; - } - return false; - } - - @Override - public boolean replace(K key, V oldValue, V newValue) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - checkNotNull(oldValue, ERROR_NULL_VALUE); - checkNotNull(newValue, ERROR_NULL_VALUE); - V latest = get(key); - if (Objects.equal(oldValue, latest)) { - put(key, newValue); - return true; - } - return false; - } - - @Override - public V putIfAbsent(K key, V value) { - checkState(txContext.isOpen(), TX_CLOSED_ERROR); - checkNotNull(value, ERROR_NULL_VALUE); - V latest = get(key); - if (latest == null) { - put(key, value); - } - return latest; - } - - protected List<DatabaseUpdate> prepareDatabaseUpdates() { - List<DatabaseUpdate> updates = Lists.newLinkedList(); - deleteSet.forEach(key -> { - Versioned<V> original = readCache.get(key); - if (original != null) { - updates.add(DatabaseUpdate.newBuilder() - .withMapName(name) - .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH) - .withKey(keyCache.getUnchecked(key)) - .withCurrentVersion(original.version()) - .build()); - } - }); - writeCache.forEach((key, value) -> { - Versioned<V> original = readCache.get(key); - if (original == null) { - updates.add(DatabaseUpdate.newBuilder() - .withMapName(name) - .withType(DatabaseUpdate.Type.PUT_IF_ABSENT) - .withKey(keyCache.getUnchecked(key)) - .withValue(serializer.encode(value)) - .build()); - } else { - updates.add(DatabaseUpdate.newBuilder() - .withMapName(name) - .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH) - .withKey(keyCache.getUnchecked(key)) - .withCurrentVersion(original.version()) - .withValue(serializer.encode(value)) - .build()); - } - }); - return updates; - } - - /** - * Discards all changes made to this transactional map. - */ - protected void rollback() { - readCache.clear(); - writeCache.clear(); - deleteSet.clear(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java deleted file mode 100644 index 1882b1b5..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java +++ /dev/null @@ -1,605 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.apache.commons.lang.math.RandomUtils; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onosproject.cluster.ClusterEvent; -import org.onosproject.cluster.ClusterEvent.Type; -import org.onosproject.cluster.ClusterEventListener; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.Leadership; -import org.onosproject.cluster.LeadershipEvent; -import org.onosproject.cluster.LeadershipEventListener; -import org.onosproject.cluster.LeadershipService; -import org.onosproject.cluster.NodeId; -import org.onosproject.event.ListenerRegistry; -import org.onosproject.event.EventDeliveryService; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkArgument; -import static org.onlab.util.Tools.groupedThreads; -import static org.slf4j.LoggerFactory.getLogger; -import static org.onosproject.cluster.ControllerNode.State.ACTIVE; -import static org.onosproject.cluster.ControllerNode.State.INACTIVE; - -/** - * Distributed Lock Manager implemented on top of ConsistentMap. - * <p> - * This implementation makes use of ClusterService's failure - * detection capabilities to detect and purge stale locks. - * TODO: Ensure lock safety and liveness. - */ -@Component(immediate = true, enabled = true) -@Service -public class DistributedLeadershipManager implements LeadershipService { - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected EventDeliveryService eventDispatcher; - - private final Logger log = getLogger(getClass()); - private ScheduledExecutorService electionRunner; - private ScheduledExecutorService lockExecutor; - private ScheduledExecutorService staleLeadershipPurgeExecutor; - private ScheduledExecutorService leadershipRefresher; - - private ConsistentMap<String, NodeId> leaderMap; - private ConsistentMap<String, List<NodeId>> candidateMap; - - private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry; - private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap(); - private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap(); - private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); - - private NodeId localNodeId; - private Set<String> activeTopics = Sets.newConcurrentHashSet(); - private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap(); - - // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS) - private static final int WAIT_BEFORE_RETRY_MILLIS = 150; - private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; - private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2; - private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; - - private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); - - private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API); - - @Activate - public void activate() { - leaderMap = storageService.<String, NodeId>consistentMapBuilder() - .withName("onos-topic-leaders") - .withSerializer(SERIALIZER) - .withPartitionsDisabled().build(); - candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder() - .withName("onos-topic-candidates") - .withSerializer(SERIALIZER) - .withPartitionsDisabled().build(); - - leaderMap.addListener(event -> { - log.debug("Received {}", event); - LeadershipEvent.Type leadershipEventType = null; - if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) { - leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED; - } else if (event.type() == MapEvent.Type.REMOVE) { - leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED; - } - onLeadershipEvent(new LeadershipEvent( - leadershipEventType, - new Leadership(event.key(), - event.value().value(), - event.value().version(), - event.value().creationTime()))); - }); - - candidateMap.addListener(event -> { - log.debug("Received {}", event); - if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) { - log.error("Entries must not be removed from candidate map"); - return; - } - onLeadershipEvent(new LeadershipEvent( - LeadershipEvent.Type.CANDIDATES_CHANGED, - new Leadership(event.key(), - event.value().value(), - event.value().version(), - event.value().creationTime()))); - }); - - localNodeId = clusterService.getLocalNode().id(); - - electionRunner = Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/leadership", "election-runner")); - lockExecutor = Executors.newScheduledThreadPool( - 4, groupedThreads("onos/store/leadership", "election-thread-%d")); - staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/leadership", "stale-leadership-evictor")); - leadershipRefresher = Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/leadership", "refresh-thread")); - - clusterService.addListener(clusterEventListener); - - electionRunner.scheduleWithFixedDelay( - this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); - - leadershipRefresher.scheduleWithFixedDelay( - this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS); - - listenerRegistry = new ListenerRegistry<>(); - eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - if (clusterService.getNodes().size() > 1) { - // FIXME: Determine why this takes ~50 seconds to shutdown on a single node! - leaderBoard.forEach((topic, leadership) -> { - if (localNodeId.equals(leadership.leader())) { - withdraw(topic); - } - }); - } - - clusterService.removeListener(clusterEventListener); - eventDispatcher.removeSink(LeadershipEvent.class); - - electionRunner.shutdown(); - lockExecutor.shutdown(); - staleLeadershipPurgeExecutor.shutdown(); - leadershipRefresher.shutdown(); - - log.info("Stopped"); - } - - @Override - public Map<String, Leadership> getLeaderBoard() { - return ImmutableMap.copyOf(leaderBoard); - } - - @Override - public Map<String, List<NodeId>> getCandidates() { - return Maps.toMap(candidateBoard.keySet(), this::getCandidates); - } - - @Override - public List<NodeId> getCandidates(String path) { - Leadership current = candidateBoard.get(path); - return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates()); - } - - @Override - public NodeId getLeader(String path) { - Leadership leadership = leaderBoard.get(path); - return leadership != null ? leadership.leader() : null; - } - - @Override - public Leadership getLeadership(String path) { - checkArgument(path != null); - return leaderBoard.get(path); - } - - @Override - public Set<String> ownedTopics(NodeId nodeId) { - checkArgument(nodeId != null); - return leaderBoard.entrySet() - .stream() - .filter(entry -> nodeId.equals(entry.getValue().leader())) - .map(Entry::getKey) - .collect(Collectors.toSet()); - } - - @Override - public CompletableFuture<Leadership> runForLeadership(String path) { - log.debug("Running for leadership for topic: {}", path); - CompletableFuture<Leadership> resultFuture = new CompletableFuture<>(); - doRunForLeadership(path, resultFuture); - return resultFuture; - } - - private void doRunForLeadership(String path, CompletableFuture<Leadership> future) { - try { - Versioned<List<NodeId>> candidates = candidateMap.computeIf(path, - currentList -> currentList == null || !currentList.contains(localNodeId), - (topic, currentList) -> { - if (currentList == null) { - return ImmutableList.of(localNodeId); - } else { - List<NodeId> newList = Lists.newLinkedList(); - newList.addAll(currentList); - newList.add(localNodeId); - return newList; - } - }); - log.debug("In the leadership race for topic {} with candidates {}", path, candidates); - activeTopics.add(path); - Leadership leadership = electLeader(path, candidates.value()); - if (leadership == null) { - pendingFutures.put(path, future); - } else { - future.complete(leadership); - } - } catch (ConsistentMapException e) { - log.debug("Failed to enter topic leader race for {}. Retrying.", path, e); - rerunForLeadership(path, future); - } - } - - @Override - public CompletableFuture<Void> withdraw(String path) { - activeTopics.remove(path); - CompletableFuture<Void> resultFuture = new CompletableFuture<>(); - doWithdraw(path, resultFuture); - return resultFuture; - } - - - private void doWithdraw(String path, CompletableFuture<Void> future) { - if (activeTopics.contains(path)) { - future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path))); - } - try { - leaderMap.computeIf(path, - localNodeId::equals, - (topic, leader) -> null); - candidateMap.computeIf(path, - candidates -> candidates != null && candidates.contains(localNodeId), - (topic, candidates) -> candidates.stream() - .filter(nodeId -> !localNodeId.equals(nodeId)) - .collect(Collectors.toList())); - future.complete(null); - } catch (Exception e) { - log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e); - retryWithdraw(path, future); - } - } - - @Override - public boolean stepdown(String path) { - if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) { - return false; - } - - try { - return leaderMap.computeIf(path, - localNodeId::equals, - (topic, leader) -> null) == null; - } catch (Exception e) { - log.warn("Error executing stepdown for {}", path, e); - } - return false; - } - - @Override - public void addListener(LeadershipEventListener listener) { - listenerRegistry.addListener(listener); - } - - @Override - public void removeListener(LeadershipEventListener listener) { - listenerRegistry.removeListener(listener); - } - - @Override - public boolean makeTopCandidate(String path, NodeId nodeId) { - Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path, - candidates -> candidates != null && - candidates.contains(nodeId) && - !nodeId.equals(Iterables.getFirst(candidates, null)), - (topic, candidates) -> { - List<NodeId> updatedCandidates = new ArrayList<>(candidates.size()); - updatedCandidates.add(nodeId); - candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add); - return updatedCandidates; - }); - List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList(); - return candidates.size() > 0 && nodeId.equals(candidates.get(0)); - } - - private Leadership electLeader(String path, List<NodeId> candidates) { - Leadership currentLeadership = getLeadership(path); - if (currentLeadership != null) { - return currentLeadership; - } else { - NodeId topCandidate = candidates - .stream() - .filter(n -> clusterService.getState(n) == ACTIVE) - .findFirst() - .orElse(null); - try { - Versioned<NodeId> leader = localNodeId.equals(topCandidate) - ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path); - if (leader != null) { - Leadership newLeadership = new Leadership(path, - leader.value(), - leader.version(), - leader.creationTime()); - // Since reads only go through the local copy of leader board, we ought to update it - // first before returning from this method. - // This is to ensure a subsequent read will not read a stale value. - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership)); - return newLeadership; - } - } catch (Exception e) { - log.debug("Failed to elect leader for {}", path, e); - } - } - return null; - } - - private void electLeaders() { - try { - candidateMap.entrySet().forEach(entry -> { - String path = entry.getKey(); - Versioned<List<NodeId>> candidates = entry.getValue(); - // for active topics, check if this node can become a leader (if it isn't already) - if (activeTopics.contains(path)) { - lockExecutor.submit(() -> { - Leadership leadership = electLeader(path, candidates.value()); - if (leadership != null) { - CompletableFuture<Leadership> future = pendingFutures.remove(path); - if (future != null) { - future.complete(leadership); - } - } - }); - } - // Raise a CANDIDATES_CHANGED event to force refresh local candidate board - // and also to update local listeners. - // Don't worry about duplicate events as they will be suppressed. - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, - new Leadership(path, - candidates.value(), - candidates.version(), - candidates.creationTime()))); - }); - } catch (Exception e) { - log.debug("Failure electing leaders", e); - } - } - - private void onLeadershipEvent(LeadershipEvent leadershipEvent) { - log.trace("Leadership Event: time = {} type = {} event = {}", - leadershipEvent.time(), leadershipEvent.type(), - leadershipEvent); - - Leadership leadershipUpdate = leadershipEvent.subject(); - LeadershipEvent.Type eventType = leadershipEvent.type(); - String topic = leadershipUpdate.topic(); - - AtomicBoolean updateAccepted = new AtomicBoolean(false); - if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) { - leaderBoard.compute(topic, (k, currentLeadership) -> { - if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) { - updateAccepted.set(true); - return leadershipUpdate; - } - return currentLeadership; - }); - } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { - leaderBoard.compute(topic, (k, currentLeadership) -> { - if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { - updateAccepted.set(true); - // FIXME: Removing entries from leaderboard is not safe and should be visited. - return null; - } - return currentLeadership; - }); - } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) { - candidateBoard.compute(topic, (k, currentInfo) -> { - if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) { - updateAccepted.set(true); - return leadershipUpdate; - } - return currentInfo; - }); - } else { - throw new IllegalStateException("Unknown event type."); - } - - if (updateAccepted.get()) { - eventDispatcher.post(leadershipEvent); - } - } - - private void rerunForLeadership(String path, CompletableFuture<Leadership> future) { - lockExecutor.schedule( - () -> doRunForLeadership(path, future), - RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), - TimeUnit.MILLISECONDS); - } - - private void retryWithdraw(String path, CompletableFuture<Void> future) { - lockExecutor.schedule( - () -> doWithdraw(path, future), - RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), - TimeUnit.MILLISECONDS); - } - - private void scheduleStaleLeadershipPurge(int afterDelaySec) { - if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) { - staleLeadershipPurgeExecutor.schedule( - this::purgeStaleLeadership, - afterDelaySec, - TimeUnit.SECONDS); - } - } - - /** - * Purges locks held by inactive nodes and evicts inactive nodes from candidacy. - */ - private void purgeStaleLeadership() { - AtomicBoolean rerunPurge = new AtomicBoolean(false); - try { - staleLeadershipPurgeScheduled.set(false); - leaderMap.entrySet() - .stream() - .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE) - .forEach(entry -> { - String path = entry.getKey(); - NodeId nodeId = entry.getValue().value(); - try { - leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null); - } catch (Exception e) { - log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e); - rerunPurge.set(true); - } - }); - - candidateMap.entrySet() - .forEach(entry -> { - String path = entry.getKey(); - Versioned<List<NodeId>> candidates = entry.getValue(); - List<NodeId> candidatesList = candidates != null - ? candidates.value() : Collections.emptyList(); - List<NodeId> activeCandidatesList = - candidatesList.stream() - .filter(n -> clusterService.getState(n) == ACTIVE) - .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path)) - .collect(Collectors.toList()); - if (activeCandidatesList.size() < candidatesList.size()) { - Set<NodeId> removedCandidates = - Sets.difference(Sets.newHashSet(candidatesList), - Sets.newHashSet(activeCandidatesList)); - try { - candidateMap.computeIf(path, - c -> c.stream() - .filter(n -> clusterService.getState(n) == INACTIVE) - .count() > 0, - (topic, c) -> c.stream() - .filter(n -> clusterService.getState(n) == ACTIVE) - .filter(n -> !localNodeId.equals(n) || - activeTopics.contains(path)) - .collect(Collectors.toList())); - } catch (Exception e) { - log.debug("Failed to evict inactive candidates {} from " - + "candidate list for {}", removedCandidates, path, e); - rerunPurge.set(true); - } - } - }); - } catch (Exception e) { - log.debug("Failure purging state leadership.", e); - rerunPurge.set(true); - } - - if (rerunPurge.get()) { - log.debug("Rescheduling stale leadership purge due to errors encountered in previous run"); - scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC); - } - } - - private void refreshLeaderBoard() { - try { - Map<String, Leadership> newLeaderBoard = Maps.newHashMap(); - leaderMap.entrySet().forEach(entry -> { - String path = entry.getKey(); - Versioned<NodeId> leader = entry.getValue(); - Leadership leadership = new Leadership(path, - leader.value(), - leader.version(), - leader.creationTime()); - newLeaderBoard.put(path, leadership); - }); - - // first take snapshot of current leader board. - Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard); - - MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard); - - // evict stale leaders - diff.entriesOnlyOnLeft().forEach((path, leadership) -> { - log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership); - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership)); - }); - - // add missing leaders - diff.entriesOnlyOnRight().forEach((path, leadership) -> { - log.debug("Adding {} to leaderboard. It is now the active leader.", leadership); - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership)); - }); - - // add updated leaders - diff.entriesDiffering().forEach((path, difference) -> { - Leadership current = difference.leftValue(); - Leadership updated = difference.rightValue(); - if (current.epoch() < updated.epoch()) { - log.debug("Updated {} in leaderboard.", updated); - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated)); - } - }); - } catch (Exception e) { - log.debug("Failed to refresh leader board", e); - } - } - - private class InternalClusterEventListener implements ClusterEventListener { - - @Override - public void event(ClusterEvent event) { - if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) { - scheduleStaleLeadershipPurge(0); - } - } - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java deleted file mode 100644 index 9bf80a73..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MappingSet.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -import com.google.common.collect.Iterators; - -/** - * Set view backed by Set with element type {@code <BACK>} but returns - * element as {@code <OUT>} for convenience. - * - * @param <BACK> Backing {@link Set} element type. - * MappingSet will follow this type's equality behavior. - * @param <OUT> external facing element type. - * MappingSet will ignores equality defined by this type. - */ -class MappingSet<BACK, OUT> implements Set<OUT> { - - private final Set<BACK> backedSet; - private final Function<OUT, BACK> toBack; - private final Function<BACK, OUT> toOut; - - public MappingSet(Set<BACK> backedSet, - Function<Set<BACK>, Set<BACK>> supplier, - Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) { - this.backedSet = supplier.apply(backedSet); - this.toBack = toBack; - this.toOut = toOut; - } - - @Override - public int size() { - return backedSet.size(); - } - - @Override - public boolean isEmpty() { - return backedSet.isEmpty(); - } - - @Override - public boolean contains(Object o) { - return backedSet.contains(toBack.apply((OUT) o)); - } - - @Override - public Iterator<OUT> iterator() { - return Iterators.transform(backedSet.iterator(), toOut::apply); - } - - @Override - public Object[] toArray() { - return backedSet.stream() - .map(toOut) - .toArray(); - } - - @Override - public <T> T[] toArray(T[] a) { - return backedSet.stream() - .map(toOut) - .toArray(size -> { - if (size < a.length) { - return (T[]) new Object[size]; - } else { - Arrays.fill(a, null); - return a; - } - }); - } - - @Override - public boolean add(OUT e) { - return backedSet.add(toBack.apply(e)); - } - - @Override - public boolean remove(Object o) { - return backedSet.remove(toBack.apply((OUT) o)); - } - - @Override - public boolean containsAll(Collection<?> c) { - return c.stream() - .map(e -> toBack.apply((OUT) e)) - .allMatch(backedSet::contains); - } - - @Override - public boolean addAll(Collection<? extends OUT> c) { - return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList())); - } - - @Override - public boolean retainAll(Collection<?> c) { - return backedSet.retainAll(c.stream() - .map(x -> toBack.apply((OUT) x)) - .collect(Collectors.toList())); - } - - @Override - public boolean removeAll(Collection<?> c) { - return backedSet.removeAll(c.stream() - .map(x -> toBack.apply((OUT) x)) - .collect(Collectors.toList())); - } - - @Override - public void clear() { - backedSet.clear(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java deleted file mode 100644 index 5f707d62..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Match.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Arrays; -import java.util.Objects; -import java.util.function.Function; - -/** - * Utility class for checking matching values. - * - * @param <T> type of value - */ -public final class Match<T> { - - private final boolean matchAny; - private final T value; - - /** - * Returns a Match that matches any value. - * @param <T> match type - * @return new instance - */ - public static <T> Match<T> any() { - return new Match<>(); - } - - /** - * Returns a Match that matches null values. - * @param <T> match type - * @return new instance - */ - public static <T> Match<T> ifNull() { - return ifValue(null); - } - - /** - * Returns a Match that matches only specified value. - * @param value value to match - * @param <T> match type - * @return new instance - */ - public static <T> Match<T> ifValue(T value) { - return new Match<>(value); - } - - private Match() { - matchAny = true; - value = null; - } - - private Match(T value) { - matchAny = false; - this.value = value; - } - - /** - * Maps this instance to a Match of another type. - * @param mapper transformation function - * @param <V> new match type - * @return new instance - */ - public <V> Match<V> map(Function<T, V> mapper) { - if (matchAny) { - return any(); - } else if (value == null) { - return ifNull(); - } else { - return ifValue(mapper.apply(value)); - } - } - - /** - * Checks if this instance matches specified value. - * @param other other value - * @return true if matches; false otherwise - */ - public boolean matches(T other) { - if (matchAny) { - return true; - } else if (other == null) { - return value == null; - } else { - if (value instanceof byte[]) { - return Arrays.equals((byte[]) value, (byte[]) other); - } - return Objects.equals(value, other); - } - } - - @Override - public int hashCode() { - return Objects.hash(matchAny, value); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object other) { - if (!(other instanceof Match)) { - return false; - } - Match<T> that = (Match<T>) other; - return Objects.equals(this.matchAny, that.matchAny) && - Objects.equals(this.value, that.value); - } - - @Override - public String toString() { - return toStringHelper(this) - .add("matchAny", matchAny) - .add("value", value) - .toString(); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java deleted file mode 100644 index 6475bf7b..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MeteringAgent.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.google.common.collect.Maps; -import org.onlab.metrics.MetricsComponent; -import org.onlab.metrics.MetricsFeature; -import org.onlab.metrics.MetricsService; -import org.onlab.osgi.DefaultServiceDirectory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Agent that implements usage and performance monitoring via the metrics service. - */ -public class MeteringAgent { - - private Counter exceptionCounter; - private Counter perObjExceptionCounter; - private MetricsService metricsService; - private MetricsComponent metricsComponent; - private MetricsFeature metricsFeature; - private final Map<String, Timer> perObjOpTimers = Maps.newConcurrentMap(); - private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap(); - private Timer perPrimitiveTimer; - private Timer perObjTimer; - private MetricsFeature wildcard; - private final boolean activated; - private Context nullTimer; - - /** - * Constructs a new MeteringAgent for a given distributed primitive. - * Instantiates the metrics service - * Initializes all the general metrics for that object - * - * @param primitiveName Type of primitive to be metered - * @param objName Global name of the primitive - * @param activated boolean flag for whether metering is enabled or not - */ - public MeteringAgent(String primitiveName, String objName, boolean activated) { - checkNotNull(objName, "Object name cannot be null"); - this.activated = activated; - nullTimer = new Context(null, ""); - if (this.activated) { - this.metricsService = DefaultServiceDirectory.getService(MetricsService.class); - this.metricsComponent = metricsService.registerComponent(primitiveName); - this.metricsFeature = metricsComponent.registerFeature(objName); - this.wildcard = metricsComponent.registerFeature("*"); - this.perObjTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*"); - this.perPrimitiveTimer = metricsService.createTimer(metricsComponent, wildcard, "*"); - this.perObjExceptionCounter = metricsService.createCounter(metricsComponent, metricsFeature, "exceptions"); - this.exceptionCounter = metricsService.createCounter(metricsComponent, wildcard, "exceptions"); - } - } - - /** - * Initializes a specific timer for a given operation. - * - * @param op Specific operation being metered - * @return timer context - */ - public Context startTimer(String op) { - if (!activated) { - return nullTimer; - } - // Check if timer exists, if it doesn't creates it - final Timer currTimer = perObjOpTimers.computeIfAbsent(op, timer -> - metricsService.createTimer(metricsComponent, metricsFeature, op)); - perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op)); - // Starts timer - return new Context(currTimer.time(), op); - } - - /** - * Timer.Context with a specific operation. - */ - public class Context { - private final Timer.Context context; - private final String operation; - - /** - * Constructs Context. - * - * @param context context - * @param operation operation name - */ - public Context(Timer.Context context, String operation) { - this.context = context; - this.operation = operation; - } - - /** - * Stops timer given a specific context and updates all related metrics. - * @param e throwable - */ - public void stop(Throwable e) { - if (!activated) { - return; - } - if (e == null) { - //Stop and updates timer with specific measurements per map, per operation - final long time = context.stop(); - //updates timer with aggregated measurements per map - perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS); - //updates timer with aggregated measurements per map - perObjTimer.update(time, TimeUnit.NANOSECONDS); - //updates timer with aggregated measurements per all Consistent Maps - perPrimitiveTimer.update(time, TimeUnit.NANOSECONDS); - } else { - exceptionCounter.inc(); - perObjExceptionCounter.inc(); - } - } - } - -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java deleted file mode 100644 index d8593e37..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/MutexExecutionManager.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.Tools; -import org.onosproject.cluster.ClusterEvent; -import org.onosproject.cluster.ClusterEventListener; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode.State; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.MutexExecutionService; -import org.onosproject.store.service.MutexTask; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; -import org.slf4j.Logger; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Implementation of a MutexExecutionService. - */ -@Component(immediate = true) -@Service -public class MutexExecutionManager implements MutexExecutionService { - - private final Logger log = getLogger(getClass()); - - protected ConsistentMap<String, MutexState> lockMap; - protected NodeId localNodeId; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener(); - private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); - - private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap(); - private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap(); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - lockMap = storageService.<String, MutexState>consistentMapBuilder() - .withName("onos-mutexes") - .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class)) - .withPartitionsDisabled() - .build(); - lockMap.addListener(mapEventListener); - clusterService.addListener(clusterEventListener); - releaseOldLocks(); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - lockMap.removeListener(mapEventListener); - pending.values().forEach(future -> future.cancel(true)); - activeTasks.forEach((k, v) -> { - v.stop(); - unlock(k); - }); - clusterService.removeListener(clusterEventListener); - log.info("Stopped"); - } - - @Override - public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) { - return lock(exclusionPath) - .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath, - k -> new InnerMutexTask(exclusionPath, - task, - state.term()))) - .thenAcceptAsync(t -> t.start(), executor) - .whenComplete((r, e) -> unlock(exclusionPath)); - } - - protected CompletableFuture<MutexState> lock(String exclusionPath) { - CompletableFuture<MutexState> future = - pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>()); - tryLock(exclusionPath); - return future; - } - - /** - * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to - * the wait list. - * @param exclusionPath exclusion path - */ - protected void tryLock(String exclusionPath) { - Tools.retryable(() -> lockMap.asJavaMap() - .compute(exclusionPath, - (k, v) -> MutexState.admit(v, localNodeId)), - ConsistentMapException.ConcurrentModification.class, - Integer.MAX_VALUE, - 100).get(); - } - - /** - * Releases lock for the specific path. This operation is idempotent. - * @param exclusionPath exclusion path - */ - protected void unlock(String exclusionPath) { - Tools.retryable(() -> lockMap.asJavaMap() - .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)), - ConsistentMapException.ConcurrentModification.class, - Integer.MAX_VALUE, - 100).get(); - } - - /** - * Detects and releases all locks held by this node. - */ - private void releaseOldLocks() { - Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder())) - .keySet() - .forEach(path -> { - log.info("Detected zombie task still holding lock for {}. Releasing lock.", path); - unlock(path); - }); - } - - private class InternalLockMapEventListener implements MapEventListener<String, MutexState> { - - @Override - public void event(MapEvent<String, MutexState> event) { - log.debug("Received {}", event); - if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) { - pending.computeIfPresent(event.key(), (k, future) -> { - MutexState state = Versioned.valueOrElse(event.value(), null); - if (state != null && localNodeId.equals(state.holder())) { - log.debug("Local node is now owner for {}", event.key()); - future.complete(state); - return null; - } else { - return future; - } - }); - InnerMutexTask task = activeTasks.get(event.key()); - if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) { - task.stop(); - } - } - } - } - - private class InternalClusterEventListener implements ClusterEventListener { - - @Override - public void event(ClusterEvent event) { - if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || - event.type() == ClusterEvent.Type.INSTANCE_REMOVED) { - NodeId nodeId = event.subject().id(); - log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId); - lockMap.asJavaMap().forEach((k, v) -> { - if (v.contains(nodeId)) { - lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId)); - } - }); - } - long activeNodes = clusterService.getNodes() - .stream() - .map(node -> clusterService.getState(node.id())) - .filter(State.ACTIVE::equals) - .count(); - if (clusterService.getNodes().size() > 1 && activeNodes == 1) { - log.info("This node is partitioned away from the cluster. Stopping all inflight executions"); - activeTasks.forEach((k, v) -> { - v.stop(); - }); - } - } - } - - private static final class MutexState { - - private final NodeId holder; - private final List<NodeId> waitList; - private final long term; - - public static MutexState admit(MutexState state, NodeId nodeId) { - if (state == null) { - return new MutexState(nodeId, 1L, Lists.newArrayList()); - } else if (state.holder() == null) { - return new MutexState(nodeId, state.term() + 1, Lists.newArrayList()); - } else { - if (!state.contains(nodeId)) { - NodeId newHolder = state.holder(); - List<NodeId> newWaitList = Lists.newArrayList(state.waitList()); - newWaitList.add(nodeId); - return new MutexState(newHolder, state.term(), newWaitList); - } else { - return state; - } - } - } - - public static MutexState evict(MutexState state, NodeId nodeId) { - return state.evict(nodeId); - } - - public MutexState evict(NodeId nodeId) { - if (nodeId.equals(holder)) { - if (waitList.isEmpty()) { - return new MutexState(null, term, waitList); - } - List<NodeId> newWaitList = Lists.newArrayList(waitList); - NodeId newHolder = newWaitList.remove(0); - return new MutexState(newHolder, term + 1, newWaitList); - } else { - NodeId newHolder = holder; - List<NodeId> newWaitList = Lists.newArrayList(waitList); - newWaitList.remove(nodeId); - return new MutexState(newHolder, term, newWaitList); - } - } - - public NodeId holder() { - return holder; - } - - public List<NodeId> waitList() { - return waitList; - } - - public long term() { - return term; - } - - private boolean contains(NodeId nodeId) { - return (nodeId.equals(holder) || waitList.contains(nodeId)); - } - - private MutexState(NodeId holder, long term, List<NodeId> waitList) { - this.holder = holder; - this.term = term; - this.waitList = Lists.newArrayList(waitList); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("holder", holder) - .add("term", term) - .add("waitList", waitList) - .toString(); - } - } - - private class InnerMutexTask implements MutexTask { - private final MutexTask task; - private final String mutexPath; - private final long term; - - public InnerMutexTask(String mutexPath, MutexTask task, long term) { - this.mutexPath = mutexPath; - this.term = term; - this.task = task; - } - - public long term() { - return term; - } - - @Override - public void start() { - log.debug("Starting execution for mutex task guarded by {}", mutexPath); - task.start(); - log.debug("Finished execution for mutex task guarded by {}", mutexPath); - } - - @Override - public void stop() { - log.debug("Stopping execution for mutex task guarded by {}", mutexPath); - task.stop(); - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java deleted file mode 100644 index f741b367..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkState; - -/** - * A database that partitions the keys across one or more database partitions. - */ -public class PartitionedDatabase implements Database { - - private final String name; - private final Partitioner<String> partitioner; - private final List<Database> partitions; - private final AtomicBoolean isOpen = new AtomicBoolean(false); - private static final String DB_NOT_OPEN = "Partitioned Database is not open"; - private TransactionManager transactionManager; - - public PartitionedDatabase( - String name, - Collection<Database> partitions) { - this.name = name; - this.partitions = partitions - .stream() - .sorted((db1, db2) -> db1.name().compareTo(db2.name())) - .collect(Collectors.toList()); - this.partitioner = new SimpleKeyHashPartitioner(this.partitions); - } - - /** - * Returns the databases for individual partitions. - * @return list of database partitions - */ - public List<Database> getPartitions() { - return partitions; - } - - /** - * Returns true if the database is open. - * @return true if open, false otherwise - */ - @Override - public boolean isOpen() { - return isOpen.get(); - } - - @Override - public CompletableFuture<Set<String>> maps() { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<String> mapNames = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.maps().thenApply(mapNames::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> mapNames); - } - - @Override - public CompletableFuture<Map<String, Long>> counters() { - checkState(isOpen.get(), DB_NOT_OPEN); - Map<String, Long> counters = Maps.newConcurrentMap(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.counters() - .thenApply(m -> { - counters.putAll(m); - return null; - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> counters); - } - - @Override - public CompletableFuture<Integer> mapSize(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicInteger totalSize = new AtomicInteger(0); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> totalSize.get()); - } - - @Override - public CompletableFuture<Boolean> mapIsEmpty(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return mapSize(mapName).thenApply(size -> size == 0); - } - - @Override - public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key); - } - - @Override - public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicBoolean containsValue = new AtomicBoolean(false); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapContainsValue(mapName, value) - .thenApply(v -> containsValue.compareAndSet(false, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> containsValue.get()); - } - - @Override - public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapGet(mapName, key); - } - - @Override - public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate( - String mapName, String key, Match<byte[]> valueMatch, - Match<Long> versionMatch, byte[] value) { - return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value); - - } - - @Override - public CompletableFuture<Result<Void>> mapClear(String mapName) { - AtomicBoolean isLocked = new AtomicBoolean(false); - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapClear(mapName) - .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status()))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null)); - } - - @Override - public CompletableFuture<Set<String>> mapKeySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<String> keySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> keySet); - } - - @Override - public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapValues(mapName).thenApply(values::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> values); - } - - @Override - public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> entrySet); - } - - @Override - public CompletableFuture<Long> counterGet(String counterName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGet(counterName); - } - - @Override - public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta); - } - - @Override - public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); - } - - @Override - public CompletableFuture<Void> counterSet(String counterName, long value) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); - } - - @Override - public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName). - counterCompareAndSet(counterName, expectedValue, updateValue); - - } - - @Override - public CompletableFuture<Long> queueSize(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queueSize(queueName); - } - - @Override - public CompletableFuture<Void> queuePush(String queueName, byte[] entry) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry); - } - - @Override - public CompletableFuture<byte[]> queuePop(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePop(queueName); - } - - @Override - public CompletableFuture<byte[]> queuePeek(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePeek(queueName); - } - - @Override - public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - if (subTransactions.isEmpty()) { - return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of())); - } else if (subTransactions.size() == 1) { - Entry<Database, Transaction> entry = - subTransactions.entrySet().iterator().next(); - return entry.getKey().prepareAndCommit(entry.getValue()); - } else { - if (transactionManager == null) { - throw new IllegalStateException("TransactionManager is not initialized"); - } - return transactionManager.execute(transaction); - } - } - - @Override - public CompletableFuture<Boolean> prepare(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - AtomicBoolean status = new AtomicBoolean(true); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry - .getKey() - .prepare(entry.getValue()) - .thenApply(v -> status.compareAndSet(true, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> status.get()); - } - - @Override - public CompletableFuture<CommitResponse> commit(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - AtomicBoolean success = new AtomicBoolean(true); - List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList(); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().commit(entry.getValue()) - .thenAccept(response -> { - success.set(success.get() && response.success()); - if (success.get()) { - allUpdates.addAll(response.updates()); - } - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> success.get() ? - CommitResponse.success(allUpdates) : CommitResponse.failure()); - } - - @Override - public CompletableFuture<Boolean> rollback(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().rollback(entry.getValue())) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> true); - } - - @Override - public CompletableFuture<Database> open() { - return CompletableFuture.allOf(partitions - .stream() - .map(Database::open) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> { - isOpen.set(true); - return this; - }); - } - - @Override - public CompletableFuture<Void> close() { - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(database -> database.close()) - .toArray(CompletableFuture[]::new)); - } - - @Override - public boolean isClosed() { - return !isOpen.get(); - } - - @Override - public String name() { - return name; - } - - @Override - public Cluster cluster() { - throw new UnsupportedOperationException(); - } - - @Override - public Database addStartupTask(Task<CompletableFuture<Void>> task) { - throw new UnsupportedOperationException(); - } - - @Override - public Database addShutdownTask(Task<CompletableFuture<Void>> task) { - throw new UnsupportedOperationException(); - } - - @Override - public ResourceState state() { - throw new UnsupportedOperationException(); - } - - private Map<Database, Transaction> createSubTransactions( - Transaction transaction) { - Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap(); - for (DatabaseUpdate update : transaction.updates()) { - Database partition = partitioner.getPartition(update.mapName(), update.key()); - List<DatabaseUpdate> partitionUpdates = - perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList()); - partitionUpdates.add(update); - } - Map<Database, Transaction> subTransactions = Maps.newHashMap(); - perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v))); - return subTransactions; - } - - protected void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - @Override - public void registerConsumer(Consumer<StateMachineUpdate> consumer) { - partitions.forEach(p -> p.registerConsumer(consumer)); - } - - @Override - public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) { - partitions.forEach(p -> p.unregisterConsumer(consumer)); - } -} - diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java deleted file mode 100644 index de630b90..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -/** - * Partitioner is responsible for mapping keys to individual database partitions. - * - * @param <K> key type. - */ -public interface Partitioner<K> { - - /** - * Returns the database partition. - * @param mapName map name - * @param key key - * @return Database partition - */ - Database getPartition(String mapName, K key); -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java deleted file mode 100644 index 856f706d..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Result.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Objects; - -/** - * Result of a database update operation. - * - * @param <V> return value type - */ -public final class Result<V> { - - public enum Status { - /** - * Indicates a successful update. - */ - OK, - - /** - * Indicates a failure due to underlying state being locked by another transaction. - */ - LOCKED - } - - private final Status status; - private final V value; - - /** - * Creates a new Result instance with the specified value with status set to Status.OK. - * - * @param <V> result value type - * @param value result value - * @return Result instance - */ - public static <V> Result<V> ok(V value) { - return new Result<>(value, Status.OK); - } - - /** - * Creates a new Result instance with status set to Status.LOCKED. - * - * @param <V> result value type - * @return Result instance - */ - public static <V> Result<V> locked() { - return new Result<>(null, Status.LOCKED); - } - - private Result(V value, Status status) { - this.value = value; - this.status = status; - } - - /** - * Returns true if this result indicates a successful execution i.e status is Status.OK. - * - * @return true if successful, false otherwise - */ - public boolean success() { - return status == Status.OK; - } - - /** - * Returns the status of database update operation. - * - * @return database update status - */ - public Status status() { - return status; - } - - /** - * Returns the return value for the update. - * - * @return value returned by database update. If the status is another - * other than Status.OK, this returns a null - */ - public V value() { - return value; - } - - @Override - public int hashCode() { - return Objects.hash(value, status); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object other) { - if (!(other instanceof Result)) { - return false; - } - Result<V> that = (Result<V>) other; - return Objects.equals(this.value, that.value) && - Objects.equals(this.status, that.status); - } - - @Override - public String toString() { - return toStringHelper(this) - .add("status", status) - .add("value", value) - .toString(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java deleted file mode 100644 index 40864286..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.util.List; - -/** - * A simple Partitioner for mapping keys to database partitions. - * <p> - * This class uses a md5 hash based hashing scheme for hashing the key to - * a partition. - * - */ -public class SimpleKeyHashPartitioner extends DatabasePartitioner { - - public SimpleKeyHashPartitioner(List<Database> partitions) { - super(partitions); - } - - @Override - public Database getPartition(String mapName, String key) { - return partitions.get(hash(key) % partitions.size()); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java deleted file mode 100644 index 8dc26e0f..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import java.util.List; - -/** - * A simple Partitioner that uses the map name hash to - * pick a partition. - * <p> - * This class uses a md5 hash based hashing scheme for hashing the map name to - * a partition. This partitioner maps all keys for a map to the same database - * partition. - */ -public class SimpleTableHashPartitioner extends DatabasePartitioner { - - public SimpleTableHashPartitioner(List<Database> partitions) { - super(partitions); - } - - @Override - public Database getPartition(String mapName, String key) { - return partitions.get(hash(mapName) % partitions.size()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java deleted file mode 100644 index 72356d0b..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -/** - * Representation of a state machine update. - */ -public class StateMachineUpdate { - - /** - * Target data structure type this update is for. - */ - enum Target { - /** - * Update is for a map. - */ - MAP_UPDATE, - - /** - * Update is a transaction commit. - */ - TX_COMMIT, - - /** - * Update is a queue push. - */ - QUEUE_PUSH, - - /** - * Update is for some other operation. - */ - OTHER - } - - private final String operationName; - private final Object input; - private final Object output; - - public StateMachineUpdate(String operationName, Object input, Object output) { - this.operationName = operationName; - this.input = input; - this.output = output; - } - - public Target target() { - // FIXME: This check is brittle - if (operationName.contains("mapUpdate")) { - return Target.MAP_UPDATE; - } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) { - return Target.TX_COMMIT; - } else if (operationName.contains("queuePush")) { - return Target.QUEUE_PUSH; - } else { - return Target.OTHER; - } - } - - @SuppressWarnings("unchecked") - public <T> T input() { - return (T) input; - } - - @SuppressWarnings("unchecked") - public <T> T output() { - return (T) output; - } - - @Override - public String toString() { - return toStringHelper(this) - .add("name", operationName) - .add("input", input) - .add("output", output) - .toString(); - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java deleted file mode 100644 index fc6e58d0..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import org.onlab.util.KryoNamespace; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; -import org.onosproject.store.service.Transaction.State; - -import com.google.common.collect.ImmutableList; - -/** - * Agent that runs the two phase commit protocol. - */ -public class TransactionManager { - - private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder() - .register(KryoNamespaces.BASIC) - .nextId(KryoNamespace.FLOATING_ID) - .register(Versioned.class) - .register(DatabaseUpdate.class) - .register(DatabaseUpdate.Type.class) - .register(DefaultTransaction.class) - .register(Transaction.State.class) - .build(); - - private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE)); - private final Database database; - private final AsyncConsistentMap<Long, Transaction> transactions; - - /** - * Constructs a new TransactionManager for the specified database instance. - * - * @param database database - * @param mapBuilder builder for ConsistentMap instances - */ - public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) { - this.database = checkNotNull(database, "database cannot be null"); - this.transactions = mapBuilder.withName("onos-transactions") - .withSerializer(serializer) - .buildAsyncMap(); - } - - /** - * Executes the specified transaction by employing a two phase commit protocol. - * - * @param transaction transaction to commit - * @return transaction result. Result value true indicates a successful commit, false - * indicates abort - */ - public CompletableFuture<CommitResponse> execute(Transaction transaction) { - // clean up if this transaction in already in a terminal state. - if (transaction.state() == Transaction.State.COMMITTED || - transaction.state() == Transaction.State.ROLLEDBACK) { - return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of())); - } else if (transaction.state() == Transaction.State.COMMITTING) { - return commit(transaction); - } else if (transaction.state() == Transaction.State.ROLLINGBACK) { - return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of())); - } else { - return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction)); - } - } - - - /** - * Returns all transactions in the system. - * - * @return future for a collection of transactions - */ - public CompletableFuture<Collection<Transaction>> getTransactions() { - return transactions.values().thenApply(c -> { - Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList()); - return txns; - }); - } - - private CompletableFuture<Boolean> prepare(Transaction transaction) { - return transactions.put(transaction.id(), transaction) - .thenCompose(v -> database.prepare(transaction)) - .thenCompose(status -> transactions.put( - transaction.id(), - transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK)) - .thenApply(v -> status)); - } - - private CompletableFuture<CommitResponse> commit(Transaction transaction) { - return database.commit(transaction) - .whenComplete((r, e) -> transactions.put( - transaction.id(), - transaction.transition(Transaction.State.COMMITTED))); - } - - private CompletableFuture<CommitResponse> rollback(Transaction transaction) { - return database.rollback(transaction) - .thenCompose(v -> transactions.put( - transaction.id(), - transaction.transition(Transaction.State.ROLLEDBACK))) - .thenApply(v -> CommitResponse.failure()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java deleted file mode 100644 index 50b78dd4..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateResult.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.consistent.impl; - -import java.util.function.Function; - -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.Versioned; - -/** - * Result of a update operation. - * <p> - * Both old and new values are accessible along with a flag that indicates if the - * the value was updated. If flag is false, oldValue and newValue both - * point to the same unmodified value. - * @param <V> result type - */ -public class UpdateResult<K, V> { - - private final boolean updated; - private final String mapName; - private final K key; - private final Versioned<V> oldValue; - private final Versioned<V> newValue; - - public UpdateResult(boolean updated, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) { - this.updated = updated; - this.mapName = mapName; - this.key = key; - this.oldValue = oldValue; - this.newValue = newValue; - } - - public boolean updated() { - return updated; - } - - public String mapName() { - return mapName; - } - - public K key() { - return key; - } - - public Versioned<V> oldValue() { - return oldValue; - } - - public Versioned<V> newValue() { - return newValue; - } - - public <K1, V1> UpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) { - return new UpdateResult<>(updated, - mapName, - keyTransform.apply(key), - oldValue == null ? null : oldValue.map(valueMapper), - newValue == null ? null : newValue.map(valueMapper)); - } - - public MapEvent<K, V> toMapEvent() { - if (!updated) { - return null; - } else { - MapEvent.Type eventType = oldValue == null ? - MapEvent.Type.INSERT : newValue == null ? MapEvent.Type.REMOVE : MapEvent.Type.UPDATE; - Versioned<V> eventValue = eventType == MapEvent.Type.REMOVE ? oldValue : newValue; - return new MapEvent<>(mapName(), eventType, key(), eventValue); - } - } -}
\ No newline at end of file diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java deleted file mode 100644 index 3dae86b5..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of partitioned and distributed store facility capable of - * providing consistent update semantics. - */ -package org.onosproject.store.consistent.impl;
\ No newline at end of file |