diff options
author | 2015-09-09 22:15:21 -0700 | |
---|---|---|
committer | 2015-09-09 22:15:21 -0700 | |
commit | 13d05bc8458758ee39cb829098241e89616717ee (patch) | |
tree | 22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java | |
parent | 6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff) |
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java')
-rw-r--r-- | framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java new file mode 100644 index 00000000..9d3505bd --- /dev/null +++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java @@ -0,0 +1,368 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.store.consistent.impl; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.Set; + +import org.onosproject.store.service.DatabaseUpdate; +import org.onosproject.store.service.Transaction; +import org.onosproject.store.service.Versioned; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import net.kuujo.copycat.state.Initializer; +import net.kuujo.copycat.state.StateContext; + +/** + * Default database state. + */ +public class DefaultDatabaseState implements DatabaseState<String, byte[]> { + private Long nextVersion; + private Map<String, AtomicLong> counters; + private Map<String, Map<String, Versioned<byte[]>>> maps; + private Map<String, Queue<byte[]>> queues; + + /** + * This locks map has a structure similar to the "tables" map above and + * holds all the provisional updates made during a transaction's prepare phase. + * The entry value is represented as the tuple: (transactionId, newValue) + * If newValue == null that signifies this update is attempting to + * delete the existing value. + * This map also serves as a lock on the entries that are being updated. + * The presence of a entry in this map indicates that element is + * participating in a transaction and is currently locked for updates. + */ + private Map<String, Map<String, Update>> locks; + + @Initializer + @Override + public void init(StateContext<DatabaseState<String, byte[]>> context) { + counters = context.get("counters"); + if (counters == null) { + counters = Maps.newConcurrentMap(); + context.put("counters", counters); + } + maps = context.get("maps"); + if (maps == null) { + maps = Maps.newConcurrentMap(); + context.put("maps", maps); + } + locks = context.get("locks"); + if (locks == null) { + locks = Maps.newConcurrentMap(); + context.put("locks", locks); + } + queues = context.get("queues"); + if (queues == null) { + queues = Maps.newConcurrentMap(); + context.put("queues", queues); + } + nextVersion = context.get("nextVersion"); + if (nextVersion == null) { + nextVersion = new Long(0); + context.put("nextVersion", nextVersion); + } + } + + @Override + public Set<String> maps() { + return ImmutableSet.copyOf(maps.keySet()); + } + + @Override + public Map<String, Long> counters() { + Map<String, Long> counterMap = Maps.newHashMap(); + counters.forEach((k, v) -> counterMap.put(k, v.get())); + return counterMap; + } + + @Override + public int mapSize(String mapName) { + return getMap(mapName).size(); + } + + @Override + public boolean mapIsEmpty(String mapName) { + return getMap(mapName).isEmpty(); + } + + @Override + public boolean mapContainsKey(String mapName, String key) { + return getMap(mapName).containsKey(key); + } + + @Override + public boolean mapContainsValue(String mapName, byte[] value) { + return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value)); + } + + @Override + public Versioned<byte[]> mapGet(String mapName, String key) { + return getMap(mapName).get(key); + } + + + @Override + public Result<UpdateResult<String, byte[]>> mapUpdate( + String mapName, + String key, + Match<byte[]> valueMatch, + Match<Long> versionMatch, + byte[] value) { + if (isLockedForUpdates(mapName, key)) { + return Result.locked(); + } + Versioned<byte[]> currentValue = getMap(mapName).get(key); + if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) || + !versionMatch.matches(currentValue == null ? null : currentValue.version())) { + return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue)); + } else { + if (value == null) { + if (currentValue == null) { + return Result.ok(new UpdateResult<>(false, mapName, key, null, null)); + } else { + getMap(mapName).remove(key); + return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null)); + } + } + Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion); + getMap(mapName).put(key, newValue); + return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue)); + } + } + + @Override + public Result<Void> mapClear(String mapName) { + if (areTransactionsInProgress(mapName)) { + return Result.locked(); + } + getMap(mapName).clear(); + return Result.ok(null); + } + + @Override + public Set<String> mapKeySet(String mapName) { + return ImmutableSet.copyOf(getMap(mapName).keySet()); + } + + @Override + public Collection<Versioned<byte[]>> mapValues(String mapName) { + return ImmutableList.copyOf(getMap(mapName).values()); + } + + @Override + public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) { + return ImmutableSet.copyOf(getMap(mapName) + .entrySet() + .stream() + .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue())) + .collect(Collectors.toSet())); + } + + @Override + public Long counterAddAndGet(String counterName, long delta) { + return getCounter(counterName).addAndGet(delta); + } + + @Override + public Long counterGetAndAdd(String counterName, long delta) { + return getCounter(counterName).getAndAdd(delta); + } + + @Override + public Long counterGet(String counterName) { + return getCounter(counterName).get(); + } + + @Override + public Long queueSize(String queueName) { + return Long.valueOf(getQueue(queueName).size()); + } + + @Override + public byte[] queuePeek(String queueName) { + return getQueue(queueName).peek(); + } + + @Override + public byte[] queuePop(String queueName) { + return getQueue(queueName).poll(); + } + + @Override + public void queuePush(String queueName, byte[] entry) { + getQueue(queueName).offer(entry); + } + + @Override + public CommitResponse prepareAndCommit(Transaction transaction) { + if (prepare(transaction)) { + return commit(transaction); + } + return CommitResponse.failure(); + } + + @Override + public boolean prepare(Transaction transaction) { + if (transaction.updates().stream().anyMatch(update -> + isLockedByAnotherTransaction(update.mapName(), + update.key(), + transaction.id()))) { + return false; + } + + if (transaction.updates().stream().allMatch(this::isUpdatePossible)) { + transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id())); + return true; + } + return false; + } + + @Override + public CommitResponse commit(Transaction transaction) { + return CommitResponse.success(Lists.transform(transaction.updates(), + update -> commitProvisionalUpdate(update, transaction.id()))); + } + + @Override + public boolean rollback(Transaction transaction) { + transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id())); + return true; + } + + private Map<String, Versioned<byte[]>> getMap(String mapName) { + return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); + } + + private Map<String, Update> getLockMap(String mapName) { + return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); + } + + private AtomicLong getCounter(String counterName) { + return counters.computeIfAbsent(counterName, name -> new AtomicLong(0)); + } + + private Queue<byte[]> getQueue(String queueName) { + return queues.computeIfAbsent(queueName, name -> new LinkedList<>()); + } + + private boolean isUpdatePossible(DatabaseUpdate update) { + Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key()); + switch (update.type()) { + case PUT: + case REMOVE: + return true; + case PUT_IF_ABSENT: + return existingEntry == null; + case PUT_IF_VERSION_MATCH: + return existingEntry != null && existingEntry.version() == update.currentVersion(); + case PUT_IF_VALUE_MATCH: + return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue()); + case REMOVE_IF_VERSION_MATCH: + return existingEntry == null || existingEntry.version() == update.currentVersion(); + case REMOVE_IF_VALUE_MATCH: + return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue()); + default: + throw new IllegalStateException("Unsupported type: " + update.type()); + } + } + + private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) { + Map<String, Update> lockMap = getLockMap(update.mapName()); + switch (update.type()) { + case PUT: + case PUT_IF_ABSENT: + case PUT_IF_VERSION_MATCH: + case PUT_IF_VALUE_MATCH: + lockMap.put(update.key(), new Update(transactionId, update.value())); + break; + case REMOVE: + case REMOVE_IF_VERSION_MATCH: + case REMOVE_IF_VALUE_MATCH: + lockMap.put(update.key(), new Update(transactionId, null)); + break; + default: + throw new IllegalStateException("Unsupported type: " + update.type()); + } + } + + private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) { + String mapName = update.mapName(); + String key = update.key(); + Update provisionalUpdate = getLockMap(mapName).get(key); + if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { + getLockMap(mapName).remove(key); + } else { + throw new IllegalStateException("Invalid transaction Id"); + } + return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value(); + } + + private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) { + String mapName = update.mapName(); + String key = update.key(); + Update provisionalUpdate = getLockMap(mapName).get(key); + if (provisionalUpdate == null) { + return; + } + if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { + getLockMap(mapName).remove(key); + } + } + + private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) { + Update update = getLockMap(mapName).get(key); + return update != null && !Objects.equal(transactionId, update.transactionId()); + } + + private boolean isLockedForUpdates(String mapName, String key) { + return getLockMap(mapName).containsKey(key); + } + + private boolean areTransactionsInProgress(String mapName) { + return !getLockMap(mapName).isEmpty(); + } + + private class Update { + private final long transactionId; + private final byte[] value; + + public Update(long txId, byte[] value) { + this.transactionId = txId; + this.value = value; + } + + public long transactionId() { + return this.transactionId; + } + + public byte[] value() { + return this.value; + } + } +} |