Cleanup test format
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RecoveryIntegrationTest.java
index a8f490e75119678fc84084c50c85dec204941b1d..ca53d2e6c59658c9310dd7c559d1ea0b15226ec4 100644 (file)
@@ -8,16 +8,22 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
 /**
@@ -35,11 +41,12 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         follower1Actor = newTestRaftActor(follower1Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
                 newFollowerConfigParams());
 
-        peerAddresses = ImmutableMap.<String, String>builder().
-                put(follower1Id, follower1Actor.path().toString()).build();
+        Map<String, String> leaderPeerAddresses = new HashMap<>();
+        leaderPeerAddresses.put(follower1Id, follower1Actor.path().toString());
+        leaderPeerAddresses.put(follower2Id, "");
 
         leaderConfigParams = newLeaderConfigParams();
-        leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams);
+        leaderActor = newTestRaftActor(leaderId, leaderPeerAddresses, leaderConfigParams);
 
         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
@@ -53,31 +60,23 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         send2InitialPayloads();
 
         // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
         leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
         // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
-
-        // First, deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
         // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
 
         // Now deliver the CaptureSnapshotReply to the leader.
         CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(
@@ -102,38 +101,30 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
     }
 
     @Test
-    public void testStatePersistedBetweenInitiateSnapshotAndCapture() {
+    public void testStatePersistedAfterSnapshotPersisted() {
 
         send2InitialPayloads();
 
         // Block these messages initially so we can control the sequence.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
         // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
 
         // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
 
         MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
 
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
+        // Wait for snapshot complete.
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        // First, deliver the AppendEntries to the follower
+        // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
-
-        // Now deliver the CaptureSnapshot to the leader.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for snapshot complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
 
         reinstateLeaderActor();
 
@@ -144,53 +135,118 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
         assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
 
-        // payloads 2, 3, and 4 were applied after the snapshot was initiated and before it was captured so
-        // were included in the snapshot. They were also included as unapplied entries in the snapshot as
-        // they weren't yet applied to the state at the time the snapshot was initiated. They were applied to the
-        // state on recovery by the ApplyJournalEntries messages which remained in the persisted log.
-        // This is a side effect of trimming the persisted log to the sequence number captured at the time
-        // the snapshot was initiated.
-        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4, payload2,
-                payload3, payload4), leaderActor.underlyingActor().getState());
+        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
+                leaderActor.underlyingActor().getState());
     }
 
     @Test
-    public void testApplyJournalEntriesPersistedAfterSnapshotPersisted() {
+    public void testFollowerRecoveryAfterInstallSnapshot() {
 
         send2InitialPayloads();
 
-        // Block these messages initially so we can control the sequence.
-        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+        leader = leaderActor.underlyingActor().getCurrentBehavior();
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
 
-        MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
 
-        // This should trigger a snapshot.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
 
-        // Send another payload.
-        MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        // Verify the leader applies the 3rd payload state.
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
 
-        MessageCollectorActor.expectMatching(follower1CollectorActor, AppendEntries.class, 3);
+        MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
+
+        assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+        assertEquals("Leader last applied", 2, leaderContext.getLastApplied());
+        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex());
+
+        killActor(follower2Actor);
+
+        InMemoryJournal.clear();
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+        TestRaftActor follower2Underlying = follower2Actor.underlyingActor();
+        follower2CollectorActor = follower2Underlying.collectorActor();
+        follower2Context = follower2Underlying.getRaftActorContext();
+
+        leaderActor.tell(new SetPeerAddress(follower2Id, follower2Actor.path().toString()), ActorRef.noSender());
+
+        // The leader should install a snapshot so wait for the follower to receive ApplySnapshot.
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
+
+        // Wait for the follower to persist the snapshot.
+        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
+
+        final List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
+
+        assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+        assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+
+        killActor(follower2Actor);
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId)),
+                newFollowerConfigParams());
+
+        follower2Underlying = follower2Actor.underlyingActor();
+        follower2Underlying.waitForRecoveryComplete();
+        follower2Context = follower2Underlying.getRaftActorContext();
+
+        assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
+        assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
+    }
+
+    @Test
+    public void testRecoveryDeleteEntries() {
+        send2InitialPayloads();
+
+        sendPayloadData(leaderActor, "two");
+
+        // This should trigger a snapshot.
+        sendPayloadData(leaderActor, "three");
 
-        // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
 
-        // Now deliver the AppendEntries to the follower
-        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        // Disconnect follower from leader
+        killActor(follower1Actor);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 3);
+        // Send another payloads
+        sendPayloadData(leaderActor, "four");
+        sendPayloadData(leaderActor, "five");
 
-        reinstateLeaderActor();
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+        });
 
-        assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
-        assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex());
-        assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size());
-        assertEquals("Leader journal last index", 4, leaderContext.getReplicatedLog().lastIndex());
-        assertEquals("Leader commit index", 4, leaderContext.getCommitIndex());
-        assertEquals("Leader last applied", 4, leaderContext.getLastApplied());
+        // Remove entries started from 4 index
+        leaderActor.underlyingActor().getReplicatedLog().removeFromAndPersist(4);
 
-        assertEquals("Leader state", Arrays.asList(payload0, payload1, payload2, payload3, payload4),
-                leaderActor.underlyingActor().getState());
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
+        });
+
+        // Send new payloads
+        final MockPayload payload4 = sendPayloadData(leaderActor, "newFour");
+        final MockPayload payload5 = sendPayloadData(leaderActor, "newFive");
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+        });
+
+        reinstateLeaderActor();
+
+        assertEquals("Leader last index", 5 , leaderActor.underlyingActor().getReplicatedLog().lastIndex());
+        assertEquals(payload4, leaderActor.underlyingActor().getReplicatedLog().get(4).getData());
+        assertEquals(payload5, leaderActor.underlyingActor().getReplicatedLog().get(5).getData());
     }
 
     private void reinstateLeaderActor() {
@@ -208,6 +264,9 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         currentTerm = leaderContext.getTermInformation().getCurrentTerm();
 
         payload0 = sendPayloadData(leaderActor, "zero");
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+
         payload1 = sendPayloadData(leaderActor, "one");
 
         // Verify the leader applies the states.