X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java;h=ff9b8ce63010f02c4774b33bf6c926396d9dc240;hp=d4a9f7701b0bec82a775f14ee439463ce2da2f78;hb=e3a22ae5edead2319553bb4dfce59e359386d535;hpb=3940ce6060e027fe870244346e5309229cc8dc48 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index d4a9f7701b..ff9b8ce630 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.raft; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; @@ -185,6 +184,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload0); + expSnapshotState.add(payload1); + expSnapshotState.add(payload2); + testLog.info("testInitialReplications complete"); } @@ -198,8 +201,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - leaderActor.underlyingActor().setSnapshot(new byte[] {2}); - follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send the first payload - this should cause the first snapshot. @@ -207,8 +208,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload3); testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads"); @@ -273,7 +273,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); @@ -313,6 +313,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); + expSnapshotState.add(payload7); + testLog.info("testSubsequentReplicationsAndSnapshots complete"); } @@ -327,8 +332,6 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A leader.getReplicatedToAllIndex()); leaderActor.underlyingActor().setMockTotalMemory(1000); - byte[] snapshot = new byte[] {6}; - leaderActor.underlyingActor().setSnapshot(snapshot); // We'll expect a ReplicatedLogImplEntry message and an ApplyJournalEntries message added to the journal. InMemoryJournal.addWriteMessagesCompleteLatch(leaderId, 2); @@ -351,6 +354,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class); Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot); + expSnapshotState.add(payload8); + // Send another payload with a large enough relative size in combination with the last payload // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot. payload9 = sendPayloadData(leaderActor, "nine", 201); @@ -383,7 +388,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9); @@ -451,6 +456,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + + expSnapshotState.add(payload10); } /** @@ -467,8 +474,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InstallSnapshot installSnapshot; InstallSnapshotReply installSnapshotReply; - byte[] snapshot = new byte[] {10}; - leaderActor.underlyingActor().setSnapshot(snapshot); + expSnapshotState.add(payload9); // Now stop dropping AppendEntries in follower 2. follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); @@ -480,7 +486,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks()); assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm()); assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex()); - assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); + //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class); assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm()); @@ -490,7 +496,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2 applies the snapshot. applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); - verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8, snapshot); + verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size()); // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot. @@ -523,7 +529,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9); unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size()); @@ -535,16 +541,14 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A /** * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower. + * @throws Exception */ - private void testFinalReplicationsAndSnapshot() { + private void testFinalReplicationsAndSnapshot() throws Exception { List applyStates; ApplyState applyState; testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); - byte[] snapshot = new byte[] {14}; - leaderActor.underlyingActor().setSnapshot(snapshot); - // Send another payload - a snapshot should occur. payload11 = sendPayloadData(leaderActor, "eleven"); @@ -557,7 +561,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify the leader's last persisted snapshot (previous ones may not be purged yet). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11, snapshot); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11); List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11);