Fix intermittent failure in testWriteTransactionWithSingleShard 21/54821/3
authorTom Pantelis <tompantelis@gmail.com>
Wed, 12 Apr 2017 06:20:18 +0000 (02:20 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 14 Apr 2017 16:02:43 +0000 (16:02 +0000)
DistributedDataStoreRemotingIntegrationTest.testWriteTransactionWithSingleShard
fails intermittently with tell-based protocol in verifyCars after it has
reinstated the follower where it's expecting just car1 but the
data tree contains car1 and car2. This is because the delete transaction for car2
prior to reinstatement wasn't applied on recovery due to the corresponding
ApplyJournalEntries message missing from the persisted journal. The test
expects 2 ApplyJournalEntries messages to be persisted corresponding to the
2 transactions but tell-based persists other payloads as well so there may
be 3 ApplyJournalEntries messages. I changed the code to handle this case.

Also the assertion failure in verifyCars caused it to bypass shutting down
the ActorSystem which resulted in several other failures in tests that try to
use the same port configuration because the port was already in use. So I made
changes to ensure ActorSystems are shut down properly.

Change-Id: Id6316d71fcd9eb3e768c6b1f676fa0e9be1287a2
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractClusterRefActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java

index f47ff18027468caa790d8fa4a17b9e3303f762fc..567ccadb325203b27e99f1a4233098d9334c0a49 100644 (file)
@@ -26,7 +26,7 @@ public abstract class AbstractClusterRefActorTest extends AbstractTest {
 
     @AfterClass
     public static void tearDownClass() throws IOException {
 
     @AfterClass
     public static void tearDownClass() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
         system = null;
     }
 
index fc6445e8f29dee2639d77727636d72844138f837..8152c9e6e2cebfe7e221be31fb0b85585d927489 100644 (file)
@@ -12,13 +12,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import akka.actor.ActorRef;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.testkit.TestActorRef;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
@@ -43,7 +39,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
 
     protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
     protected final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-    protected final Collection<ActorSystem> actorSystems = new ArrayList<>();
     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
             .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
@@ -85,10 +80,6 @@ public class AbstractShardManagerTest extends AbstractClusterRefActorTest {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
-        for (final ActorSystem system : actorSystems) {
-            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
-        }
-
         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
         mockShardActor = null;
 
         mockShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
         mockShardActor = null;
 
index c0df6bd0059cf106ec549c83692ef49b20854e05..790c5c7ea05fab0c18586e72857e3466123c630c 100644 (file)
@@ -7,8 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
@@ -32,6 +38,8 @@ public abstract class AbstractTest {
     private static final AtomicLong HISTORY_COUNTER = new AtomicLong();
     private static final AtomicLong TX_COUNTER = new AtomicLong();
 
     private static final AtomicLong HISTORY_COUNTER = new AtomicLong();
     private static final AtomicLong TX_COUNTER = new AtomicLong();
 
+    private final Collection<ActorSystem> actorSystems = new ArrayList<>();
+
     protected static void setUpStatic() {
         HISTORY_COUNTER.set(1L);
         TX_COUNTER.set(1L);
     protected static void setUpStatic() {
         HISTORY_COUNTER.set(1L);
         TX_COUNTER.set(1L);
@@ -49,4 +57,17 @@ public abstract class AbstractTest {
             throws Exception {
         return Await.result(FutureConverters.toScala(completionStage), timeout);
     }
             throws Exception {
         return Await.result(FutureConverters.toScala(completionStage), timeout);
     }
+
+    @After
+    public void actorSystemCleanup() {
+        for (final ActorSystem system : actorSystems) {
+            JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
+        }
+    }
+
+    protected ActorSystem newActorSystem(String name, String config) {
+        ActorSystem system = ActorSystem.create(name, ConfigFactory.load().getConfig(config));
+        actorSystems.add(system);
+        return system;
+    }
 }
 }
index a8fc8eeceeee6d013d289fd149e6714652e911c1..20dd17d15c7e70b49d1e99c427151c215a18db4f 100644 (file)
@@ -111,7 +111,7 @@ public class DistributedDataStoreIntegrationTest {
     @Parameter
     public Class<? extends AbstractDataStore> testParameter;
 
     @Parameter
     public Class<? extends AbstractDataStore> testParameter;
 
-    private static ActorSystem system;
+    private ActorSystem system;
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .shardHeartbeatIntervalInMillis(100);
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
             .shardHeartbeatIntervalInMillis(100);
@@ -125,7 +125,7 @@ public class DistributedDataStoreIntegrationTest {
 
     @After
     public void tearDown() throws IOException {
 
     @After
     public void tearDown() throws IOException {
-        JavaTestKit.shutdownActorSystem(system);
+        JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE);
         system = null;
     }
 
         system = null;
     }
 
index da8c7cb725aad6d4b929d7c300261cff260a4920..3298fa2c05c2464342e1254cbc0ff9b4dd3c043b 100644 (file)
@@ -27,6 +27,7 @@ import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -39,8 +40,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -239,7 +242,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars(testName);
 
         final String followerCarShardName = "member-2-shard-cars-" + testName;
         initDatastoresWithCars(testName);
 
         final String followerCarShardName = "member-2-shard-cars-" + testName;
-        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
         DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
         assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@@ -277,20 +279,46 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
 
 
         // Re-instate the follower member 2 as a single-node to verify replication and recovery.
 
-        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+        // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+        // applied all the log entries from the leader. Since we've verified the car data above we know that
+        // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+        final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+        IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+            state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+        // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+        // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+        // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+        // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+        // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+        // until we find the one that encompasses the leader's lastAppliedIndex.
+        Stopwatch sw = Stopwatch.createStarted();
+        boolean done = false;
+        while (!done) {
+            final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+                    ApplyJournalEntries.class);
+            for (ApplyJournalEntries aje: entries) {
+                if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+                    done = true;
+                    break;
+                }
+            }
+
+            assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+                    + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
 
 
-        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
-        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
 
-        final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
-                .getConfig("Member2"));
+        final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
 
         try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
 
         try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
-                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+                .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
-
-        JavaTestKit.shutdownActorSystem(newSystem);
     }
 
     @Test
     }
 
     @Test
index 6a8830c25b2d213bf105decd8e170c306e37d2d9..be996e7f1726e6d69eff15bdf625a39a42b8aa31 100644 (file)
@@ -44,7 +44,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.AbstractMap;
 import java.util.Arrays;
 import java.net.URI;
 import java.util.AbstractMap;
 import java.util.Arrays;
@@ -139,9 +138,7 @@ public class ShardManagerTest extends AbstractShardManagerTest {
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
     private ActorSystem newActorSystem(String config) {
     private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
     private ActorSystem newActorSystem(String config) {
-        ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config));
-        actorSystems.add(system);
-        return system;
+        return newActorSystem("cluster-test", config);
     }
 
     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
     }
 
     private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
index 522a7d39f9bb744a94b4af7c374c59f8ef462ef0..d81cedb5a90b65aae91f7ad021f48987d603c003 100644 (file)
@@ -129,8 +129,8 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
             followerOperDatastore.close();
         }
 
             followerOperDatastore.close();
         }
 
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(followerSystem);
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();