diff options
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java | 399 |
1 files changed, 0 insertions, 399 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java deleted file mode 100644 index f741b367..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.consistent.impl; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; -import org.onosproject.store.service.DatabaseUpdate; -import org.onosproject.store.service.Transaction; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkState; - -/** - * A database that partitions the keys across one or more database partitions. - */ -public class PartitionedDatabase implements Database { - - private final String name; - private final Partitioner<String> partitioner; - private final List<Database> partitions; - private final AtomicBoolean isOpen = new AtomicBoolean(false); - private static final String DB_NOT_OPEN = "Partitioned Database is not open"; - private TransactionManager transactionManager; - - public PartitionedDatabase( - String name, - Collection<Database> partitions) { - this.name = name; - this.partitions = partitions - .stream() - .sorted((db1, db2) -> db1.name().compareTo(db2.name())) - .collect(Collectors.toList()); - this.partitioner = new SimpleKeyHashPartitioner(this.partitions); - } - - /** - * Returns the databases for individual partitions. - * @return list of database partitions - */ - public List<Database> getPartitions() { - return partitions; - } - - /** - * Returns true if the database is open. - * @return true if open, false otherwise - */ - @Override - public boolean isOpen() { - return isOpen.get(); - } - - @Override - public CompletableFuture<Set<String>> maps() { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<String> mapNames = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.maps().thenApply(mapNames::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> mapNames); - } - - @Override - public CompletableFuture<Map<String, Long>> counters() { - checkState(isOpen.get(), DB_NOT_OPEN); - Map<String, Long> counters = Maps.newConcurrentMap(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.counters() - .thenApply(m -> { - counters.putAll(m); - return null; - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> counters); - } - - @Override - public CompletableFuture<Integer> mapSize(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicInteger totalSize = new AtomicInteger(0); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> totalSize.get()); - } - - @Override - public CompletableFuture<Boolean> mapIsEmpty(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return mapSize(mapName).thenApply(size -> size == 0); - } - - @Override - public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key); - } - - @Override - public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicBoolean containsValue = new AtomicBoolean(false); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapContainsValue(mapName, value) - .thenApply(v -> containsValue.compareAndSet(false, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> containsValue.get()); - } - - @Override - public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapGet(mapName, key); - } - - @Override - public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate( - String mapName, String key, Match<byte[]> valueMatch, - Match<Long> versionMatch, byte[] value) { - return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value); - - } - - @Override - public CompletableFuture<Result<Void>> mapClear(String mapName) { - AtomicBoolean isLocked = new AtomicBoolean(false); - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapClear(mapName) - .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status()))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null)); - } - - @Override - public CompletableFuture<Set<String>> mapKeySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<String> keySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> keySet); - } - - @Override - public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapValues(mapName).thenApply(values::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> values); - } - - @Override - public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> entrySet); - } - - @Override - public CompletableFuture<Long> counterGet(String counterName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGet(counterName); - } - - @Override - public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta); - } - - @Override - public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); - } - - @Override - public CompletableFuture<Void> counterSet(String counterName, long value) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); - } - - @Override - public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName). - counterCompareAndSet(counterName, expectedValue, updateValue); - - } - - @Override - public CompletableFuture<Long> queueSize(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queueSize(queueName); - } - - @Override - public CompletableFuture<Void> queuePush(String queueName, byte[] entry) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry); - } - - @Override - public CompletableFuture<byte[]> queuePop(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePop(queueName); - } - - @Override - public CompletableFuture<byte[]> queuePeek(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePeek(queueName); - } - - @Override - public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - if (subTransactions.isEmpty()) { - return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of())); - } else if (subTransactions.size() == 1) { - Entry<Database, Transaction> entry = - subTransactions.entrySet().iterator().next(); - return entry.getKey().prepareAndCommit(entry.getValue()); - } else { - if (transactionManager == null) { - throw new IllegalStateException("TransactionManager is not initialized"); - } - return transactionManager.execute(transaction); - } - } - - @Override - public CompletableFuture<Boolean> prepare(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - AtomicBoolean status = new AtomicBoolean(true); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry - .getKey() - .prepare(entry.getValue()) - .thenApply(v -> status.compareAndSet(true, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> status.get()); - } - - @Override - public CompletableFuture<CommitResponse> commit(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - AtomicBoolean success = new AtomicBoolean(true); - List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList(); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().commit(entry.getValue()) - .thenAccept(response -> { - success.set(success.get() && response.success()); - if (success.get()) { - allUpdates.addAll(response.updates()); - } - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> success.get() ? - CommitResponse.success(allUpdates) : CommitResponse.failure()); - } - - @Override - public CompletableFuture<Boolean> rollback(Transaction transaction) { - Map<Database, Transaction> subTransactions = createSubTransactions(transaction); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().rollback(entry.getValue())) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> true); - } - - @Override - public CompletableFuture<Database> open() { - return CompletableFuture.allOf(partitions - .stream() - .map(Database::open) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> { - isOpen.set(true); - return this; - }); - } - - @Override - public CompletableFuture<Void> close() { - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(database -> database.close()) - .toArray(CompletableFuture[]::new)); - } - - @Override - public boolean isClosed() { - return !isOpen.get(); - } - - @Override - public String name() { - return name; - } - - @Override - public Cluster cluster() { - throw new UnsupportedOperationException(); - } - - @Override - public Database addStartupTask(Task<CompletableFuture<Void>> task) { - throw new UnsupportedOperationException(); - } - - @Override - public Database addShutdownTask(Task<CompletableFuture<Void>> task) { - throw new UnsupportedOperationException(); - } - - @Override - public ResourceState state() { - throw new UnsupportedOperationException(); - } - - private Map<Database, Transaction> createSubTransactions( - Transaction transaction) { - Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap(); - for (DatabaseUpdate update : transaction.updates()) { - Database partition = partitioner.getPartition(update.mapName(), update.key()); - List<DatabaseUpdate> partitionUpdates = - perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList()); - partitionUpdates.add(update); - } - Map<Database, Transaction> subTransactions = Maps.newHashMap(); - perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v))); - return subTransactions; - } - - protected void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - @Override - public void registerConsumer(Consumer<StateMachineUpdate> consumer) { - partitions.forEach(p -> p.registerConsumer(consumer)); - } - - @Override - public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) { - partitions.forEach(p -> p.unregisterConsumer(consumer)); - } -} - |