Merge "Add missing copyright text"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsIntegrationTest.java
index bd670fd581153e673ef145ca178211c6f7e60f86..be748f3d26c3ae64d32ebac624e8100972459783 100644 (file)
@@ -14,9 +14,8 @@ import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -36,13 +35,14 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     private MockPayload recoveredPayload0;
     private MockPayload recoveredPayload1;
     private MockPayload recoveredPayload2;
+    private MockPayload payload3;
     private MockPayload payload4;
     private MockPayload payload5;
     private MockPayload payload6;
     private MockPayload payload7;
 
     @Test
-    public void runTest() {
+    public void runTest() throws Exception {
         testLog.info("testReplicationAndSnapshots starting");
 
         // Setup the persistent journal for the leader. We'll start up with 3 journal log entries (one less
@@ -55,7 +55,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(1, initialTerm, recoveredPayload1));
         recoveredPayload2 = new MockPayload("two");
         InMemoryJournal.addEntry(leaderId, seqId++, new ReplicatedLogImplEntry(2, initialTerm, recoveredPayload2));
-        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyLogEntries(2));
+        InMemoryJournal.addEntry(leaderId, seqId++, new ApplyJournalEntries(2));
 
         origLeaderJournal = InMemoryJournal.get(leaderId, ReplicatedLogImplEntry.class);
 
@@ -157,19 +157,21 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
      * 4 and we already have 3 entries in the journal log, this should initiate a snapshot. In this
      * scenario, the follower consensus and application of state is delayed until after the snapshot
      * completes.
+     * @throws Exception
      */
-    private void testFirstSnapshot() {
+    private void testFirstSnapshot() throws Exception {
         testLog.info("testFirstSnapshot starting");
 
-        byte[] snapshot = new byte[] {1,2,3,4};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(recoveredPayload0);
+        expSnapshotState.add(recoveredPayload1);
+        expSnapshotState.add(recoveredPayload2);
 
         // Delay the consensus by temporarily dropping the AppendEntries to both followers.
         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the payload.
-        MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        payload3 = sendPayloadData(leaderActor, "three");
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -185,7 +187,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         // the last applied log entry (2) even though the leader hasn't yet advanced its cached snapshot index.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), initialTerm, 2, currentTerm, 3);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3);
@@ -286,22 +288,25 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
     /**
      * Send one more payload to trigger another snapshot. In this scenario, we delay the snapshot until
      * consensus occurs and the leader applies the state.
+     * @throws Exception
      */
-    private void testSecondSnapshot() {
+    private void testSecondSnapshot() throws Exception {
         testLog.info("testSecondSnapshot starting");
 
-        byte[] snapshot = new byte[] {5,6,7,8};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
 
         // Delay the CaptureSnapshot message to the leader actor.
-        leaderActor.underlyingActor().startDropMessages(CaptureSnapshot.class);
+        leaderActor.underlyingActor().startDropMessages(CaptureSnapshotReply.class);
 
         // Send the payload.
         payload7 = sendPayloadData(leaderActor, "seven");
 
-        // Capture the CaptureSnapshot message so we can send it later.
-        CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(
-                leaderCollectorActor, CaptureSnapshot.class);
+        // Capture the CaptureSnapshotReply message so we can send it later.
+        CaptureSnapshotReply captureSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor,
+                CaptureSnapshotReply.class);
 
         // Wait for the state to be applied in the leader.
         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
@@ -321,12 +326,9 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader last applied", 7, leaderContext.getLastApplied());
         assertEquals("Leader replicatedToAllIndex", 5, leader.getReplicatedToAllIndex());
 
-        // Now deliver the CaptureSnapshot.
-        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshot.class);
-        leaderActor.tell(captureSnapshot, leaderActor);
-
-        // Wait for CaptureSnapshotReply to complete.
-        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, CaptureSnapshotReply.class);
+        // Now deliver the CaptureSnapshotReply.
+        leaderActor.underlyingActor().stopDropMessages(CaptureSnapshotReply.class);
+        leaderActor.tell(captureSnapshotReply, leaderActor);
 
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
@@ -341,11 +343,12 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 7, leaderContext.getCommitIndex());
 
-        // Verify the persisted snapshot. This should reflect the advanced snapshot index as the last applied
-        // log entry (6).
+        // Verify the persisted snapshot. This should reflect the snapshot index as the last applied
+        // log entry (7) and shouldn't contain any unapplied entries as we capture persisted the snapshot data
+        // when the snapshot is created (ie when the CaptureSnapshot is processed).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7);
@@ -389,6 +392,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex());
         assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex());
 
+        expSnapshotState.add(payload7);
+
         testLog.info("testSecondSnapshot ending");
     }
 
@@ -404,6 +409,8 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         leaderActor.underlyingActor().waitForRecoveryComplete();
 
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+
         assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm());
         assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex());
         assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size());