Add JournalSegmentFile.map()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
index fa974b0ca42eee0b704df9afd1dd9ead437b18c9..f2658957e1e173d66e30839cd3938ade1c57f870 100644 (file)
@@ -8,21 +8,20 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.Nullable;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -61,15 +60,15 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
 
         // Create the leader and 2 follower actors.
-        follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+        follower1Actor = newTestRaftActor(follower1Id, Map.of(leaderId, testActorPath(leaderId),
                 follower2Id, testActorPath(follower2Id)), newFollowerConfigParams());
 
-        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+        follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
                 follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
 
-        Map<String, String> leaderPeerAddresses = ImmutableMap.<String, String>builder()
-                .put(follower1Id, follower1Actor.path().toString())
-                .put(follower2Id, follower2Actor.path().toString()).build();
+        Map<String, String> leaderPeerAddresses = Map.of(
+                follower1Id, follower1Actor.path().toString(),
+                follower2Id, follower2Actor.path().toString());
 
         leaderConfigParams = newLeaderConfigParams();
         leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
@@ -86,7 +85,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
 
         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
-        assertEquals("Current term > " + initialTerm, true, currentTerm > initialTerm);
+        assertTrue("Current term > " + initialTerm, currentTerm > initialTerm);
 
         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
@@ -95,6 +94,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("Leader created and elected");
     }
 
+    private void setupFollower2() {
+        follower2Actor = newTestRaftActor(follower2Id, Map.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+
+        follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+        follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
+
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+    }
+
     /**
      * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets
      * caught up via AppendEntries.
@@ -159,7 +168,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // to catch it up because no snapshotting was done so the follower's next index was present in the log.
         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
                 InstallSnapshot.class);
-        Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
+        assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
 
         testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete");
     }
@@ -244,7 +253,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader did not try to install a snapshot to catch up follower 2.
         InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor,
                 InstallSnapshot.class);
-        Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
+        assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot);
 
         // Ensure there's at least 1 more heartbeat.
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -354,7 +363,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Send a server config change to test that the install snapshot includes the server config.
 
-        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(List.of(
                 new ServerInfo(leaderId, true),
                 new ServerInfo(follower1Id, false),
                 new ServerInfo(follower2Id, false)));
@@ -383,6 +392,80 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
     }
 
+    /**
+     * Tests whether the leader reattempts to send a snapshot when a follower crashes before replying with
+     * InstallSnapshotReply after the last chunk has been sent.
+     */
+    @Test
+    public void testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation() throws Exception {
+        testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation starting");
+
+        setup();
+
+        sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+        // Configure follower 2 to drop messages and lag.
+        follower2Actor.stop();
+
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
+        // Send 5 payloads - the second should cause a leader snapshot.
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+        final MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+        List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+        verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+        verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+        verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+        testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation: "
+                + "sending 1 more payload to trigger second snapshot");
+
+        // Send another payload to trigger a second leader snapshot.
+        MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+
+        ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+        // Verify follower 1 applies each log entry.
+        applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+        verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+        verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+        verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+        leaderActor.underlyingActor()
+                .startDropMessages(InstallSnapshotReply.class, reply -> reply.getChunkIndex() == 5);
+
+        setupFollower2();
+
+        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 1);
+
+        follower2Actor.stop();
+
+        // need to get rid of persistence for follower2
+        InMemorySnapshotStore.clearSnapshotsFor(follower2Id);
+
+        leaderActor.underlyingActor().stopDropMessages(InstallSnapshotReply.class);
+
+        MessageCollectorActor.clearMessages(follower2CollectorActor);
+        setupFollower2();
+
+        MessageCollectorActor.expectMatching(follower2CollectorActor, SaveSnapshotSuccess.class, 1);
+    }
+
     /**
      * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
      * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
@@ -424,7 +507,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify a snapshot is not triggered.
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor,
                 CaptureSnapshot.class);
-        Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+        assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
         expSnapshotState.add(payload1);
 
@@ -497,7 +580,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
 
         captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
-        Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
+        assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
         // Verify the follower 1 applies the state.
         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
@@ -529,8 +612,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Resume the lagging follower 2 and verify it receives an install snapshot from the leader.
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
-            @Nullable ServerConfigurationPayload expServerConfig) {
+    private void verifyInstallSnapshotToLaggingFollower(final long lastAppliedIndex,
+            final @Nullable ServerConfigurationPayload expServerConfig) {
         testLog.info("verifyInstallSnapshotToLaggingFollower starting");
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -549,15 +632,15 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // This is OK - the next snapshot should delete it. In production, even if the system restarted
         // before another snapshot, they would both get applied which wouldn't hurt anything.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
-        Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+        assertFalse("Expected at least 1 persisted snapshots", persistedSnapshots.isEmpty());
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
         verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
         int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
-        final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
-                + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
+        final int expTotalChunks = snapshotSize / MAXIMUM_MESSAGE_SLICE_SIZE
+                + (snapshotSize % MAXIMUM_MESSAGE_SLICE_SIZE > 0 ? 1 : 0);
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
                 InstallSnapshot.class);
@@ -576,7 +659,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
             assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
             assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
             assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
-            assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+            assertTrue("InstallSnapshotReply isSuccess", installSnapshotReply.isSuccess());
         }
 
         // Verify follower 2 applies the snapshot.
@@ -599,18 +682,18 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         verifyLeadersTrimmedLog(lastAppliedIndex);
 
         if (expServerConfig != null) {
-            Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+            Set<ServerInfo> expServerInfo = Set.copyOf(expServerConfig.getServerConfig());
             assertEquals("Leader snapshot server config", expServerInfo,
-                    new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+                Set.copyOf(persistedSnapshot.getServerConfiguration().getServerConfig()));
 
             assertEquals("Follower 2 snapshot server config", expServerInfo,
-                    new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+                Set.copyOf(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
 
             ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
             assertNotNull("Follower 2 server config is null", follower2ServerConfig);
 
             assertEquals("Follower 2 server config", expServerInfo,
-                    new HashSet<>(follower2ServerConfig.getServerConfig()));
+                Set.copyOf(follower2ServerConfig.getServerConfig()));
         }
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
@@ -681,8 +764,9 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leaders's persisted journal log - it should only contain the last 2 ReplicatedLogEntries
         // added after the snapshot as the persisted journal should've been purged to the snapshot
         // sequence number.
-        verifyPersistedJournal(leaderId, Arrays.asList(new SimpleReplicatedLogEntry(5, currentTerm, payload5),
-                new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
+        verifyPersistedJournal(leaderId, List.of(
+            new SimpleReplicatedLogEntry(5, currentTerm, payload5),
+            new SimpleReplicatedLogEntry(6, currentTerm, payload6)));
 
         // Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index.
         List<ApplyJournalEntries> persistedApplyJournalEntries =
@@ -695,8 +779,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
             }
         }
 
-        Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6),
-                found);
+        assertTrue("ApplyJournalEntries with index 6 not found in leader's persisted journal", found);
 
         // Verify follower 1 applies the 3 log entries.
         applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
@@ -727,8 +810,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Kill the leader actor, reinstate it and verify the recovered journal.
      */
