From 86a3188d6db1e7da2803a6b36daa2898fe045e45 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 14 Dec 2015 18:27:10 -0500 Subject: [PATCH] Bug 4774: Wait for prior RO tx creates on tx chain Added a priorReadOnlyTxPromises map to TransactionChainProxy that holds Promise instances for each read-only tx. When the parent class completes the primary shard lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is called which completes the Promise. A write tx that is created prior to completion will wait on the Promise's Future via findPrimaryShard. Change-Id: Ib1a620cfd5be3e38f633b3faf9ef7a31abaaf345 Signed-off-by: Tom Pantelis --- .../AbstractTransactionContextFactory.java | 47 +++++++--- .../datastore/TransactionChainProxy.java | 90 +++++++++++++++++-- .../datastore/TransactionContextFactory.java | 6 +- .../DistributedDataStoreIntegrationTest.java | 45 ++++++++++ 4 files changed, 167 insertions(+), 21 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index a0071c3f47..4fda059f31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -55,7 +55,12 @@ abstract class AbstractTransactionContextFactory findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier().toString()); + Future findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier()); if(findPrimaryFuture.isCompleted()) { Try maybe = findPrimaryFuture.value().get(); if(maybe.isSuccess()) { @@ -154,7 +167,8 @@ abstract class AbstractTransactionContextFactory findPrimaryShard(String shardName, String txId); + protected abstract Future findPrimaryShard(@Nonnull String shardName, + @Nonnull TransactionIdentifier txId); /** * Create local transaction factory for specified shard, backed by specified shard leader @@ -175,6 +189,13 @@ abstract class AbstractTransactionContextFactory void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection> cohortFutures); + /** + * Callback invoked when the internal TransactionContext has been created for a transaction. + * + * @param transactionId the ID of the transaction. + */ + protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId); + private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 4b75fbbd5c..d230a956c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,11 +8,18 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; +import akka.dispatch.Futures; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionChainIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; @@ -120,6 +127,27 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory + * A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard + * lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is + * called which completes the Promise. A write tx that is created prior to completion will wait on the + * Promise's Future via findPrimaryShard. + */ + private final ConcurrentMap> priorReadOnlyTxPromises = new ConcurrentHashMap<>(); + TransactionChainProxy(final TransactionContextFactory parent) { super(parent.getActorContext()); @@ -134,7 +162,9 @@ final class TransactionChainProxy extends AbstractTransactionContextFactorypromise()); + return transactionProxy; } @Override @@ -178,15 +208,16 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory findPrimaryShard(final String shardName, final String txId) { + protected Future findPrimaryShard(final String shardName, final TransactionIdentifier txId) { // Read current state atomically final State localState = currentState; // There are no outstanding futures, shortcut - final Future previous = localState.previousFuture(); + Future previous = localState.previousFuture(); if (previous == null) { - return parent.findPrimaryShard(shardName, txId); + return combineFutureWithPossiblePriorReadOnlyTxFutures(parent.findPrimaryShard(shardName, txId), txId); } final String previousTransactionId; @@ -199,8 +230,10 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory returnPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = Futures.promise(); final OnComplete onComplete = new OnComplete() { @Override @@ -224,6 +257,42 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory Future combineFutureWithPossiblePriorReadOnlyTxFutures(final Future future, + final TransactionIdentifier txId) { + if(!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) { + Collection>> priorReadOnlyTxPromiseEntries = + new ArrayList<>(priorReadOnlyTxPromises.entrySet()); + if(priorReadOnlyTxPromiseEntries.isEmpty()) { + return future; + } + + List> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size()); + for(Entry> entry: priorReadOnlyTxPromiseEntries) { + LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey()); + priorReadOnlyTxFutures.add(entry.getValue().future()); + } + + Future> combinedFutures = Futures.sequence(priorReadOnlyTxFutures, + getActorContext().getClientDispatcher()); + + final Promise returnPromise = Futures.promise(); + final OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(final Throwable failure, final Iterable notUsed) { + LOG.debug("Tx: {} - prior read-only Tx futures complete", txId); + + // Complete the returned Promise with the original Future. + returnPromise.completeWith(future); + } + }; + + combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); + return returnPromise.future(); + } else { + return future; + } + } + @Override protected void onTransactionReady(final TransactionIdentifier transaction, final Collection> cohortFutures) { final State localState = currentState; @@ -238,8 +307,7 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory> combined = akka.dispatch.Futures.sequence( - cohortFutures, getActorContext().getClientDispatcher()); + final Future> combined = Futures.sequence(cohortFutures, getActorContext().getClientDispatcher()); // Record the we have outstanding futures final State newState = new Submitted(transaction, combined); @@ -255,6 +323,14 @@ final class TransactionChainProxy extends AbstractTransactionContextFactory promise = priorReadOnlyTxPromises.remove(transactionId); + if(promise != null) { + promise.success(null); + } + } + @Override protected TransactionIdentifier nextIdentifier() { return transactionChainId.newTransactionIdentifier(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java index 1d141aec2e..db8dedcf35 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java @@ -45,7 +45,7 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory< } @Override - protected Future findPrimaryShard(final String shardName, final String txId) { + protected Future findPrimaryShard(final String shardName, TransactionIdentifier txId) { return getActorContext().findPrimaryShardAsync(shardName); } @@ -57,4 +57,8 @@ final class TransactionContextFactory extends AbstractTransactionContextFactory< DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(this); } + + @Override + protected void onTransactionContextCreated(TransactionIdentifier transactionId) { + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 7acde4268f..8585cc6acd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -989,6 +989,51 @@ public class DistributedDataStoreIntegrationTest { }}; } + @Test + public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainWithReadOnlyTxAfterPreviousReady", "test-1"); + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + // Create a write tx and submit. + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + // Create read-only tx's and issue a read. + + CheckedFuture>, ReadFailedException> readFuture1 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + + CheckedFuture>, ReadFailedException> readFuture2 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + + // Create another write tx and issue the write. + + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Ensure the reads succeed. + + assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); + + // Ensure the writes succeed. + + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + + doCommit(cohort1); + doCommit(cohort2); + + assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH). + checkedGet(5, TimeUnit.SECONDS).isPresent()); + }}; + } + @Test public void testChainedTransactionFailureWithSingleShard() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ -- 2.36.6