Fix intermittent unit test failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index f7a5630336a7f69e5520e52fd2bd9cea094b8fa2..6a4a6315c7d4dfdf3789316fbef32fdb3d07cc87 100644 (file)
@@ -23,8 +23,10 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.dispatch.Futures;
 import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -54,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -84,6 +87,9 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * End-to-end distributed data store tests that exercise remote shards and transactions.
@@ -534,6 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
@@ -563,7 +570,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
 
@@ -587,8 +594,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
+        Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -596,6 +606,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
@@ -618,7 +629,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true, true);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true);
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -626,7 +637,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
             throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
         }
 
-        assertEquals("Response type", CommitTransactionReply.SERIALIZABLE_CLASS, resp.getClass());
+        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
 
@@ -638,7 +649,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         forwardedReady = new ForwardedReadyTransaction("tx-2",
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), true, false);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false);
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -651,8 +662,11 @@ public class DistributedDataStoreRemotingIntegrationTest {
         ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
                 ((ReadyTransactionReply)resp).getCohortPath());
 
+        Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
+        Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-                leaderDistributedDataStore.getActorContext(), Arrays.asList(Futures.successful(txActor)), "tx-2");
+                leaderDistributedDataStore.getActorContext(), Arrays.asList(
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -720,6 +734,71 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
     }
 
+    @Test
+    public void testLeadershipTransferOnShutdown() throws Exception {
+        leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
+        String testName = "testLeadershipTransferOnShutdown";
+        initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
+
+        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
+        DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+                MODULE_SHARDS_CARS_PEOPLE_1_2_3, false);
+
+        // Create and submit a couple tx's so they're pending.
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+        DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
+            }
+        });
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        writeTx.write(CarsModel.newCarPath("optima"), car);
+        DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
+
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+            @Override
+            public void verify(ShardStats stats) {
+                assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
+            }
+        });
+
+        // Gracefully stop the leader via a Shutdown message.
+
+        sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+                shardElectionTimeoutFactor(100));
+
+        FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+        Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+        ActorRef leaderActor = Await.result(future, duration);
+
+        Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
+
+        // Commit the 2 transactions. They should finish and succeed.
+
+        followerTestKit.doCommit(cohort1);
+        followerTestKit.doCommit(cohort2);
+
+        // Wait for the leader actor stopped.
+
+        Boolean stopped = Await.result(stopFuture, duration);
+        assertEquals("Stopped", Boolean.TRUE, stopped);
+
+        // Verify leadership was transferred by reading the committed data from the other nodes.
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
+        verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
+    }
+
     @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);