Fix intermittent failure in testWriteTransactionWithSingleShard 22/54822/3
author Tom Pantelis <tompantelis@gmail.com>
Wed, 12 Apr 2017 06:20:18 +0000 (02:20 -0400)
committer Tom Pantelis <tompantelis@gmail.com>
Wed, 12 Apr 2017 11:55:03 +0000 (11:55 +0000)
DistributedDataStoreRemotingIntegrationTest.testWriteTransactionWithSingleShard
fails intermittently with the tell-based protocol in verifyCars after it has
reinstated the follower, where it's expecting just car1 but the
data tree contains car1 and car2. This is because the delete transaction for car2
prior to reinstatement wasn't applied on recovery due to the corresponding
ApplyJournalEntries message missing from the persisted journal. The test
expects 2 ApplyJournalEntries messages to be persisted, corresponding to the
2 transactions, but tell-based persists other payloads as well so there may
be 3 ApplyJournalEntries messages. I changed the code to handle this case.

Also, the assertion failure in verifyCars caused it to bypass shutting down
the ActorSystem, which resulted in several other failures in tests that try to
use the same port configuration due to the port already being in use. So I made
changes to ensure ActorSystems are shut down properly.

Change-Id: Id6316d71fcd9eb3e768c6b1f676fa0e9be1287a2
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java

index f47ff18027468caa790d8fa4a17b9e3303f762fc..567ccadb325203b27e99f1a4233098d9334c0a49 100644 (file)
@@ -26,7 +26,7 @@ public abstract class AbstractClusterRefActorTest extends AbstractTest {
 
     @AfterClass
     public static void tearDownClass() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
index fc6445e8f29dee2639d77727636d72844138f837..8152c9e6e2cebfe7e221be31fb0b85585d927489 100644 (file)
@@ -12,13 +12,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -43,7 +39,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
 
     protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-    protected final Collection<ActorSystem> actorSystems = new ArrayList<>();
     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
@@ -85,10 +80,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
-        for (final ActorSystem system : actorSystems) {
-            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
-        }
-
         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
         mockShardActor = null;
 
index c0df6bd0059cf106ec549c83692ef49b20854e05..790c5c7ea05fab0c18586e72857e3466123c630c 100644 (file)
@@ -7,8 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -32,6 +38,8 @@ public abstract class AbstractTest {
     private static final AtomicLong HISTORY_COUNTER = new AtomicLong();
     private static final AtomicLong TX_COUNTER = new AtomicLong();
 
+    private final Collection<ActorSystem> actorSystems = new ArrayList<>();
+
     protected static void setUpStatic() {
         HISTORY_COUNTER.set(1L);
         TX_COUNTER.set(1L);
@@ -49,4 +57,17 @@ public abstract class AbstractTest {
             throws Exception {
         return Await.result(FutureConverters.toScala(completionStage), timeout);
     }
+
+    @After
+    public void actorSystemCleanup() {
+        for (final ActorSystem system : actorSystems) {
+            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+        }
+    }
+
+    protected ActorSystem newActorSystem(String name, String config) {
+        ActorSystem system = ActorSystem.create(name, ConfigFactory.load().getConfig(config));
+        actorSystems.add(system);
+        return system;
+    }
 }
index a8fc8eeceeee6d013d289fd149e6714652e911c1..20dd17d15c7e70b49d1e99c427151c215a18db4f 100644 (file)
@@ -111,7 +111,7 @@ public class DistributedDataStoreIntegrationTest {
     @Parameter
     public Class<? extends AbstractDataStore> testParameter;
 
-    private static ActorSystem system;
+    private ActorSystem system;
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .shardHeartbeatIntervalInMillis(100);
@@ -125,7 +125,7 @@ public class DistributedDataStoreIntegrationTest {
 
     @After
     public void tearDown() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
index 163005e36dd99cba5650a6c2ae0c9c69a1373e25..4e0619be1780c607d2d62c737d25ebde8857a691 100644 (file)
@@ -26,6 +26,7 @@ import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -38,8 +39,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -240,7 +243,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         final String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -278,21 +280,46 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
 
-        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+        // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+        // applied all the log entries from the leader. Since we've verified the car data above we know that
+        // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+        final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+            state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+        // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+        // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+        // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+        // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+        // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+        // until we find the one that encompasses the leader's lastAppliedIndex.
+        Stopwatch sw = Stopwatch.createStarted();
+        boolean done = false;
+        while (!done) {
+            final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+                    ApplyJournalEntries.class);
+            for (ApplyJournalEntries aje: entries) {
+                if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+                    done = true;
+                    break;
+                }
+            }
+
+            assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+                    + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
-        final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
-                .getConfig("Member2"));
+        final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
-        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
-                commitTimeout)
-                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+        try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
-
-        JavaTestKit.shutdownActorSystem(newSystem);
     }
 
     @Test
index 6a8830c25b2d213bf105decd8e170c306e37d2d9..be996e7f1726e6d69eff15bdf625a39a42b8aa31 100644 (file)
@@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.AbstractMap;
 import java.util.Arrays;
@@ -139,9 +138,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
     private ActorSystem newActorSystem(String config) {
-        ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
-        actorSystems.add(system);
-        return system;
+        return newActorSystem("cluster-test", config);
     }
 
     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
index 522a7d39f9bb744a94b4af7c374c59f8ef462ef0..d81cedb5a90b65aae91f7ad021f48987d603c003 100644 (file)
@@ -129,8 +129,8 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
             followerOperDatastore.close();
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();