Bug 5450: Query akka cluster state on Follower ElectionTimeout
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreRemotingIntegrationTest.java
index d3932f50c63135979e2f1b53dddec79d703fb310..2f7c790269f0f9702157b6468234154e9e1cb43d 100644 (file)
@@ -43,6 +43,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
@@ -99,7 +100,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * @author Thomas Pantelis
  */
-public class DistributedDataStoreRemotingIntegrationTest {
+public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
     private static final String[] CARS = {"cars"};
@@ -121,6 +122,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+    private final TransactionIdentifier tx1 = nextTransactionId();
+    private final TransactionIdentifier tx2 = nextTransactionId();
 
     private DistributedDataStore followerDistributedDataStore;
     private DistributedDataStore leaderDistributedDataStore;
@@ -524,6 +527,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
                 shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
 
         followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
 
@@ -575,7 +579,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
         modification.ready();
 
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction("tx-1" , modification, true);
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -594,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
         modification.ready();
 
-        readyLocal = new ReadyLocalTransaction("tx-2" , modification, false);
+        readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
 
         carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -611,7 +615,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
                 leaderDistributedDataStore.getActorContext(), Arrays.asList(
-                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -640,9 +644,9 @@ public class DistributedDataStoreRemotingIntegrationTest {
         MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
         new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
 
-        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction("tx-1",
+        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-1", modification), true);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         Object resp = followerTestKit.expectMsgClass(Object.class);
@@ -660,9 +664,9 @@ public class DistributedDataStoreRemotingIntegrationTest {
         MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
         new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
 
-        forwardedReady = new ForwardedReadyTransaction("tx-2",
+        forwardedReady = new ForwardedReadyTransaction(tx2,
                 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
-                        Mockito.mock(ShardDataTreeTransactionParent.class), "tx-2", modification), false);
+                        Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
 
         carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
         resp = followerTestKit.expectMsgClass(Object.class);
@@ -679,7 +683,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
         ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
                 leaderDistributedDataStore.getActorContext(), Arrays.asList(
-                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), "tx-2");
+                        new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
         cohort.canCommit().get(5, TimeUnit.SECONDS);
         cohort.preCommit().get(5, TimeUnit.SECONDS);
         cohort.commit().get(5, TimeUnit.SECONDS);
@@ -709,6 +713,13 @@ public class DistributedDataStoreRemotingIntegrationTest {
             }
         });
 
+        MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertEquals("getLastApplied", 0, raftState.getLastApplied());
+            }
+        });
+
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
 
@@ -771,6 +782,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
                 shardElectionTimeoutFactor(10));
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
         leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
 
         // Submit all tx's - the messages should get queued for retry.
@@ -947,6 +959,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+        Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
@@ -987,6 +1001,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
             JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
 
+            Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
+
             sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder.
                 operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));