CDS: Changes to Tx abort in Shard 62/24262/2
authorTom Pantelis <tpanteli@brocade.com>
Wed, 1 Jul 2015 22:59:51 +0000 (18:59 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 20 Jul 2015 21:30:48 +0000 (21:30 +0000)
I noticed when a tx times out on the front-end during CAN_COMMIT, it
tries to abort the tx but it may not get aborted in the Shard and the
front-end gets an AskTimeoutEx on the abort. The reason is that the
Shard only processes the abort request if the tx is the current tx being
committed. If it isn't, the request is ignored and no response is sent,
resulting in the front-end timeout.

I think it makes sense to also process the abort if the tx is sitting in
the queue awaiting CAN_COMMIT. If the front-end says to abort for any
reason, the Shard should honor it. Also, if it isn't aborted, the Shard
may dequeue it sometime later and attempt to commit it which can lead to
unpredictable results if prior commits failed.

As per the comments in the Helium patch, I did some re-factoring to make
it a bit cleaner. I moved the abort code from the Shard to the
ShardCommitCoordinator. This makes it consistent with the other tx
phases where the Shard mostly delegates to the ShardCommitCoordinator. I
also removed the getCohort method from CohortEntry and added appropriate
methods so the internal cohort instance isn't exposed.

There's more refactoring/cleanup that can be done re: Futures and also
moving CohortEntry into its own class (it's large enough) but I don't
want to overload this patch.

Change-Id: I73c79a5e4a2b39b7ee4d97a011de2d29b050dbc4
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 70e0f223f41ab77de24b6df940d12acd39279e9a)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index ce3cae481e766f4ba4b7f93593e8d12a282ee294..5a10c3e961bc8e038a0dd3969134b903aace2a56 100644 (file)
@@ -18,9 +18,6 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -83,7 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+    protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
     static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
@@ -326,7 +322,7 @@ public class Shard extends RaftActor {
     }
 
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
-        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+        final DataTreeCandidate candidate = cohortEntry.getCandidate();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
@@ -349,10 +345,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
+            cohortEntry.commit();
 
             sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
 
@@ -385,7 +378,7 @@ public class Shard extends RaftActor {
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
                 try {
-                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
                 } catch (DataValidationFailedException e) {
                     shardMBean.incrementFailedTransactionsCount();
                     LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
@@ -494,38 +487,7 @@ public class Shard extends RaftActor {
     }
 
     void doAbortTransaction(final String transactionID, final ActorRef sender) {
-        final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry != null) {
-            LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
-
-            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
-            // aborted during replication in which case we may still commit locally if replication
-            // succeeds.
-            commitCoordinator.currentTransactionComplete(transactionID, false);
-
-            final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
-            final ActorRef self = getSelf();
-
-            Futures.addCallback(future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void v) {
-                    shardMBean.incrementAbortTransactionsCount();
-
-                    if(sender != null) {
-                        sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.error("{}: An exception happened during abort", persistenceId(), t);
-
-                    if(sender != null) {
-                        sender.tell(new akka.actor.Status.Failure(t), self);
-                    }
-                }
-            });
-        }
+        commitCoordinator.handleAbort(transactionID, sender, this);
     }
 
     private void handleCreateTransaction(final Object message) {
index 3431755b5109d8b30a2e5137e626b1d565751c71..5e88cc73d38c4ae6dd31d61282503e169f2d09c9 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -30,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 
 /**
@@ -96,6 +98,10 @@ class ShardCommitCoordinator {
     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
         if(queuedCohortEntries.size() < queueCapacity) {
             queuedCohortEntries.offer(cohortEntry);
+
+            log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
+                    queuedCohortEntries.size());
+
             return true;
         } else {
             cohortCache.remove(cohortEntry.getTransactionID());
@@ -314,10 +320,7 @@ class ShardCommitCoordinator {
     private void doCanCommit(final CohortEntry cohortEntry) {
         boolean canCommit = false;
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            canCommit = cohortEntry.getCohort().canCommit().get();
+            canCommit = cohortEntry.canCommit();
 
             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
 
@@ -361,10 +364,7 @@ class ShardCommitCoordinator {
         // normally fail since we ensure only one concurrent 3-phase commit.
 
         try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().preCommit().get();
+            cohortEntry.preCommit();
 
             cohortEntry.getShard().continueCommit(cohortEntry);
 
@@ -409,6 +409,41 @@ class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
+    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+        CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
+        if(cohortEntry != null) {
+            // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+            // aborted during replication in which case we may still commit locally if replication
+            // succeeds.
+            currentTransactionComplete(transactionID, false);
+        } else {
+            cohortEntry = getAndRemoveCohortEntry(transactionID);
+        }
+
+        if(cohortEntry == null) {
+            return;
+        }
+
+        log.debug("{}: Aborting transaction {}", name, transactionID);
+
+        final ActorRef self = shard.getSelf();
+        try {
+            cohortEntry.abort();
+
+            shard.getShardMBean().incrementAbortTransactionsCount();
+
+            if(sender != null) {
+                sender.tell(new AbortTransactionReply().toSerializable(), self);
+            }
+        } catch (Exception e) {
+            log.error("{}: An exception happened during abort", name, e);
+
+            if(sender != null) {
+                sender.tell(new akka.actor.Status.Failure(e), self);
+            }
+        }
+    }
+
     /**
      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
      * matches the current entry.
@@ -483,12 +518,12 @@ class ShardCommitCoordinator {
             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
-
-                iter.remove();
-                cohortCache.remove(next.getTransactionID());
-            } else {
+            } else if(!next.isAborted()) {
                 break;
             }
+
+            iter.remove();
+            cohortCache.remove(next.getTransactionID());
         }
     }
 
@@ -511,6 +546,7 @@ class ShardCommitCoordinator {
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
+        private boolean aborted;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -539,8 +575,8 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
-        ShardDataTreeCohort getCohort() {
-            return cohort;
+        DataTreeCandidate getCandidate() {
+            return cohort.getCandidate();
         }
 
         int getTotalBatchedModificationsReceived() {
@@ -565,6 +601,28 @@ class ShardCommitCoordinator {
             }
         }
 
+        boolean canCommit() throws InterruptedException, ExecutionException {
+            // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
+            // about possibly accessing our state on a different thread outside of our dispatcher.
+            // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
+            // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
+            // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
+            return cohort.canCommit().get();
+        }
+
+        void preCommit() throws InterruptedException, ExecutionException {
+            cohort.preCommit().get();
+        }
+
+        void commit() throws InterruptedException, ExecutionException {
+            cohort.commit().get();
+        }
+
+        void abort() throws InterruptedException, ExecutionException {
+            aborted = true;
+            cohort.abort().get();
+        }
+
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
             Preconditions.checkState(cohort == null, "cohort was already set");
 
@@ -610,6 +668,11 @@ class ShardCommitCoordinator {
             this.shard = shard;
         }
 
+
+        boolean isAborted() {
+            return aborted;
+        }
+
         @Override
         public String toString() {
             StringBuilder builder = new StringBuilder();
index 83b15b99df43f74e0a8a5d61d7493b1792c17e8b..24a5225b8744edb82981f8eb7dc4ab07eee4a6b4 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -124,29 +125,6 @@ public class ShardTest extends AbstractShardTest {
 
     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
 
-    final CountDownLatch recoveryComplete = new CountDownLatch(1);
-
-    protected Props newShardPropsWithRecoveryComplete() {
-
-        final Creator<Shard> creator = new Creator<Shard>() {
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(shardID, Collections.<String,String>emptyMap(),
-                        newDatastoreContext(), SCHEMA_CONTEXT) {
-                    @Override
-                    protected void onRecoveryComplete() {
-                        try {
-                            super.onRecoveryComplete();
-                        } finally {
-                            recoveryComplete.countDown();
-                        }
-                    }
-                };
-            }
-        };
-        return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
-    }
-
     @Test
     public void testRegisterChangeListener() throws Exception {
         new ShardTestKit(getSystem()) {{
@@ -478,9 +456,14 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testApplySnapshot() throws Exception {
+
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
+        testkit.waitUntilLeader(shard);
+
         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
         store.setSchemaContext(SCHEMA_CONTEXT);
 
@@ -509,8 +492,12 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyState() throws Exception {
 
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
+        testkit.waitUntilLeader(shard);
+
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
@@ -527,9 +514,11 @@ public class ShardTest extends AbstractShardTest {
     @Test
     public void testApplyStateWithCandidatePayload() throws Exception {
 
-        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
+        ShardTestKit testkit = new ShardTestKit(getSystem());
+
+        final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
 
-        recoveryComplete.await(5,  TimeUnit.SECONDS);
+        testkit.waitUntilLeader(shard);
 
         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
@@ -2129,11 +2118,11 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testAbortTransaction() throws Throwable {
+    public void testAbortCurrentTransaction() throws Throwable {
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                    "testAbortTransaction");
+                    "testAbortCurrentTransaction");
 
             waitUntilLeader(shard);
 
@@ -2195,6 +2184,78 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testAbortQueuedTransaction() throws Throwable {
+        dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
+        new ShardTestKit(getSystem()) {{
+            final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
+            @SuppressWarnings("serial")
+            final Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                            dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
+                        @Override
+                        public void onReceiveCommand(final Object message) throws Exception {
+                            super.onReceiveCommand(message);
+                            if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                                if(cleaupCheckLatch.get() != null) {
+                                    cleaupCheckLatch.get().countDown();
+                                }
+                            }
+                        }
+                    };
+                }
+            };
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    Props.create(new DelegatingShardCreator(creator)).withDispatcher(
+                            Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
+
+            waitUntilLeader(shard);
+
+            final String transactionID = "tx1";
+
+            final MutableCompositeModification modification = new MutableCompositeModification();
+            final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
+            doReturn(Futures.immediateFuture(null)).when(cohort).abort();
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            // Ready the tx.
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Send the AbortTransaction message.
+
+            shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            verify(cohort).abort();
+
+            // Verify the tx cohort is removed from queue at the cleanup check interval.
+
+            cleaupCheckLatch.set(new CountDownLatch(1));
+            assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
+                    cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
+
+            assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
+
+            // Now send CanCommitTransaction - should fail.
+
+            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
+
+            Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
+            assertTrue("Failure type", failure instanceof IllegalStateException);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCreateSnapshot() throws Exception {
         testCreateSnapshot(true, "testCreateSnapshot");