Bug 6587: Retain state when transitioning between Leader and IsolatedLeader
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index d6faff694c66123d07576e44916d2d714eb45e2e..4c0aac48346a59e2db49db247cdaa37bbbd2abf0 100644 (file)
@@ -41,16 +41,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
-import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
@@ -58,7 +54,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -706,19 +701,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Wait for the commit to be replicated to the follower.
 
 
         // Wait for the commit to be replicated to the follower.
 
-        MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getLastApplied", 0, raftState.getLastApplied());
-            }
-        });
+        MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
+                raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
 
 
-        MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getLastApplied", 0, raftState.getLastApplied());
-            }
-        });
+        MemberNode.verifyRaftState(followerDistributedDataStore, "people",
+                raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
@@ -769,12 +756,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-            @Override
-            public void verify(ShardStats stats) {
-                assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
-            }
-        });
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
 
         // Disable elections on the leader so it switches to follower.
 
 
         // Disable elections on the leader so it switches to follower.
 
@@ -828,24 +811,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-                @Override
-                public void verify(ShardStats stats) {
-                    assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
-                }
-            });
+            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-                @Override
-                public void verify(ShardStats stats) {
-                    assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
-                }
-            });
+            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
 
             // Gracefully stop the leader via a Shutdown message.
 
 
             // Gracefully stop the leader via a Shutdown message.
 
@@ -877,29 +852,40 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
-        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
+        // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
+        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         String testName = "testTransactionWithIsolatedLeader";
         initDatastoresWithCars(testName);
 
         String testName = "testTransactionWithIsolatedLeader";
         initDatastoresWithCars(testName);
 
-        DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
-        failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
+        DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
 
+        // Tx that is submitted after the leader transitions to IsolatedLeader.
+        DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        // Tx that is submitted after the follower is reinstated.
         DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
         DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
+        // Stop the follower
         followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
         followerDistributedDataStore.close();
         followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
 
         followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
         followerDistributedDataStore.close();
         followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
 
-        MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
-            }
-        });
+        // Submit the preIsolatedLeaderWriteTx so it's pending
+        DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
+
+        // Change the isolated leader check interval low so it changes to IsolatedLeader.
+        sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+                shardIsolatedLeaderCheckIntervalInMillis(200));
+
+        MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
+                raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
         try {
 
         try {
-            leaderTestKit.doCommit(failWriteTx.ready());
+            leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
             fail("Expected NoShardLeaderException");
         } catch (ExecutionException e) {
             assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
             fail("Expected NoShardLeaderException");
         } catch (ExecutionException e) {
             assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
@@ -908,12 +894,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
                 shardElectionTimeoutFactor(100));
 
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
                 shardElectionTimeoutFactor(100));
 
-        DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+        DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
 
         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
                 MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
 
 
         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
                 MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
 
-        leaderTestKit.doCommit(writeTxCohort);
+        leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
+        leaderTestKit.doCommit(successTxCohort);
     }
 
     @Test(expected=AskTimeoutException.class)
     }
 
     @Test(expected=AskTimeoutException.class)
@@ -1017,12 +1004,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
-        Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
-            @Override
-            public DatastoreContext answer(InvocationOnMock invocation) {
-                return newBuilder.build();
-            }
-        };
+        Answer<DatastoreContext> answer = invocation -> newBuilder.build();
         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
         dataStore.onDatastoreContextUpdated(mockContextFactory);
         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
         dataStore.onDatastoreContextUpdated(mockContextFactory);