Compare messages in the log
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
index 9fe03f9673c9f55cf8759dc51cba69b2c5bf437d..47b299478040de4ab14bf9f01fb4ae305dce342e 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -18,11 +19,10 @@ import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
@@ -41,12 +41,12 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
                 newFollowerConfigParams());
 
-        Map<String, String> peerAddresses = new HashMap<>();
-        peerAddresses.put(follower1Id, follower1Actor.path().toString());
-        peerAddresses.put(follower2Id, "");
+        Map<String, String> leaderPeerAddresses = new HashMap<>();
+        leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString());
+        leaderPeerAddresses.put(follower2Id, "");
 
         leaderConfigParams = newLeaderConfigParams();
-        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+        leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
 
         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
@@ -63,15 +63,15 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
         // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
         // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -108,13 +108,13 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Block these messages initially so we can control the sequence.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
         // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
 
         // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
@@ -140,7 +140,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
     }
 
     @Test
-    public void testFollowerRecoveryAfterInstallSnapshot() throws Exception {
+    public void testFollowerRecoveryAfterInstallSnapshot() {
 
         send2InitialPayloads();
 
@@ -152,7 +152,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
         // Verify the leader applies the 3rd payload state.
         MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
@@ -182,16 +182,11 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Wait for the follower to persist the snapshot.
         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
 
-        // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
-        // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
-        // snapshotIndex and not the actual last index included in the snapshot.
-        // FIXME? - is this OK?
-        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
-        List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+        final List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
 
         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
-        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
 
         killActor(follower2Actor);
@@ -205,10 +200,55 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
-        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
     }
 
+    @Test
+    public void testRecoveryDeleteEntries() {
+        send2InitialPayloads();
+
+        sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        sendPayloadData(leaderActor, "three");
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+
+        // Disconnect follower from leader
+        killActor(follower1Actor);
+
+        // Send another payloads
+        sendPayloadData(leaderActor, "four");
+        sendPayloadData(leaderActor, "five");
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+        });
+
+        // Remove entries started from 4 index
+        leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
+        });
+
+        // Send new payloads
+        final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
+        final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+        });
+
+        reinstateLeaderActor();
+
+        final var log = leaderActor.underlyingActor().getReplicatedLog();
+        assertEquals("Leader last index", 5, log.lastIndex());
+        assertEquals(List.of(payload4, payload5), List.of(log.get(4).getData(), log.get(5).getData()));
+    }
+
     private void reinstateLeaderActor() {
         killActor(leaderActor);