Forward pending transactions on leadership change 99/35199/9
author Tom Pantelis <tpanteli@brocade.com>
Mon, 22 Feb 2016 01:41:16 +0000 (20:41 -0500)
committer Gerrit Code Review <gerrit@opendaylight.org>
Thu, 24 Mar 2016 19:39:32 +0000 (19:39 +0000)
If the current leader is deposed for some reason while it has pending
transactions, the transactions are now converted to BatchedModifications
messages on leader change and forwarded to the new leader in the ready
state so the new leader can complete the commits. Previously they were
aborted and failed.

In addition, if the current transaction has already gone through
canCommit and possibly preCommit, CanCommitTransaction and
CommitTransaction messages are also sent to the new leader as
appropriate.
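
For illustration, a minimal stand-alone sketch of the re-batching this
change performs on leader change. Batch is a toy stand-in for
BatchedModifications, and the real code drives batching from a
DataTreeModification cursor rather than a list, so this is a sketch of
the idea, not the actual implementation:

    import java.util.ArrayList;
    import java.util.List;

    final class BatchingSketch {
        static final class Batch {
            final List<String> mods = new ArrayList<>();
            boolean ready;          // set on the last batch, like setReady(true)
            int totalMessagesSent;  // set on the last batch, like setTotalMessagesSent(n)
        }

        // Splits pending modifications into chunks of at most maxPerBatch and
        // marks the final chunk ready, mirroring how the deposed leader
        // re-batches a pending transaction before forwarding it.
        static List<Batch> toBatches(List<String> pendingMods, int maxPerBatch) {
            List<Batch> batches = new ArrayList<>();
            for (String mod : pendingMods) {
                if (batches.isEmpty() || batches.get(batches.size() - 1).mods.size() >= maxPerBatch) {
                    batches.add(new Batch());
                }
                batches.get(batches.size() - 1).mods.add(mod);
            }
            if (!batches.isEmpty()) {
                Batch last = batches.get(batches.size() - 1);
                last.ready = true;
                last.totalMessagesSent = batches.size();
            }
            return batches;
        }

        public static void main(String[] args) {
            // With a max of 2 per batch, 5 modifications yield 3 batches;
            // only the last is flagged ready.
            List<Batch> batches = toBatches(List.of("m1", "m2", "m3", "m4", "m5"), 2);
            System.out.println(batches.size() + " batches, last ready=" + batches.get(2).ready);
        }
    }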

Change-Id: I9bbd68856586c15464f4ca0e844f040c6bb5f30a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 4b471cfa4ad9304fb604221e641f549c77287b55..9c2e91d7bb4821f4fb5b56ca0299ab8aa1a4b7c5 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,4 +67,9 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     DataTreeCandidateTip getCandidate() {
         return delegate.getCandidate();
     }
+
+    @Override
+    DataTreeModification getDataTreeModification() {
+        return delegate.getDataTreeModification();
+    }
 }
\ No newline at end of file
index 72b31d5598d80aaf7f7961e33c562d72dbbb7bc1..fbf3c0bd9a0d751ade49c99e996f8b524192dd48 100644 (file)
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -327,8 +328,19 @@ public class Shard extends RaftActor {
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
-        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-            shardMBean.incrementFailedTransactionsCount();
+        if (isLeader()) {
+            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+                shardMBean.incrementFailedTransactionsCount();
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader == null) {
+                messageRetrySupport.addMessageToRetry(commit, getSender(),
+                        "Could not commit transaction " + commit.getTransactionID());
+            } else {
+                LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
+                leader.forward(commit, getContext());
+            }
         }
     }
 
@@ -336,7 +348,27 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
 
         try {
-            cohortEntry.commit();
+            try {
+                cohortEntry.commit();
+            } catch(ExecutionException e) {
+                // We may get a "store tree and candidate base differ" IllegalStateException from commit under
+                // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
+                // resort. Eg, we're a follower and a tx payload is replicated but the leader goes down before
+            // resort. E.g., we're a follower and a tx payload is replicated but the leader goes down before
+                // replicated. When consensus occurs, this will cause the first tx to be applied as a foreign
+                // candidate via applyState prior to the second tx. Since the second tx has already been
+                // pre-committed, when it gets here to commit it will get an IllegalStateException.
+
+                // FIXME - this is not an ideal way to handle this scenario. This is temporary - a cleaner
+                // solution will be forthcoming.
+                if(e.getCause() instanceof IllegalStateException) {
+                    LOG.debug("{}: commit failed for transaction {} - retrying as foreign candidate", persistenceId(),
+                            transactionID, e);
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCandidate());
+                } else {
+                    throw e;
+                }
+            }
 
             sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), getSelf());
 
@@ -393,7 +425,19 @@ public class Shard extends RaftActor {
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
-        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+
+        if (isLeader()) {
+            commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader == null) {
+                messageRetrySupport.addMessageToRetry(canCommit, getSender(),
+                        "Could not canCommit transaction " + canCommit.getTransactionID());
+            } else {
+                LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
+                leader.forward(canCommit, getContext());
+            }
+        }
     }
 
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
@@ -689,9 +733,6 @@ public class Shard extends RaftActor {
             }
 
             store.closeAllTransactionChains();
-
-            commitCoordinator.abortPendingTransactions(
-                    "The transacton was aborted due to inflight leadership change.", this);
         }
 
         if(hasLeader && !isIsolatedLeader()) {
@@ -703,7 +744,31 @@ public class Shard extends RaftActor {
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
 
-        if(hasLeader() && !isIsolatedLeader()) {
+        boolean hasLeader = hasLeader();
+        if(hasLeader && !isLeader()) {
+            // Another leader was elected. If we were the previous leader and had pending transactions, convert
+            // them to transaction messages and send to the new leader.
+            ActorSelection leader = getLeader();
+            if(leader != null) {
+                Collection<Object> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
+                        datastoreContext.getShardBatchedModificationCount());
+
+                if(!messagesToForward.isEmpty()) {
+                    LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
+                            messagesToForward.size(), leader);
+
+                    for(Object message: messagesToForward) {
+                        leader.tell(message, self());
+                    }
+                }
+            } else {
+                commitCoordinator.abortPendingTransactions(
+                        "The transaction was aborted due to inflight leadership change and the leader address isn't available.",
+                        this);
+            }
+        }
+
+        if(hasLeader && !isIsolatedLeader()) {
             messageRetrySupport.retryMessages();
         }
     }
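
The non-leader branches added to handleCommitTransaction and
handleCanCommitTransaction above follow a forward-or-retry pattern. A
minimal sketch under simplified assumptions; Leader and RetryQueue are
toy stand-ins for Akka's ActorSelection and the shard's
MessageRetrySupport, not the real classes:

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class ForwardOrRetrySketch {
        interface Leader { void forward(Object msg); }

        // Stand-in for MessageRetrySupport: queues a message to be retried
        // once a leader is known, or failed with the given reason on timeout.
        static final class RetryQueue {
            final Deque<Object> pending = new ArrayDeque<>();
            void addMessageToRetry(Object msg, String failureReason) {
                pending.add(msg);
            }
        }

        static void handle(Object msg, boolean isLeader, Leader leader, RetryQueue retries) {
            if (isLeader) {
                // process locally via the commit coordinator (elided)
            } else if (leader == null) {
                retries.addMessageToRetry(msg, "Could not commit transaction");
            } else {
                leader.forward(msg); // the real code preserves the original sender
            }
        }

        public static void main(String[] args) {
            RetryQueue retries = new RetryQueue();
            handle("CommitTransaction", false, null, retries); // no leader known yet
            System.out.println("queued for retry: " + retries.pending.size());
        }
    }
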
index 76131e257b893a1671f74f01b1e508e2689ca731..739321b06876ca6331bd785acb6074944ac86daf 100644 (file)
@@ -24,10 +24,13 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
@@ -35,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.slf4j.Logger;
 
 /**
@@ -305,8 +309,9 @@ class ShardCommitCoordinator {
             doCanCommit(currentCohortEntry);
         } else {
             if(log.isDebugEnabled()) {
-                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
-                        name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
+                        queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
+                                transactionID);
             }
         }
     }
@@ -353,7 +358,6 @@ class ShardCommitCoordinator {
                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
                 }
             } else {
-                // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
@@ -486,21 +490,80 @@ class ShardCommitCoordinator {
             return;
         }
 
-        List<CohortEntry> cohortEntries = new ArrayList<>();
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+        log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+        for(CohortEntry cohortEntry: cohortEntries) {
+            if(cohortEntry.getReplySender() != null) {
+                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            }
+        }
+    }
 
+    private List<CohortEntry> getAndClearPendingCohortEntries() {
+        List<CohortEntry> cohortEntries = new ArrayList<>();
         if(currentCohortEntry != null) {
             cohortEntries.add(currentCohortEntry);
+            cohortCache.remove(currentCohortEntry.getTransactionID());
             currentCohortEntry = null;
         }
 
-        cohortEntries.addAll(queuedCohortEntries);
+        for(CohortEntry cohortEntry: queuedCohortEntries) {
+            cohortEntries.add(cohortEntry);
+            cohortCache.remove(cohortEntry.getTransactionID());
+        }
+
         queuedCohortEntries.clear();
+        return cohortEntries;
+    }
 
+    Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Collection<Object> messages = new ArrayList<>();
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
         for(CohortEntry cohortEntry: cohortEntries) {
-            if(cohortEntry.getReplySender() != null) {
-                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+                continue;
+            }
+
+            final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+            cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+                @Override
+                protected BatchedModifications getModifications() {
+                    if(newModifications.isEmpty() ||
+                            newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                                cohortEntry.getClientVersion(), ""));
+                    }
+
+                    return newModifications.getLast();
+                }
+            });
+
+            if(!newModifications.isEmpty()) {
+                BatchedModifications last = newModifications.getLast();
+                last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+                last.setReady(true);
+                last.setTotalMessagesSent(newModifications.size());
+                messages.addAll(newModifications);
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
             }
         }
+
+        return messages;
     }
 
     /**
@@ -612,6 +675,14 @@ class ShardCommitCoordinator {
     }
 
     static class CohortEntry {
+        enum State {
+            PENDING,
+            CAN_COMMITTED,
+            PRE_COMMITTED,
+            COMMITTED,
+            ABORTED
+        }
+
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
@@ -621,7 +692,7 @@ class ShardCommitCoordinator {
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
-        private boolean aborted;
+        private State state = State.PENDING;
         private final short clientVersion;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
@@ -650,10 +721,18 @@ class ShardCommitCoordinator {
             return clientVersion;
         }
 
+        State getState() {
+            return state;
+        }
+
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }
 
+        DataTreeModification getDataTreeModification() {
+            return cohort.getDataTreeModification();
+        }
+
         ReadWriteShardDataTreeTransaction getTransaction() {
             return transaction;
         }
@@ -681,6 +760,8 @@ class ShardCommitCoordinator {
         }
 
         boolean canCommit() throws InterruptedException, ExecutionException {
+            state = State.CAN_COMMITTED;
+
             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
             // about possibly accessing our state on a different thread outside of our dispatcher.
             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
@@ -690,15 +771,17 @@ class ShardCommitCoordinator {
         }
 
         void preCommit() throws InterruptedException, ExecutionException {
+            state = State.PRE_COMMITTED;
             cohort.preCommit().get();
         }
 
         void commit() throws InterruptedException, ExecutionException {
+            state = State.COMMITTED;
             cohort.commit().get();
         }
 
         void abort() throws InterruptedException, ExecutionException {
-            aborted = true;
+            state = State.ABORTED;
             cohort.abort().get();
         }
 
@@ -749,7 +832,7 @@ class ShardCommitCoordinator {
 
 
         boolean isAborted() {
-            return aborted;
+            return state == State.ABORTED;
         }
 
         @Override
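
A minimal stand-alone sketch of how the new CohortEntry.State value
determines the follow-up message appended after the re-batched
modifications in convertPendingTransactionsToMessages. The message names
mirror the real CanCommitTransaction/CommitTransaction classes, but the
method below is a simplified stand-in, not the actual API:

    final class FollowUpSketch {
        enum State { PENDING, CAN_COMMITTED, PRE_COMMITTED, COMMITTED, ABORTED }

        // Selects the extra message sent after the ready BatchedModifications,
        // based on how far the pending transaction had progressed.
        static String followUpMessage(State state, boolean doImmediateCommit) {
            if (doImmediateCommit) {
                return null; // ready-with-immediate-commit: the batches alone suffice
            }
            switch (state) {
                case CAN_COMMITTED: return "CanCommitTransaction"; // new leader re-runs canCommit
                case PRE_COMMITTED: return "CommitTransaction";    // new leader finishes the commit
                default:            return null;                   // nothing extra for PENDING
            }
        }

        public static void main(String[] args) {
            System.out.println(followUpMessage(State.PRE_COMMITTED, false)); // CommitTransaction
        }
    }
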
index 213e36a570ce1c445e11f8d6e175b37abf9a7388..f7cdd4e8dc3e8c13730e668627e03782703edffc 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 public abstract class ShardDataTreeCohort {
     ShardDataTreeCohort() {
@@ -17,6 +18,7 @@ public abstract class ShardDataTreeCohort {
     }
 
     abstract DataTreeCandidateTip getCandidate();
+    abstract DataTreeModification getDataTreeModification();
 
     @VisibleForTesting
     public abstract ListenableFuture<Boolean> canCommit();
index 806329aa5bcce31494034ba9e3232aef440318c3..5dda3612a14150c1397664d2ef94918795a09ea0 100644 (file)
@@ -43,7 +43,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
 
     @Override
     public ListenableFuture<Boolean> canCommit() {
-        DataTreeModification modification = dataTreeModification();
+        DataTreeModification modification = getDataTreeModification();
         try {
             dataTree.getDataTree().validate(modification);
             LOG.trace("Transaction {} validated", transaction);
@@ -69,7 +69,7 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
     @Override
     public ListenableFuture<Void> preCommit() {
         try {
-            candidate = dataTree.getDataTree().prepare(dataTreeModification());
+            candidate = dataTree.getDataTree().prepare(getDataTreeModification());
             /*
              * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
              *        persist on the candidate (which gives us a Future).
@@ -86,7 +86,8 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         }
     }
 
-    private DataTreeModification dataTreeModification() {
+    @Override
+    DataTreeModification getDataTreeModification() {
         DataTreeModification dataTreeModification = transaction;
         if(transaction instanceof PruningDataTreeModification){
             dataTreeModification = ((PruningDataTreeModification) transaction).getResultingModification();
index 037a1dec5b076dc9a073ce2d796d4ebf078a09e5..9c1103e60b2e4b8a641bdde277ae1e7aaf19f583 100644 (file)
@@ -28,6 +28,7 @@ import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
@@ -679,14 +680,14 @@ public class DistributedDataStoreRemotingIntegrationTest {
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
-        initDatastoresWithCars("testTransactionForwardedToLeaderAfterRetry");
+        initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
 
         // Do an initial write to get the primary shard info cached.
 
-        DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
-        writeTx1.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-        followerTestKit.doCommit(writeTx1.ready());
+        DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+        followerTestKit.doCommit(initialWriteTx.ready());
 
         // Wait for the commit to be replicated to the follower.
 
@@ -697,21 +698,51 @@ public class DistributedDataStoreRemotingIntegrationTest {
             }
         });
 
-        // Create and prepare wo and rw tx's.
+        // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
+        // the leader shard.
+
+        DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+        DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
+        ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
+        writeTx1CanCommit.get(5, TimeUnit.SECONDS);
+
+        // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
+        // in the leader shard.
 
-        writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
+        DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
         LinkedList<MapEntryNode> cars = new LinkedList<>();
         int carIndex = 1;
-        for(; carIndex <= 5; carIndex++) {
+        cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+        writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        carIndex++;
+        NormalizedNode<?, ?> people = PeopleModel.newPersonMapNode();
+        writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
+        DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
+
+        // Prepare another WO tx that writes to a single shard and thus will be directly committed on ready. This
+        // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the
+        // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be
+        // sent on ready.
+
+        DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
+        for(int i = 1; i <= 5; i++, carIndex++) {
             cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
-            writeTx1.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+            writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         }
 
-        DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
+        // Prepare another WO tx that writes to a single shard. This will send a single BatchedModifications
+        // message on ready.
+
+        DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
-        writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+        writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
         carIndex++;
 
+        // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the
+        // leader shard on ready.
+
         DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
@@ -731,21 +762,28 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
 
-        // Submit tx's and enable elections on the follower so it becomes the leader, at which point the
-        // readied tx's should get forwarded from the previous leader.
+        // Submit all tx's - the messages should get queued for retry.
+
+        ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
+        DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
+        DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
+        DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
 
-        DOMStoreThreePhaseCommitCohort cohort1 = writeTx1.ready();
-        DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
-        DOMStoreThreePhaseCommitCohort cohort3 = readWriteTx.ready();
+        // Enable elections on the other follower so it becomes the leader, at which point the
+        // tx's should get forwarded from the previous leader to the new leader to complete the commits.
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
                 customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
 
-        followerTestKit.doCommit(cohort1);
-        followerTestKit.doCommit(cohort2);
-        followerTestKit.doCommit(cohort3);
+        followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
+        followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
+        followerTestKit.doCommit(writeTx3Cohort);
+        followerTestKit.doCommit(writeTx4Cohort);
+        followerTestKit.doCommit(rwTxCohort);
 
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), cars.toArray(new MapEntryNode[cars.size()]));
+        DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
+        verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
+        verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
     }
 
     @Test
index f639c8db49c659921f1f472284eb8812ad95b485..407cc609f6e2a263f8de61fa4ba8dae149f8c14a 100644 (file)
@@ -1293,7 +1293,7 @@ public class ShardTest extends AbstractShardTest {
             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
-            doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+            doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
 
             final String transactionID2 = "tx2";