Bug 2948: Recovered log entries not applied after prior snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
index d4a9f7701b0bec82a775f14ee439463ce2da2f78..ff9b8ce63010f02c4774b33bf6c926396d9dc240 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
@@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload0);
+        expSnapshotState.add(payload1);
+        expSnapshotState.add(payload2);
+
         testLog.info("testInitialReplications complete");
     }
 
@@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}",
                 leader.getReplicatedToAllIndex());
 
-        leaderActor.underlyingActor().setSnapshot(new byte[] {2});
-
         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
 
         // Send the first payload - this should cause the first snapshot.
@@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload3);
 
         testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads");
 
@@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4);
@@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
 
+        expSnapshotState.add(payload4);
+        expSnapshotState.add(payload5);
+        expSnapshotState.add(payload6);
+        expSnapshotState.add(payload7);
+
         testLog.info("testSubsequentReplicationsAndSnapshots complete");
     }
 
@@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
                 leader.getReplicatedToAllIndex());
 
         leaderActor.underlyingActor().setMockTotalMemory(1000);
-        byte[] snapshot = new byte[] {6};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
 
         // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal.
         InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2);
@@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class);
         Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot);
 
+        expSnapshotState.add(payload8);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         payload9 = sendPayloadData(leaderActor, "nine", 201);
@@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
-        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9);
@@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
+
+        expSnapshotState.add(payload10);
     }
 
     /**
@@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         InstallSnapshot installSnapshot;
         InstallSnapshotReply installSnapshotReply;
 
-        byte[] snapshot = new byte[] {10};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
+        expSnapshotState.add(payload9);
 
         // Now stop dropping AppendEntries in follower 2.
         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
@@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
         assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
-        assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
+        //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
         assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
@@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
         // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
@@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9);
         unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
 
@@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
     /**
      * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and
      * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower.
+     * @throws Exception
      */
-    private void testFinalReplicationsAndSnapshot() {
+    private void testFinalReplicationsAndSnapshot() throws Exception {
         List<ApplyState> applyStates;
         ApplyState applyState;
 
         testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex());
 
-        byte[] snapshot = new byte[] {14};
-        leaderActor.underlyingActor().setSnapshot(snapshot);
-
         // Send another payload - a snapshot should occur.
         payload11 = sendPayloadData(leaderActor, "eleven");
 
@@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // Verify the leader's last persisted snapshot (previous ones may not be purged yet).
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
-        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot);
+        verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11);
         List<ReplicatedLogEntry> unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size());
         verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);