summaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:15:21 -0700
commit13d05bc8458758ee39cb829098241e89616717ee (patch)
tree22a4d1ce65f15952f07a3df5af4b462b4697cb3a /framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
parent6139282e1e93c2322076de4b91b1c85d0bc4a8b3 (diff)
ONOS checkin based on commit tag e796610b1f721d02f9b0e213cf6f7790c10ecd60
Change-Id: Ife8810491034fe7becdba75dda20de4267bd15cd
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java368
1 files changed, 368 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
new file mode 100644
index 00000000..9d3505bd
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.Set;
+
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Default database state.
+ */
+public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
+ private Long nextVersion;
+ private Map<String, AtomicLong> counters;
+ private Map<String, Map<String, Versioned<byte[]>>> maps;
+ private Map<String, Queue<byte[]>> queues;
+
+ /**
+ * This locks map has a structure similar to the "tables" map above and
+ * holds all the provisional updates made during a transaction's prepare phase.
+ * The entry value is represented as the tuple: (transactionId, newValue)
+ * If newValue == null that signifies this update is attempting to
+ * delete the existing value.
+ * This map also serves as a lock on the entries that are being updated.
+ * The presence of a entry in this map indicates that element is
+ * participating in a transaction and is currently locked for updates.
+ */
+ private Map<String, Map<String, Update>> locks;
+
+ @Initializer
+ @Override
+ public void init(StateContext<DatabaseState<String, byte[]>> context) {
+ counters = context.get("counters");
+ if (counters == null) {
+ counters = Maps.newConcurrentMap();
+ context.put("counters", counters);
+ }
+ maps = context.get("maps");
+ if (maps == null) {
+ maps = Maps.newConcurrentMap();
+ context.put("maps", maps);
+ }
+ locks = context.get("locks");
+ if (locks == null) {
+ locks = Maps.newConcurrentMap();
+ context.put("locks", locks);
+ }
+ queues = context.get("queues");
+ if (queues == null) {
+ queues = Maps.newConcurrentMap();
+ context.put("queues", queues);
+ }
+ nextVersion = context.get("nextVersion");
+ if (nextVersion == null) {
+ nextVersion = new Long(0);
+ context.put("nextVersion", nextVersion);
+ }
+ }
+
+ @Override
+ public Set<String> maps() {
+ return ImmutableSet.copyOf(maps.keySet());
+ }
+
+ @Override
+ public Map<String, Long> counters() {
+ Map<String, Long> counterMap = Maps.newHashMap();
+ counters.forEach((k, v) -> counterMap.put(k, v.get()));
+ return counterMap;
+ }
+
+ @Override
+ public int mapSize(String mapName) {
+ return getMap(mapName).size();
+ }
+
+ @Override
+ public boolean mapIsEmpty(String mapName) {
+ return getMap(mapName).isEmpty();
+ }
+
+ @Override
+ public boolean mapContainsKey(String mapName, String key) {
+ return getMap(mapName).containsKey(key);
+ }
+
+ @Override
+ public boolean mapContainsValue(String mapName, byte[] value) {
+ return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
+ }
+
+ @Override
+ public Versioned<byte[]> mapGet(String mapName, String key) {
+ return getMap(mapName).get(key);
+ }
+
+
+ @Override
+ public Result<UpdateResult<String, byte[]>> mapUpdate(
+ String mapName,
+ String key,
+ Match<byte[]> valueMatch,
+ Match<Long> versionMatch,
+ byte[] value) {
+ if (isLockedForUpdates(mapName, key)) {
+ return Result.locked();
+ }
+ Versioned<byte[]> currentValue = getMap(mapName).get(key);
+ if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
+ !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
+ return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
+ } else {
+ if (value == null) {
+ if (currentValue == null) {
+ return Result.ok(new UpdateResult<>(false, mapName, key, null, null));
+ } else {
+ getMap(mapName).remove(key);
+ return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
+ }
+ }
+ Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
+ getMap(mapName).put(key, newValue);
+ return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
+ }
+ }
+
+ @Override
+ public Result<Void> mapClear(String mapName) {
+ if (areTransactionsInProgress(mapName)) {
+ return Result.locked();
+ }
+ getMap(mapName).clear();
+ return Result.ok(null);
+ }
+
+ @Override
+ public Set<String> mapKeySet(String mapName) {
+ return ImmutableSet.copyOf(getMap(mapName).keySet());
+ }
+
+ @Override
+ public Collection<Versioned<byte[]>> mapValues(String mapName) {
+ return ImmutableList.copyOf(getMap(mapName).values());
+ }
+
+ @Override
+ public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
+ return ImmutableSet.copyOf(getMap(mapName)
+ .entrySet()
+ .stream()
+ .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public Long counterAddAndGet(String counterName, long delta) {
+ return getCounter(counterName).addAndGet(delta);
+ }
+
+ @Override
+ public Long counterGetAndAdd(String counterName, long delta) {
+ return getCounter(counterName).getAndAdd(delta);
+ }
+
+ @Override
+ public Long counterGet(String counterName) {
+ return getCounter(counterName).get();
+ }
+
+ @Override
+ public Long queueSize(String queueName) {
+ return Long.valueOf(getQueue(queueName).size());
+ }
+
+ @Override
+ public byte[] queuePeek(String queueName) {
+ return getQueue(queueName).peek();
+ }
+
+ @Override
+ public byte[] queuePop(String queueName) {
+ return getQueue(queueName).poll();
+ }
+
+ @Override
+ public void queuePush(String queueName, byte[] entry) {
+ getQueue(queueName).offer(entry);
+ }
+
+ @Override
+ public CommitResponse prepareAndCommit(Transaction transaction) {
+ if (prepare(transaction)) {
+ return commit(transaction);
+ }
+ return CommitResponse.failure();
+ }
+
+ @Override
+ public boolean prepare(Transaction transaction) {
+ if (transaction.updates().stream().anyMatch(update ->
+ isLockedByAnotherTransaction(update.mapName(),
+ update.key(),
+ transaction.id()))) {
+ return false;
+ }
+
+ if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
+ transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public CommitResponse commit(Transaction transaction) {
+ return CommitResponse.success(Lists.transform(transaction.updates(),
+ update -> commitProvisionalUpdate(update, transaction.id())));
+ }
+
+ @Override
+ public boolean rollback(Transaction transaction) {
+ transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
+ return true;
+ }
+
+ private Map<String, Versioned<byte[]>> getMap(String mapName) {
+ return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
+ }
+
+ private Map<String, Update> getLockMap(String mapName) {
+ return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
+ }
+
+ private AtomicLong getCounter(String counterName) {
+ return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
+ }
+
+ private Queue<byte[]> getQueue(String queueName) {
+ return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
+ }
+
+ private boolean isUpdatePossible(DatabaseUpdate update) {
+ Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
+ switch (update.type()) {
+ case PUT:
+ case REMOVE:
+ return true;
+ case PUT_IF_ABSENT:
+ return existingEntry == null;
+ case PUT_IF_VERSION_MATCH:
+ return existingEntry != null && existingEntry.version() == update.currentVersion();
+ case PUT_IF_VALUE_MATCH:
+ return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
+ case REMOVE_IF_VERSION_MATCH:
+ return existingEntry == null || existingEntry.version() == update.currentVersion();
+ case REMOVE_IF_VALUE_MATCH:
+ return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
+ }
+ }
+
+ private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ Map<String, Update> lockMap = getLockMap(update.mapName());
+ switch (update.type()) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ case PUT_IF_VERSION_MATCH:
+ case PUT_IF_VALUE_MATCH:
+ lockMap.put(update.key(), new Update(transactionId, update.value()));
+ break;
+ case REMOVE:
+ case REMOVE_IF_VERSION_MATCH:
+ case REMOVE_IF_VALUE_MATCH:
+ lockMap.put(update.key(), new Update(transactionId, null));
+ break;
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
+ }
+ }
+
+ private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String mapName = update.mapName();
+ String key = update.key();
+ Update provisionalUpdate = getLockMap(mapName).get(key);
+ if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
+ getLockMap(mapName).remove(key);
+ } else {
+ throw new IllegalStateException("Invalid transaction Id");
+ }
+ return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
+ }
+
+ private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
+ String mapName = update.mapName();
+ String key = update.key();
+ Update provisionalUpdate = getLockMap(mapName).get(key);
+ if (provisionalUpdate == null) {
+ return;
+ }
+ if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
+ getLockMap(mapName).remove(key);
+ }
+ }
+
+ private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
+ Update update = getLockMap(mapName).get(key);
+ return update != null && !Objects.equal(transactionId, update.transactionId());
+ }
+
+ private boolean isLockedForUpdates(String mapName, String key) {
+ return getLockMap(mapName).containsKey(key);
+ }
+
+ private boolean areTransactionsInProgress(String mapName) {
+ return !getLockMap(mapName).isEmpty();
+ }
+
+ private class Update {
+ private final long transactionId;
+ private final byte[] value;
+
+ public Update(long txId, byte[] value) {
+ this.transactionId = txId;
+ this.value = value;
+ }
+
+ public long transactionId() {
+ return this.transactionId;
+ }
+
+ public byte[] value() {
+ return this.value;
+ }
+ }
+}