Fix intermittent failure in testWriteTransactionWithSingleShard 21/54821/3
authorTom Pantelis <tompantelis@gmail.com>
Wed, 12 Apr 2017 06:20:18 +0000 (02:20 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 14 Apr 2017 16:02:43 +0000 (16:02 +0000)
DistributedDataStoreRemotingIntegrationTest.testWriteTransactionWithSingleShard
fails intermittently with tell-based protocol in verifyCars after it has
reinstated the follower where it's expecting just car1 but the
data tree contains car 1 and car2. This is b/c the delete transaction for car2
prior to reinstatement wasn't applied on recovery due to the corresponding
ApplyJournalEntries message missing from the persisted journal. The test
expects 2 ApplyJournalEntries messages to be persisted corresponding to the
2 transactions but tell-based persists other payloads as well so there may
be 3 ApplyJournalEntries messages. I changed the code to handle this case.

Also the assertion failure in verifyCars caused it to bypass shutting down
the ActorSystem which resulted in several other failures in tests that try to
use the same port configuration due to the port already in use. So I made
changes to ensure ActorSystems are shutdown properly.

Change-Id: Id6316d71fcd9eb3e768c6b1f676fa0e9be1287a2
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java

index f47ff18..567ccad 100644 (file)
@@ -26,7 +26,7 @@ public abstract class AbstractClusterRefActorTest extends AbstractTest {
 
     @AfterClass
     public static void tearDownClass() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
index fc6445e..8152c9e 100644 (file)
@@ -12,13 +12,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -43,7 +39,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
 
     protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-    protected final Collection<ActorSystem> actorSystems = new ArrayList<>();
     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
@@ -85,10 +80,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
-        for (final ActorSystem system : actorSystems) {
-            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
-        }
-
         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
         mockShardActor = null;
 
index c0df6bd..790c5c7 100644 (file)
@@ -7,8 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -32,6 +38,8 @@ public abstract class AbstractTest {
     private static final AtomicLong HISTORY_COUNTER = new AtomicLong();
     private static final AtomicLong TX_COUNTER = new AtomicLong();
 
+    private final Collection<ActorSystem> actorSystems = new ArrayList<>();
+
     protected static void setUpStatic() {
         HISTORY_COUNTER.set(1L);
         TX_COUNTER.set(1L);
@@ -49,4 +57,17 @@ public abstract class AbstractTest {
             throws Exception {
         return Await.result(FutureConverters.toScala(completionStage), timeout);
     }
+
+    @After
+    public void actorSystemCleanup() {
+        for (final ActorSystem system : actorSystems) {
+            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+        }
+    }
+
+    protected ActorSystem newActorSystem(String name, String config) {
+        ActorSystem system = ActorSystem.create(name, ConfigFactory.load().getConfig(config));
+        actorSystems.add(system);
+        return system;
+    }
 }
index a8fc8ee..20dd17d 100644 (file)
@@ -111,7 +111,7 @@ public class DistributedDataStoreIntegrationTest {
     @Parameter
     public Class<? extends AbstractDataStore> testParameter;
 
-    private static ActorSystem system;
+    private ActorSystem system;
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .shardHeartbeatIntervalInMillis(100);
@@ -125,7 +125,7 @@ public class DistributedDataStoreIntegrationTest {
 
     @After
     public void tearDown() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
index da8c7cb..3298fa2 100644 (file)
@@ -27,6 +27,7 @@ import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -39,8 +40,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -239,7 +242,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         final String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -277,20 +279,46 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
 
-        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+        // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+        // applied and all the log entries from the leader. Since we've verified the car data above we know that
+        // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+        final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+            state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+        // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+        // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+        // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+        // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+        // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+        // until we find the one that encompasses the leader's lastAppliedIndex.
+        Stopwatch sw = Stopwatch.createStarted();
+        boolean done = false;
+        while (!done) {
+            final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+                    ApplyJournalEntries.class);
+            for (ApplyJournalEntries aje: entries) {
+                if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+                    done = true;
+                    break;
+                }
+            }
+
+            assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+                    + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
-        final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
-                .getConfig("Member2"));
+        final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
         try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
-                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
-
-        JavaTestKit.shutdownActorSystem(newSystem);
     }
 
     @Test
index 6a8830c..be996e7 100644 (file)
@@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.AbstractMap;
 import java.util.Arrays;
@@ -139,9 +138,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
     private ActorSystem newActorSystem(String config) {
-        ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
-        actorSystems.add(system);
-        return system;
+        return newActorSystem("cluster-test", config);
     }
 
     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
index 522a7d3..d81cedb 100644 (file)
@@ -129,8 +129,8 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
             followerOperDatastore.close();
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.