Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
index 03387abcd5cca234649b9f58796a86992e9b8b00..c6f5d72529e3e29d38964ede40faa57aa497c6a9 100644 (file)
@@ -8,24 +8,34 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -304,7 +314,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
 
-        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
 
         // Send another payload to trigger a second leader snapshot.
         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
@@ -331,6 +345,25 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         expSnapshotState.add(payload5);
         expSnapshotState.add(payload6);
 
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Send a server config change to test that the install snapshot includes the server config.
+
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(leaderId, true),
+                new ServerInfo(follower1Id, false),
+                new ServerInfo(follower2Id, false)));
+        leaderContext.updatePeerIds(serverConfig);
+        ((AbstractLeader)leader).updateMinReplicaCount();
+        leaderActor.tell(serverConfig, ActorRef.noSender());
+
+        applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+        applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+        verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
@@ -341,7 +374,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload7);
 
-        verifyInstallSnapshotToLaggingFollower(7);
+        verifyInstallSnapshotToLaggingFollower(8, serverConfig);
 
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
     }
@@ -388,6 +421,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload1);
 
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
@@ -419,7 +456,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload2);
 
-        verifyInstallSnapshotToLaggingFollower(2L);
+        verifyInstallSnapshotToLaggingFollower(2L, null);
 
         // Sends a payload with index 3.
         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
@@ -493,7 +530,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
      *
      * @throws Exception
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
+    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
+            @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
         List<Snapshot> persistedSnapshots;
         List<ReplicatedLogEntry> unAppliedEntry;
         ApplySnapshot applySnapshot;
@@ -561,6 +599,21 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // the log. In addition replicatedToAllIndex should've advanced.
         verifyLeadersTrimmedLog(lastAppliedIndex);
 
+        if(expServerConfig != null) {
+            Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+            assertEquals("Leader snapshot server config", expServerInfo,
+                    new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+
+            assertEquals("Follower 2 snapshot server config", expServerInfo,
+                    new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+
+            ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
+            assertNotNull("Follower 2 server config is null", follower2ServerConfig);
+
+            assertEquals("Follower 2 server config", expServerInfo,
+                    new HashSet<>(follower2ServerConfig.getServerConfig()));
+        }
+
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);