/* * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onosproject.store.consistent.impl; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import net.kuujo.copycat.Task; import net.kuujo.copycat.cluster.Cluster; import net.kuujo.copycat.resource.ResourceState; import org.onosproject.store.service.DatabaseUpdate; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; /** * A database that partitions the keys across one or more database partitions. */ public class PartitionedDatabase implements Database { private final String name; private final Partitioner partitioner; private final List partitions; private final AtomicBoolean isOpen = new AtomicBoolean(false); private static final String DB_NOT_OPEN = "Partitioned Database is not open"; private TransactionManager transactionManager; public PartitionedDatabase( String name, Collection partitions) { this.name = name; this.partitions = partitions .stream() .sorted((db1, db2) -> db1.name().compareTo(db2.name())) .collect(Collectors.toList()); this.partitioner = new SimpleKeyHashPartitioner(this.partitions); } /** * Returns the databases for individual partitions. * @return list of database partitions */ public List getPartitions() { return partitions; } /** * Returns true if the database is open. * @return true if open, false otherwise */ @Override public boolean isOpen() { return isOpen.get(); } @Override public CompletableFuture> maps() { checkState(isOpen.get(), DB_NOT_OPEN); Set mapNames = Sets.newConcurrentHashSet(); return CompletableFuture.allOf(partitions .stream() .map(db -> db.maps().thenApply(mapNames::addAll)) .toArray(CompletableFuture[]::new)) .thenApply(v -> mapNames); } @Override public CompletableFuture> counters() { checkState(isOpen.get(), DB_NOT_OPEN); Map counters = Maps.newConcurrentMap(); return CompletableFuture.allOf(partitions .stream() .map(db -> db.counters() .thenApply(m -> { counters.putAll(m); return null; })) .toArray(CompletableFuture[]::new)) .thenApply(v -> counters); } @Override public CompletableFuture mapSize(String mapName) { checkState(isOpen.get(), DB_NOT_OPEN); AtomicInteger totalSize = new AtomicInteger(0); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) .toArray(CompletableFuture[]::new)) .thenApply(v -> totalSize.get()); } @Override public CompletableFuture mapIsEmpty(String mapName) { checkState(isOpen.get(), DB_NOT_OPEN); return mapSize(mapName).thenApply(size -> size == 0); } @Override public CompletableFuture mapContainsKey(String mapName, String key) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key); } @Override public CompletableFuture mapContainsValue(String mapName, byte[] value) { checkState(isOpen.get(), DB_NOT_OPEN); AtomicBoolean containsValue = new AtomicBoolean(false); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapContainsValue(mapName, value) .thenApply(v -> containsValue.compareAndSet(false, v))) .toArray(CompletableFuture[]::new)) .thenApply(v -> containsValue.get()); } @Override public CompletableFuture> mapGet(String mapName, String key) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(mapName, key).mapGet(mapName, key); } @Override public CompletableFuture>> mapUpdate( String mapName, String key, Match valueMatch, Match versionMatch, byte[] value) { return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value); } @Override public CompletableFuture> mapClear(String mapName) { AtomicBoolean isLocked = new AtomicBoolean(false); checkState(isOpen.get(), DB_NOT_OPEN); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapClear(mapName) .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status()))) .toArray(CompletableFuture[]::new)) .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null)); } @Override public CompletableFuture> mapKeySet(String mapName) { checkState(isOpen.get(), DB_NOT_OPEN); Set keySet = Sets.newConcurrentHashSet(); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll)) .toArray(CompletableFuture[]::new)) .thenApply(v -> keySet); } @Override public CompletableFuture>> mapValues(String mapName) { checkState(isOpen.get(), DB_NOT_OPEN); List> values = new CopyOnWriteArrayList<>(); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapValues(mapName).thenApply(values::addAll)) .toArray(CompletableFuture[]::new)) .thenApply(v -> values); } @Override public CompletableFuture>>> mapEntrySet(String mapName) { checkState(isOpen.get(), DB_NOT_OPEN); Set>> entrySet = Sets.newConcurrentHashSet(); return CompletableFuture.allOf(partitions .stream() .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) .toArray(CompletableFuture[]::new)) .thenApply(v -> entrySet); } @Override public CompletableFuture counterGet(String counterName) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(counterName, counterName).counterGet(counterName); } @Override public CompletableFuture counterAddAndGet(String counterName, long delta) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta); } @Override public CompletableFuture counterGetAndAdd(String counterName, long delta) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); } @Override public CompletableFuture counterSet(String counterName, long value) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); } @Override public CompletableFuture counterCompareAndSet(String counterName, long expectedValue, long updateValue) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(counterName, counterName). counterCompareAndSet(counterName, expectedValue, updateValue); } @Override public CompletableFuture queueSize(String queueName) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(queueName, queueName).queueSize(queueName); } @Override public CompletableFuture queuePush(String queueName, byte[] entry) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry); } @Override public CompletableFuture queuePop(String queueName) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(queueName, queueName).queuePop(queueName); } @Override public CompletableFuture queuePeek(String queueName) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(queueName, queueName).queuePeek(queueName); } @Override public CompletableFuture prepareAndCommit(Transaction transaction) { Map subTransactions = createSubTransactions(transaction); if (subTransactions.isEmpty()) { return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of())); } else if (subTransactions.size() == 1) { Entry entry = subTransactions.entrySet().iterator().next(); return entry.getKey().prepareAndCommit(entry.getValue()); } else { if (transactionManager == null) { throw new IllegalStateException("TransactionManager is not initialized"); } return transactionManager.execute(transaction); } } @Override public CompletableFuture prepare(Transaction transaction) { Map subTransactions = createSubTransactions(transaction); AtomicBoolean status = new AtomicBoolean(true); return CompletableFuture.allOf(subTransactions.entrySet() .stream() .map(entry -> entry .getKey() .prepare(entry.getValue()) .thenApply(v -> status.compareAndSet(true, v))) .toArray(CompletableFuture[]::new)) .thenApply(v -> status.get()); } @Override public CompletableFuture commit(Transaction transaction) { Map subTransactions = createSubTransactions(transaction); AtomicBoolean success = new AtomicBoolean(true); List> allUpdates = Lists.newArrayList(); return CompletableFuture.allOf(subTransactions.entrySet() .stream() .map(entry -> entry.getKey().commit(entry.getValue()) .thenAccept(response -> { success.set(success.get() && response.success()); if (success.get()) { allUpdates.addAll(response.updates()); } })) .toArray(CompletableFuture[]::new)) .thenApply(v -> success.get() ? CommitResponse.success(allUpdates) : CommitResponse.failure()); } @Override public CompletableFuture rollback(Transaction transaction) { Map subTransactions = createSubTransactions(transaction); return CompletableFuture.allOf(subTransactions.entrySet() .stream() .map(entry -> entry.getKey().rollback(entry.getValue())) .toArray(CompletableFuture[]::new)) .thenApply(v -> true); } @Override public CompletableFuture open() { return CompletableFuture.allOf(partitions .stream() .map(Database::open) .toArray(CompletableFuture[]::new)) .thenApply(v -> { isOpen.set(true); return this; }); } @Override public CompletableFuture close() { checkState(isOpen.get(), DB_NOT_OPEN); return CompletableFuture.allOf(partitions .stream() .map(database -> database.close()) .toArray(CompletableFuture[]::new)); } @Override public boolean isClosed() { return !isOpen.get(); } @Override public String name() { return name; } @Override public Cluster cluster() { throw new UnsupportedOperationException(); } @Override public Database addStartupTask(Task> task) { throw new UnsupportedOperationException(); } @Override public Database addShutdownTask(Task> task) { throw new UnsupportedOperationException(); } @Override public ResourceState state() { throw new UnsupportedOperationException(); } private Map createSubTransactions( Transaction transaction) { Map> perPartitionUpdates = Maps.newHashMap(); for (DatabaseUpdate update : transaction.updates()) { Database partition = partitioner.getPartition(update.mapName(), update.key()); List partitionUpdates = perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList()); partitionUpdates.add(update); } Map subTransactions = Maps.newHashMap(); perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v))); return subTransactions; } protected void setTransactionManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } @Override public void registerConsumer(Consumer consumer) { partitions.forEach(p -> p.registerConsumer(consumer)); } @Override public void unregisterConsumer(Consumer consumer) { partitions.forEach(p -> p.unregisterConsumer(consumer)); } }