/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.store.consistent.impl;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Default database state.
 * <p>
 * Keeps named counters, named maps of versioned byte-array values, named
 * queues of byte arrays, and the per-map transaction locks. All of these
 * containers are fetched from (or created and registered in) the
 * {@link StateContext} passed to {@link #init(StateContext)}.
 */
public class DefaultDatabaseState implements DatabaseState<String, byte[]> {

    // NOTE(review): ++nextVersion rebinds this boxed field; the value stored in the
    // StateContext under "nextVersion" is only written in init() - confirm that this
    // is the intended behaviour.
    private Long nextVersion;
    private Map<String, AtomicLong> counters;
    private Map<String, Map<String, Versioned<byte[]>>> maps;
    private Map<String, Queue<byte[]>> queues;

    /**
     * This locks map has a structure similar to the "maps" map above and
     * holds all the provisional updates made during a transaction's prepare phase.
     * The entry value is represented as the tuple: (transactionId, newValue)
     * If newValue == null that signifies this update is attempting to
     * delete the existing value.
     * This map also serves as a lock on the entries that are being updated.
     * The presence of an entry in this map indicates that element is
     * participating in a transaction and is currently locked for updates.
     */
    private Map<String, Map<String, Update>> locks;

    /**
     * Binds this state to the given context, creating and registering any
     * container ("counters", "maps", "locks", "queues", "nextVersion") that the
     * context does not hold yet.
     *
     * @param context state context to read the containers from
     */
    @Initializer
    @Override
    public void init(StateContext<DatabaseState<String, byte[]>> context) {
        counters = context.get("counters");
        if (counters == null) {
            counters = Maps.newConcurrentMap();
            context.put("counters", counters);
        }
        maps = context.get("maps");
        if (maps == null) {
            maps = Maps.newConcurrentMap();
            context.put("maps", maps);
        }
        locks = context.get("locks");
        if (locks == null) {
            locks = Maps.newConcurrentMap();
            context.put("locks", locks);
        }
        queues = context.get("queues");
        if (queues == null) {
            queues = Maps.newConcurrentMap();
            context.put("queues", queues);
        }
        nextVersion = context.get("nextVersion");
        if (nextVersion == null) {
            nextVersion = 0L;
            context.put("nextVersion", nextVersion);
        }
    }

    /**
     * Returns an immutable copy of the names of all known maps.
     */
    @Override
    public Set<String> maps() {
        return ImmutableSet.copyOf(maps.keySet());
    }

    /**
     * Returns a new map from counter name to the counter's current value.
     */
    @Override
    public Map<String, Long> counters() {
        Map<String, Long> counterMap = Maps.newHashMap();
        counters.forEach((k, v) -> counterMap.put(k, v.get()));
        return counterMap;
    }

    @Override
    public int mapSize(String mapName) {
        return getMap(mapName).size();
    }

    @Override
    public boolean mapIsEmpty(String mapName) {
        return getMap(mapName).isEmpty();
    }

    @Override
    public boolean mapContainsKey(String mapName, String key) {
        return getMap(mapName).containsKey(key);
    }

    /**
     * Checks whether any entry of the map holds a value whose bytes equal the
     * given array (compared by content, not by reference).
     */
    @Override
    public boolean mapContainsValue(String mapName, byte[] value) {
        return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
    }

    @Override
    public Versioned<byte[]> mapGet(String mapName, String key) {
        return getMap(mapName).get(key);
    }

    /**
     * Conditionally updates or removes a map entry.
     *
     * @param mapName      name of the map
     * @param key          key to update
     * @param valueMatch   condition on the current value (tested with null when the key is absent)
     * @param versionMatch condition on the current version (tested with null when the key is absent)
     * @param value        new value; null requests removal of the entry
     * @return a locked result if the key has a provisional transaction update;
     *         otherwise an ok result describing whether the entry changed,
     *         together with its old and new values
     */
    @Override
    public Result<UpdateResult<String, byte[]>> mapUpdate(
            String mapName,
            String key,
            Match<byte[]> valueMatch,
            Match<Long> versionMatch,
            byte[] value) {
        if (isLockedForUpdates(mapName, key)) {
            return Result.locked();
        }
        Versioned<byte[]> currentValue = getMap(mapName).get(key);
        if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
                !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
            // Preconditions not met: report "not updated" with the unchanged value on both sides.
            return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
        } else {
            if (value == null) {
                // A null value is a removal request.
                if (currentValue == null) {
                    return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
                } else {
                    getMap(mapName).remove(key);
                    return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
                }
            }
            Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
            getMap(mapName).put(key, newValue);
            return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
        }
    }

    /**
     * Removes all entries of the map, unless some of its keys are currently
     * locked by a transaction, in which case a locked result is returned.
     */
    @Override
    public Result<Void> mapClear(String mapName) {
        if (areTransactionsInProgress(mapName)) {
            return Result.locked();
        }
        getMap(mapName).clear();
        return Result.ok(null);
    }

    @Override
    public Set<String> mapKeySet(String mapName) {
        return ImmutableSet.copyOf(getMap(mapName).keySet());
    }

    @Override
    public Collection<Versioned<byte[]>> mapValues(String mapName) {
        return ImmutableList.copyOf(getMap(mapName).values());
    }

    /**
     * Returns an immutable snapshot of the map's entries; each entry is itself
     * copied into an immutable entry.
     */
    @Override
    public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
        return ImmutableSet.copyOf(getMap(mapName)
                .entrySet()
                .stream()
                .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
                .collect(Collectors.toSet()));
    }

    @Override
    public Long counterAddAndGet(String counterName, long delta) {
        return getCounter(counterName).addAndGet(delta);
    }

    @Override
    public Long counterGetAndAdd(String counterName, long delta) {
        return getCounter(counterName).getAndAdd(delta);
    }

    @Override
    public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
        return getCounter(counterName).compareAndSet(expectedValue, updateValue);
    }

    @Override
    public Long counterGet(String counterName) {
        return getCounter(counterName).get();
    }

    @Override
    public Long queueSize(String queueName) {
        return Long.valueOf(getQueue(queueName).size());
    }

    @Override
    public byte[] queuePeek(String queueName) {
        return getQueue(queueName).peek();
    }

    @Override
    public byte[] queuePop(String queueName) {
        return getQueue(queueName).poll();
    }

    @Override
    public void queuePush(String queueName, byte[] entry) {
        getQueue(queueName).offer(entry);
    }

    /**
     * Prepares the transaction and, if that succeeds, commits it immediately.
     *
     * @param transaction transaction to apply
     * @return the commit response, or a failure response if prepare failed
     */
    @Override
    public CommitResponse prepareAndCommit(Transaction transaction) {
        if (prepare(transaction)) {
            return commit(transaction);
        }
        return CommitResponse.failure();
    }

    /**
     * Attempts to lock every key touched by the transaction.
     *
     * @param transaction transaction to prepare
     * @return false if any key is locked by a different transaction or any
     *         update's precondition does not hold; true after a provisional
     *         update has been recorded for every update of the transaction
     */
    @Override
    public boolean prepare(Transaction transaction) {
        if (transaction.updates().stream().anyMatch(update ->
                    isLockedByAnotherTransaction(update.mapName(),
                                                 update.key(),
                                                 transaction.id()))) {
            return false;
        }

        if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
            transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
            return true;
        }
        return false;
    }

    /**
     * Applies every provisional update of the transaction, in the order given
     * by {@code transaction.updates()}.
     * <p>
     * The updates are applied eagerly, exactly once each. (A lazily transformed
     * list view would defer the map mutations until the response list is read
     * and repeat them on every subsequent read.)
     *
     * @param transaction transaction to commit
     * @return success response carrying one update result per update
     * @throws IllegalStateException if an update has no provisional entry or
     *         the entry belongs to a different transaction
     */
    @Override
    public CommitResponse commit(Transaction transaction) {
        List<UpdateResult<String, byte[]>> results =
                Lists.newArrayListWithCapacity(transaction.updates().size());
        for (DatabaseUpdate update : transaction.updates()) {
            results.add(commitProvisionalUpdate(update, transaction.id()));
        }
        return CommitResponse.success(results);
    }

    /**
     * Discards the provisional updates this transaction holds.
     *
     * @param transaction transaction to roll back
     * @return always true
     */
    @Override
    public boolean rollback(Transaction transaction) {
        transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
        return true;
    }

    // Returns the named map, creating an empty concurrent map on first use.
    private Map<String, Versioned<byte[]>> getMap(String mapName) {
        return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
    }

    // Returns the lock map of the named map, creating an empty one on first use.
    private Map<String, Update> getLockMap(String mapName) {
        return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
    }

    // Returns the named counter, creating it with value 0 on first use.
    private AtomicLong getCounter(String counterName) {
        return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
    }

    // Returns the named queue, creating an empty one on first use.
    private Queue<byte[]> getQueue(String queueName) {
        return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
    }

    /**
     * Checks the precondition of a transactional update against the entry
     * currently stored for its key.
     *
     * @throws IllegalStateException if the update type is not handled
     */
    private boolean isUpdatePossible(DatabaseUpdate update) {
        Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
        switch (update.type()) {
        case PUT:
        case REMOVE:
            return true;
        case PUT_IF_ABSENT:
            return existingEntry == null;
        case PUT_IF_VERSION_MATCH:
            return existingEntry != null && existingEntry.version() == update.currentVersion();
        case PUT_IF_VALUE_MATCH:
            return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
        case REMOVE_IF_VERSION_MATCH:
            return existingEntry == null || existingEntry.version() == update.currentVersion();
        case REMOVE_IF_VALUE_MATCH:
            return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
        default:
            throw new IllegalStateException("Unsupported type: " + update.type());
        }
    }

    /**
     * Records the update in the lock map of its target map: put-style updates
     * store the new value, remove-style updates store null.
     *
     * @throws IllegalStateException if the update type is not handled
     */
    private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
        Map<String, Update> lockMap = getLockMap(update.mapName());
        switch (update.type()) {
        case PUT:
        case PUT_IF_ABSENT:
        case PUT_IF_VERSION_MATCH:
        case PUT_IF_VALUE_MATCH:
            lockMap.put(update.key(), new Update(transactionId, update.value()));
            break;
        case REMOVE:
        case REMOVE_IF_VERSION_MATCH:
        case REMOVE_IF_VALUE_MATCH:
            lockMap.put(update.key(), new Update(transactionId, null));
            break;
        default:
            throw new IllegalStateException("Unsupported type: " + update.type());
        }
    }

    /**
     * Releases the lock held for the update's key and applies the provisional
     * value to the map unconditionally.
     *
     * @throws IllegalStateException if there is no provisional update for the
     *         key, or it was recorded by a different transaction
     */
    private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
        String mapName = update.mapName();
        String key = update.key();
        Update provisionalUpdate = getLockMap(mapName).get(key);
        if (provisionalUpdate == null) {
            // Fail with a descriptive error instead of a NullPointerException.
            throw new IllegalStateException(
                    "No provisional update for key " + key + " in map " + mapName);
        }
        if (!Objects.equal(transactionId, provisionalUpdate.transactionId())) {
            throw new IllegalStateException("Invalid transaction Id");
        }
        // The lock must be released first: mapUpdate refuses to touch locked keys.
        getLockMap(mapName).remove(key);
        return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
    }

    /**
     * Drops the provisional update for the update's key if one exists and it
     * was recorded by the given transaction; otherwise does nothing.
     */
    private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
        String mapName = update.mapName();
        String key = update.key();
        Update provisionalUpdate = getLockMap(mapName).get(key);
        if (provisionalUpdate == null) {
            return;
        }
        if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
            getLockMap(mapName).remove(key);
        }
    }

    private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
        Update update = getLockMap(mapName).get(key);
        return update != null && !Objects.equal(transactionId, update.transactionId());
    }

    private boolean isLockedForUpdates(String mapName, String key) {
        return getLockMap(mapName).containsKey(key);
    }

    private boolean areTransactionsInProgress(String mapName) {
        return !getLockMap(mapName).isEmpty();
    }

    /**
     * Provisional update recorded during prepare: the owning transaction id
     * and the value to write (null meaning the entry is to be removed).
     * <p>
     * Declared static so that instances stored in the locks map do not carry
     * a reference to the enclosing state object.
     */
    private static final class Update {
        private final long transactionId;
        private final byte[] value;

        public Update(long txId, byte[] value) {
            this.transactionId = txId;
            this.value = value;
        }

        public long transactionId() {
            return this.transactionId;
        }

        public byte[] value() {
            return this.value;
        }
    }
}