-    private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex,
-            long firstJournalEntryIndex) {
+    private void verifyLeaderRecoveryAfterReinstatement(final long lastIndex, final long snapshotIndex,
+            final long firstJournalEntryIndex) {
         testLog.info("verifyLeaderRecoveryAfterReinstatement starting: lastIndex: {}, snapshotIndex: {}, "
             + "firstJournalEntryIndex: {}", lastIndex, snapshotIndex, firstJournalEntryIndex);
 
@@ -761,8 +844,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("verifyLeaderRecoveryAfterReinstatement ending");
     }
 
-    private void sendInitialPayloadsReplicatedToAllFollowers(String... data) {
-
+    private void sendInitialPayloadsReplicatedToAllFollowers(final String... data) {
         // Send the payloads.
         for (String d: data) {
             expSnapshotState.add(sendPayloadData(leaderActor, d));
@@ -771,25 +853,27 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         int numEntries = data.length;
 
         // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
-        List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
-                ApplyState.class, numEntries);
+        final var leaderStates = MessageCollectorActor.expectMatching(leaderCollectorActor,
+            ApplyState.class, numEntries);
         for (int i = 0; i < expSnapshotState.size(); i++) {
-            MockPayload payload = expSnapshotState.get(i);
-            verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
+            final MockPayload payload = expSnapshotState.get(i);
+            verifyApplyState(leaderStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload);
         }
 
         // Verify follower 1 applies each log entry.
-        applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, numEntries);
+        final var follower1States = MessageCollectorActor.expectMatching(follower1CollectorActor,
+            ApplyState.class, numEntries);
         for (int i = 0; i < expSnapshotState.size(); i++) {
-            MockPayload payload = expSnapshotState.get(i);
-            verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
+            final MockPayload payload = expSnapshotState.get(i);
+            verifyApplyState(follower1States.get(i), null, null, currentTerm, i, payload);
         }
 
         // Verify follower 2 applies each log entry.
-        applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, numEntries);
+        final var follower2States = MessageCollectorActor.expectMatching(follower2CollectorActor,
+            ApplyState.class, numEntries);
         for (int i = 0; i < expSnapshotState.size(); i++) {
-            MockPayload payload = expSnapshotState.get(i);
-            verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload);
+            final MockPayload payload = expSnapshotState.get(i);
+            verifyApplyState(follower2States.get(i), null, null, currentTerm, i, payload);
         }
 
         // Ensure there's at least 1 more heartbeat.