aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java')
-rw-r--r--framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java126
1 files changed, 126 insertions, 0 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
new file mode 100644
index 00000000..fc6e58d0
--- /dev/null
+++ b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DatabaseUpdate;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Transaction;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.Transaction.State;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Agent that runs the two phase commit protocol.
+ */
+public class TransactionManager {
+
+ private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(Versioned.class)
+ .register(DatabaseUpdate.class)
+ .register(DatabaseUpdate.Type.class)
+ .register(DefaultTransaction.class)
+ .register(Transaction.State.class)
+ .build();
+
+ private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
+ private final Database database;
+ private final AsyncConsistentMap<Long, Transaction> transactions;
+
+ /**
+ * Constructs a new TransactionManager for the specified database instance.
+ *
+ * @param database database
+ * @param mapBuilder builder for ConsistentMap instances
+ */
+ public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
+ this.database = checkNotNull(database, "database cannot be null");
+ this.transactions = mapBuilder.withName("onos-transactions")
+ .withSerializer(serializer)
+ .buildAsyncMap();
+ }
+
+ /**
+ * Executes the specified transaction by employing a two phase commit protocol.
+ *
+ * @param transaction transaction to commit
+ * @return transaction result. Result value true indicates a successful commit, false
+ * indicates abort
+ */
+ public CompletableFuture<CommitResponse> execute(Transaction transaction) {
+ // clean up if this transaction in already in a terminal state.
+ if (transaction.state() == Transaction.State.COMMITTED ||
+ transaction.state() == Transaction.State.ROLLEDBACK) {
+ return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
+ } else if (transaction.state() == Transaction.State.COMMITTING) {
+ return commit(transaction);
+ } else if (transaction.state() == Transaction.State.ROLLINGBACK) {
+ return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
+ } else {
+ return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
+ }
+ }
+
+
+ /**
+ * Returns all transactions in the system.
+ *
+ * @return future for a collection of transactions
+ */
+ public CompletableFuture<Collection<Transaction>> getTransactions() {
+ return transactions.values().thenApply(c -> {
+ Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
+ return txns;
+ });
+ }
+
+ private CompletableFuture<Boolean> prepare(Transaction transaction) {
+ return transactions.put(transaction.id(), transaction)
+ .thenCompose(v -> database.prepare(transaction))
+ .thenCompose(status -> transactions.put(
+ transaction.id(),
+ transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
+ .thenApply(v -> status));
+ }
+
+ private CompletableFuture<CommitResponse> commit(Transaction transaction) {
+ return database.commit(transaction)
+ .whenComplete((r, e) -> transactions.put(
+ transaction.id(),
+ transaction.transition(Transaction.State.COMMITTED)));
+ }
+
+ private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
+ return database.rollback(transaction)
+ .thenCompose(v -> transactions.put(
+ transaction.id(),
+ transaction.transition(Transaction.State.ROLLEDBACK)))
+ .thenApply(v -> CommitResponse.failure());
+ }
+}