Bug 2318: Ensure previous Tx in chain is readied before creating the next 35/12535/2
authortpantelis <tpanteli@brocade.com>
Sat, 1 Nov 2014 08:10:20 +0000 (04:10 -0400)
committertpantelis <tpanteli@brocade.com>
Sat, 1 Nov 2014 22:20:21 +0000 (18:20 -0400)
In TransactionChainProxy, subclassed TransactionProxy (ChainedTransactionProxy)
to hook into the TransactionProxy to capture the cohort Futures on ready
and to customize the returned create shard Tx Future.

The previous Tx's ready Futures are captured and cached in the enclosing
TransactionChainProxy instance. When the client creates the next Tx in
the chain, the Future from the shard Tx create message is wrapped by a
sequence Future that combines all the previous ready Futures. Thus, when
the previous ready Futures complete, the shard Tx create is sent and
that Future is used to complete the returned Promise.

Change-Id: If002e4d705510aee3f560c506318d25f386768e5
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java

index b467ee4ddbf56c456c2e9f0f62381eefa173e31d..93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2 100644 (file)
@@ -9,18 +9,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import scala.concurrent.Await;
 import scala.concurrent.Future;
-
-import java.util.Collections;
-import java.util.List;
+import scala.concurrent.Promise;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
@@ -28,7 +27,7 @@ import java.util.List;
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
     private final String transactionChainId;
-    private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
+    private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
 
     public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
@@ -37,20 +36,17 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, this);
+        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
@@ -63,17 +59,62 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
         return transactionChainId;
     }
 
-    public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
-        this.cohortFutures = cohortFutures;
-    }
+    private class ChainedTransactionProxy extends TransactionProxy {
+
+        ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+            super(actorContext, transactionType, transactionChainId);
+        }
+
+        @Override
+        protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+            if(!cohortFutures.isEmpty()) {
+                previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
+            } else {
+                previousTxReadyFutures = null;
+            }
+        }
+
+        /**
+         * This method is overridden to ensure the previous Tx's ready operations complete
+         * before we create the next shard Tx in the chain to avoid creation failures if the
+         * previous Tx's ready operations haven't completed yet.
+         */
+        @Override
+        protected Future<Object> sendCreateTransaction(final ActorSelection shard,
+                final Object serializedCreateMessage) {
+            // Check if there are any previous ready Futures. Also make sure the previous ready
+            // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
+            // Futures. This may happen b/c the shard Tx creates are done async so it's possible
+            // for the client to ready this Tx before we've even attempted to create a shard Tx.
+            if(previousTxReadyFutures == null ||
+                    previousTxReadyFutures.getKey().equals(getIdentifier())) {
+                return super.sendCreateTransaction(shard, serializedCreateMessage);
+            }
+
+            // Combine the ready Futures into 1.
+            Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+                    previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
+
+            // Add a callback for completion of the combined Futures.
+            final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+            OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+                @Override
+                public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+                    if(failure != null) {
+                        // A Ready Future failed so fail the returned Promise.
+                        createTxPromise.failure(failure);
+                    } else {
+                        // Send the CreateTx message and use the resulting Future to complete the
+                        // returned Promise.
+                        createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
+                                serializedCreateMessage));
+                    }
+                }
+            };
+
+            combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
 
-    public void waitTillCurrentTransactionReady(){
-        try {
-            Await.result(Futures
-                .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
-                actorContext.getOperationDuration());
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
+            return createTxPromise.future();
         }
     }
 }
index ffb1ab7c55064ecf7683e1857800b256c9c82a1d..40880d907580bbaec6a81756d07b3b8b37210ade 100644 (file)
@@ -21,6 +21,14 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -50,15 +58,6 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.concurrent.GuardedBy;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
  * <p>
@@ -182,23 +181,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final TransactionIdentifier identifier;
-    private final TransactionChainProxy transactionChainProxy;
+    private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, null);
+        this(actorContext, transactionType, "");
     }
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            TransactionChainProxy transactionChainProxy) {
+            String transactionChainId) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
             "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
             "schemaContext should not be null");
-        this.transactionChainProxy = transactionChainProxy;
+        this.transactionChainId = transactionChainId;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -433,14 +432,32 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        if(transactionChainProxy != null){
-            transactionChainProxy.onTransactionReady(cohortFutures);
-        }
+        onTransactionReady(cohortFutures);
 
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
+    /**
+     * Method for derived classes to be notified when the transaction has been readied.
+     *
+     * @param cohortFutures the cohort Futures for each shard transaction.
+     */
+    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
+    }
+
+    /**
+     * Method called to send a CreateTransaction message to a shard.
+     *
+     * @param shard the shard actor to send to
+     * @param serializedCreateMessage the serialized message to send
+     * @return the response Future
+     */
+    protected Future<Object> sendCreateTransaction(ActorSelection shard,
+            Object serializedCreateMessage) {
+        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
+    }
+
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -502,10 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     public String getTransactionChainId() {
-        if(transactionChainProxy == null){
-            return "";
-        }
-        return transactionChainProxy.getTransactionChainId();
+        return transactionChainId;
     }
 
     /**
@@ -591,7 +605,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
+            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
                     new CreateTransaction(identifier.toString(),
                             TransactionProxy.this.transactionType.ordinal(),
                             getTransactionChainId()).toSerializable());
index cec7ce1e3fc250a84231f84e83d97996bbe2e55a..25915b198ca3cda368a92771356aaaca92ed57e4 100644 (file)
@@ -1,11 +1,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -26,16 +32,10 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
@@ -566,31 +566,84 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 2. Write some data
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            writeTx.write(TestModel.TEST_PATH, containerNode);
+            NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            writeTx.write(TestModel.TEST_PATH, testNode);
 
             // 3. Ready the Tx for commit
 
-            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+            final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            // 4. Commit the Tx
+            // 4. Commit the Tx on another thread that first waits for the second read Tx.
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            final CountDownLatch continueCommit1 = new CountDownLatch(1);
+            final CountDownLatch commit1Done = new CountDownLatch(1);
+            final AtomicReference<Exception> commit1Error = new AtomicReference<>();
+            new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        continueCommit1.await();
+                        doCommit(cohort1);
+                    } catch (Exception e) {
+                        commit1Error.set(e);
+                    } finally {
+                        commit1Done.countDown();
+                    }
+                }
+            }.start();
 
-            // 5. Verify the data in the store
+            // 5. Create a new read Tx from the chain to read and verify the data from the first
+            // Tx is visible after being readied.
 
             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
-
             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
             assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", containerNode, optional.get());
+            assertEquals("Data node", testNode, optional.get());
+
+            // 6. Create a new RW Tx from the chain, write more data, and ready it
+
+            DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+            MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+            rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+            DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+            // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
+            // verify it is visible.
+
+            readTx = txChain.newReadOnlyTransaction();
+            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", outerNode, optional.get());
+
+            // 8. Wait for the 2 commits to complete and close the chain.
+
+            continueCommit1.countDown();
+            Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+            if(commit1Error.get() != null) {
+                throw commit1Error.get();
+            }
+
+            doCommit(cohort2);
 
             txChain.close();
 
+            // 9. Create a new read Tx from the data store and verify committed data.
+
+            readTx = dataStore.newReadOnlyTransaction();
+            optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", outerNode, optional.get());
+
             cleanup(dataStore);
+        }
+
+        private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
+            Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort1.preCommit().get(5, TimeUnit.SECONDS);
+            cohort1.commit().get(5, TimeUnit.SECONDS);
         }};
     }