Fix intermittent failure in testWriteTransactionWithSingleShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index da8c7cb725aad6d4b929d7c300261cff260a4920..3298fa2c05c2464342e1254cbc0ff9b4dd3c043b 100644 (file)
@@ -27,6 +27,7 @@ import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -39,8 +40,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -239,7 +242,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         final String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -277,20 +279,46 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
 
-        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+        // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+        // applied and all the log entries from the leader. Since we've verified the car data above we know that
+        // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+        final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+            state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+        // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+        // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+        // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+        // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+        // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+        // until we find the one that encompasses the leader's lastAppliedIndex.
+        Stopwatch sw = Stopwatch.createStarted();
+        boolean done = false;
+        while (!done) {
+            final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+                    ApplyJournalEntries.class);
+            for (ApplyJournalEntries aje: entries) {
+                if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+                    done = true;
+                    break;
+                }
+            }
+
+            assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+                    + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
-        final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
-                .getConfig("Member2"));
+        final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
         try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
-                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
-
-        JavaTestKit.shutdownActorSystem(newSystem);
     }
 
     @Test