Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
index 89d69886edd292e80eb08c9851d40a7313d29b40..c6f5d72529e3e29d38964ede40faa57aa497c6a9 100644 (file)
@@ -8,24 +8,34 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -304,7 +314,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
 
-        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
 
         // Send another payload to trigger a second leader snapshot.
         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
@@ -331,6 +345,25 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         expSnapshotState.add(payload5);
         expSnapshotState.add(payload6);
 
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Send a server config change to test that the install snapshot includes the server config.
+
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(leaderId, true),
+                new ServerInfo(follower1Id, false),
+                new ServerInfo(follower2Id, false)));
+        leaderContext.updatePeerIds(serverConfig);
+        ((AbstractLeader)leader).updateMinReplicaCount();
+        leaderActor.tell(serverConfig, ActorRef.noSender());
+
+        applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+        applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+        verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
@@ -341,7 +374,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload7);
 
-        verifyInstallSnapshotToLaggingFollower(7);
+        verifyInstallSnapshotToLaggingFollower(8, serverConfig);
 
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
     }
@@ -388,6 +421,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload1);
 
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
@@ -419,7 +456,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload2);
 
-        verifyInstallSnapshotToLaggingFollower(2L);
+        verifyInstallSnapshotToLaggingFollower(2L, null);
 
         // Sends a payload with index 3.
         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
@@ -493,32 +530,58 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
      *
      * @throws Exception
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
+    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
+            @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
         List<Snapshot> persistedSnapshots;
         List<ReplicatedLogEntry> unAppliedEntry;
         ApplySnapshot applySnapshot;
         InstallSnapshot installSnapshot;
-        InstallSnapshotReply installSnapshotReply;
 
         testLog.info("testInstallSnapshotToLaggingFollower starting");
 
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+        // the snapshot store: the second snapshot was initiated by the follower's install snapshot rather
+        // than by reaching the batch count, so the persisted journal sequence number wasn't advanced far
+        // enough to cause the previous snapshot to be deleted. This is because
+        // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+        // This is OK - the next snapshot should delete it. In production, even if the system restarted
+        // before another snapshot, both would get applied, which wouldn't hurt anything.
+        persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+        Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+        Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+        unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+        assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+        int snapshotSize = persistedSnapshot.getState().length;
+        int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
         installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
         assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
         assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
-        assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+        assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
-        installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
-        assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
-        assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
-        assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
-        assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+        List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+                leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+        int index = 1;
+        for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+            assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+            assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+            assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+            assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+        }
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
@@ -536,19 +599,20 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // the log. In addition replicatedToAllIndex should've advanced.
         verifyLeadersTrimmedLog(lastAppliedIndex);
 
-        // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
-        // the snapshot store because the second snapshot was initiated by the follower install snapshot and
-        // not because the batch count was reached so the persisted journal sequence number wasn't advanced
-        // far enough to cause the previous snapshot to be deleted. This is because
-        // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
-        // This is OK - the next snapshot should delete it. In production, even if the system restarted
-        // before another snapshot, they would both get applied which wouldn't hurt anything.
-        persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
-        Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
-        Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
-        unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
-        assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+        if(expServerConfig != null) {
+            Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+            assertEquals("Leader snapshot server config", expServerInfo,
+                    new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+
+            assertEquals("Follower 2 snapshot server config", expServerInfo,
+                    new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+
+            ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
+            assertNotNull("Follower 2 server config is null", follower2ServerConfig);
+
+            assertEquals("Follower 2 server config", expServerInfo,
+                    new HashSet<>(follower2ServerConfig.getServerConfig()));
+        }
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);