From 2ce392e287211179691e9ad9c738a6776effacd8 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sat, 1 Nov 2014 04:10:20 -0400 Subject: [PATCH] Bug 2318: Ensure previous Tx in chain is readied before creating the next In TransactionChainProxy, subclassed TransactionProxy (ChainedTransactionProxy) to hook into the TransactionProxy to capture the cohort Futures on ready and to customize the returned create shard Tx Future. The previous Tx's ready Futures are captured and cached in the enclosing TransactionChainProxy instance. When the client creates the next Tx in the chain, the Future from the shard Tx create message is wrapped by a sequence Future that combines all the previous ready Futures. Thus, when the previous ready Futures complete, the shard Tx create is sent and that Future is used to complete the returned Promise. Change-Id: If002e4d705510aee3f560c506318d25f386768e5 Signed-off-by: tpantelis --- .../datastore/TransactionChainProxy.java | 85 +++++++++++++----- .../cluster/datastore/TransactionProxy.java | 56 +++++++----- .../DistributedDataStoreIntegrationTest.java | 89 +++++++++++++++---- 3 files changed, 169 insertions(+), 61 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index b467ee4ddb..93f9e6b7de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -9,18 +9,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import java.util.AbstractMap.SimpleEntry; +import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import scala.concurrent.Await; import scala.concurrent.Future; - -import java.util.Collections; -import java.util.List; +import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard @@ -28,7 +27,7 @@ import java.util.List; public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; private final String transactionChainId; - private volatile List> cohortFutures = Collections.emptyList(); + private volatile SimpleEntry>> previousTxReadyFutures; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; @@ -37,20 +36,17 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override @@ -63,17 +59,62 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ return transactionChainId; } - public void onTransactionReady(List> cohortFutures){ - this.cohortFutures = cohortFutures; - } + private class ChainedTransactionProxy extends TransactionProxy { + + ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + super(actorContext, transactionType, transactionChainId); + } + + @Override + protected void onTransactionReady(List> cohortFutures) { + if(!cohortFutures.isEmpty()) { + previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures); + } else { + previousTxReadyFutures = null; + } + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we create the next shard Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future sendCreateTransaction(final ActorSelection shard, + final Object serializedCreateMessage) { + // Check if there are any previous ready Futures. Also make sure the previous ready + // Futures aren't for this Tx as deadlock would occur if tried to wait on our own + // Futures. This may happen b/c the shard Tx creates are done async so it's possible + // for the client to ready this Tx before we've even attempted to create a shard Tx. + if(previousTxReadyFutures == null || + previousTxReadyFutures.getKey().equals(getIdentifier())) { + return super.sendCreateTransaction(shard, serializedCreateMessage); + } + + // Combine the ready Futures into 1. + Future> combinedFutures = akka.dispatch.Futures.sequence( + previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher()); + + // Add a callback for completion of the combined Futures. + final Promise createTxPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable notUsed) { + if(failure != null) { + // A Ready Future failed so fail the returned Promise. + createTxPromise.failure(failure); + } else { + // Send the CreateTx message and use the resulting Future to complete the + // returned Promise. + createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + serializedCreateMessage)); + } + } + }; + + combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - public void waitTillCurrentTransactionReady(){ - try { - Await.result(Futures - .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()), - actorContext.getOperationDuration()); - } catch (Exception e) { - throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e); + return createTxPromise.future(); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index ffb1ab7c55..40880d9075 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -50,15 +58,6 @@ import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; -import javax.annotation.concurrent.GuardedBy; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard *

@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final TransactionType transactionType; private final ActorContext actorContext; private final TransactionIdentifier identifier; - private final TransactionChainProxy transactionChainProxy; + private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, null); + this(actorContext, transactionType, ""); } public TransactionProxy(ActorContext actorContext, TransactionType transactionType, - TransactionChainProxy transactionChainProxy) { + String transactionChainId) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), "schemaContext should not be null"); - this.transactionChainProxy = transactionChainProxy; + this.transactionChainId = transactionChainId; String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ @@ -433,14 +432,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortFutures); - } + onTransactionReady(cohortFutures); return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } + /** + * Method for derived classes to be notified when the transaction has been readied. + * + * @param cohortFutures the cohort Futures for each shard transaction. + */ + protected void onTransactionReady(List> cohortFutures) { + } + + /** + * Method called to send a CreateTransaction message to a shard. + * + * @param shard the shard actor to send to + * @param serializedCreateMessage the serialized message to send + * @return the response Future + */ + protected Future sendCreateTransaction(ActorSelection shard, + Object serializedCreateMessage) { + return actorContext.executeOperationAsync(shard, serializedCreateMessage); + } + @Override public Object getIdentifier() { return this.identifier; @@ -502,10 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } public String getTransactionChainId() { - if(transactionChainProxy == null){ - return ""; - } - return transactionChainProxy.getTransactionChainId(); + return transactionChainId; } /** @@ -591,7 +605,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, + Future createTxFuture = sendCreateTransaction(primaryShard, new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index cec7ce1e3f..25915b198c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,11 +1,17 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; @@ -26,16 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @@ -566,31 +566,84 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 2. Write some data - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, containerNode); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 4. Commit the Tx + // 4. Commit the Tx on another thread that first waits for the second read Tx. - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } + } + }.start(); - // 5. Verify the data in the store + // 5. Create a new read Tx from the chain to read and verify the data from the first + // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", containerNode, optional.get()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, and ready it + + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data from the last RW Tx to + // verify it is visible. + + readTx = txChain.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the chain. + + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if(commit1Error.get() != null) { + throw commit1Error.get(); + } + + doCommit(cohort2); txChain.close(); + // 9. Create a new read Tx from the data store and verify committed data. + + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + cleanup(dataStore); + } + + private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception { + Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort1.preCommit().get(5, TimeUnit.SECONDS); + cohort1.commit().get(5, TimeUnit.SECONDS); }}; } -- 2.36.6