From fed267bf1b8a9ea81d1ee7c9721962863b98e391 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 10 Jul 2015 08:30:11 -0400 Subject: [PATCH] CDS: Real snapshot log trimming changes Bug 2692 changed real snapshotting to trim the in-memory log based on the current replicatedToAllIndex for the normal case when all followers are up and "fake" snapshotting is advancing replicatedToAllIndex. When a follower is down, real snapshots don't trim the log because replicatedToAllIndex isn't advancing unless the memory threshold is exceeded. So it will let the in-memory log keep growing past the snapshot batch count. This sort of defeats the purpose of snapshotting, i.e., to keep the journal size in check both on disk and in memory. It's also a bit dangerous - it can chew up a lot of memory and starve the rest of the system and cause large STW GCs once the follower comes back up and the log is cleared. This can also cause multiple snapshots in the follower once it comes back and catches up - e.g., if it's behind 60K entries, it will snapshot after each 20K batch in quick succession. To alleviate the potential excessive memory growth, in addition to trimming the log from the captured lastAppliedIndex if the log memory size threshold is exceeded, I changed the code to do the same if the log size exceeds the snapshot batch count. So if a follower is down long enough to exceed the snapshot batch count, the leader will install a single snapshot to catch up the follower. Otherwise, the follower will be caught up via AppendEntries. I also noticed that if snapshot attempts happen in quick succession, a second attempt may be made prematurely while the previous one is in the PERSISTING state. This is because isCapturing() returns false in the PERSISTING state. The state machine prevents another capture from actually initiating because only the IDLE state implements capture but I think to be clean we should disallow it by returning true from isCapturing() in the PERSISTING state. 
This avoids state violations during valid workflows (it's not invalid to attempt a capture while another is in progress). Therefore, since every state now returns true for isCapturing() except IDLE, I made true the default in AbstractSnapshotState so only IDLE overrides it. I added more end-to-end test cases to ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest to cover leader snapshots where the log size exceeds the snapshotBatchCount, where the memory threshold is exceeded and where neither of the first 2 conditions are met and the log is effectively not trimmed. The former 2 cases trim the log to last applied and result in a snapshot installed on the follower once it's resumed. The latter case results in the leader catching up the follower via AppendEntries. Change-Id: Iaec9ba94232a17d6fa7b192c31c431b328e3d22e Signed-off-by: Tom Pantelis (cherry picked from commit 7a992a25fa34c6ce4a1d52919f7aa71dd1d20060) --- .../cluster/raft/SnapshotManager.java | 45 +- .../AbstractRaftActorIntegrationTest.java | 39 +- .../cluster/raft/RaftActorTest.java | 17 +- ...otsWithLaggingFollowerIntegrationTest.java | 564 ++++++++++-------- .../cluster/raft/SnapshotManagerTest.java | 36 +- 5 files changed, 419 insertions(+), 282 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 77875a567e..970fb8fc71 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -114,7 +114,7 @@ public class SnapshotManager implements SnapshotState { @Override public boolean isCapturing() { - return false; + return true; } @Override @@ -188,6 +188,11 @@ public class SnapshotManager implements SnapshotState { private 
class Idle extends AbstractSnapshotState { + @Override + public boolean isCapturing() { + return false; + } + private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) { TermInformationReader lastAppliedTermInfoReader = lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(), @@ -270,11 +275,6 @@ public class SnapshotManager implements SnapshotState { private class Creating extends AbstractSnapshotState { - @Override - public boolean isCapturing() { - return true; - } - @Override public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) { // create a snapshot object from the state provided and save it @@ -287,21 +287,32 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().saveSnapshot(snapshot); - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), snapshot.getLogMessage()); + LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot.getLogMessage()); long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; - if (context.getReplicatedLog().dataSize() > dataThreshold) { + boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold; + boolean logSizeExceededSnapshotBatchCount = + context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount(); +LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().size(),context.getConfigParams().getSnapshotBatchCount()); + if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) { if(LOG.isDebugEnabled()) { - LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", - persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, - captureSnapshot.getLastAppliedIndex()); + if(dataSizeThresholdExceeded) { + LOG.debug("{}: log data size {} exceeds the memory threshold {} - 
doing snapshotPreCommit with index {}", + context.getId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } else { + LOG.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}", + context.getId(), context.getReplicatedLog().size(), + context.getConfigParams().getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex()); + } } - // if memory is less, clear the log based on lastApplied. - // this could/should only happen if one of the followers is down - // as normally we keep removing from the log when its replicated to all. + // We either exceeded the memory threshold or the log size exceeded the snapshot batch + // count so, to keep the log memory footprint in check, clear the log based on lastApplied. + // This could/should only happen if one of the followers is down as normally we keep + // removing from the log as entries are replicated to all. context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -326,9 +337,9 @@ public class SnapshotManager implements SnapshotState { context.getReplicatedLog().getSnapshotTerm()); } - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + + "and term: {}", context.getId(), context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm()); if (context.getId().equals(currentBehavior.getLeaderId()) && captureSnapshot.isInstallSnapshotInitiated()) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java 
b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 74440b5c24..7cd8936912 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -178,7 +178,9 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected long initialTerm = 5; protected long currentTerm; - protected List expSnapshotState = new ArrayList<>(); + protected int snapshotBatchCount = 4; + + protected List expSnapshotState = new ArrayList<>(); @After public void tearDown() { @@ -191,7 +193,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(1); - configParams.setSnapshotBatchCount(4); + configParams.setSnapshotBatchCount(snapshotBatchCount); configParams.setSnapshotDataThresholdPercentage(70); configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); return configParams; @@ -251,7 +253,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest @SuppressWarnings("unchecked") protected void verifySnapshot(String prefix, Snapshot snapshot, long lastAppliedTerm, - int lastAppliedIndex, long lastTerm, long lastIndex) + long lastAppliedIndex, long lastTerm, long lastIndex) throws Exception { assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm()); assertEquals(prefix + " Snapshot getLastAppliedIndex", lastAppliedIndex, snapshot.getLastAppliedIndex()); @@ -259,7 +261,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest 
assertEquals(prefix + " Snapshot getLastIndex", lastIndex, snapshot.getLastIndex()); List actualState = (List)MockRaftActor.toObject(snapshot.getState()); - assertEquals(prefix + " Snapshot getState size", expSnapshotState.size(), actualState.size()); + assertEquals(String.format("%s Snapshot getState size. Expected %s: . Actual: %s", prefix, expSnapshotState, + actualState), expSnapshotState.size(), actualState.size()); for(int i = 0; i < expSnapshotState.size(); i++) { assertEquals(prefix + " Snapshot state " + i, expSnapshotState.get(i), actualState.get(i)); } @@ -309,4 +312,32 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest protected String testActorPath(String id){ return "akka://test/user" + id; } + + protected void verifyLeadersTrimmedLog(long lastIndex) { + verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1); + } + + protected void verifyLeadersTrimmedLog(long lastIndex, long replicatedToAllIndex) { + verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex); + } + + protected void verifyFollowersTrimmedLog(int num, TestActorRef actorRef, long lastIndex) { + verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1); + } + + protected void verifyTrimmedLog(String name, TestActorRef actorRef, long lastIndex, + long replicatedToAllIndex) { + TestRaftActor actor = actorRef.underlyingActor(); + RaftActorContext context = actor.getRaftActorContext(); + long snapshotIndex = lastIndex - 1; + assertEquals(name + " snapshot term", snapshotIndex < 0 ? 
-1 : currentTerm, + context.getReplicatedLog().getSnapshotTerm()); + assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex()); + assertEquals(name + " journal log size", 1, context.getReplicatedLog().size()); + assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex()); + assertEquals(name + " commit index", lastIndex, context.getCommitIndex()); + assertEquals(name + " last applied", lastIndex, context.getLastApplied()); + assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex, + actor.getCurrentBehavior().getReplicatedToAllIndex()); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index eb70c8b028..3275737cf7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -1,9 +1,9 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -582,7 +582,7 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory()); - assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process 
leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader); @@ -686,7 +686,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing()); // The commit is needed to complete the snapshot creation process followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower); @@ -787,7 +787,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("foo-3"), new MockRaftActorContext.MockPayload("foo-4"))); leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); - assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size()); @@ -831,15 +831,16 @@ public class RaftActorTest extends AbstractActorTest { leaderActor.setCurrentBehavior(leader); assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state()); - // Persist another entry (this will cause a CaptureSnapshot to be triggered - leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh")); + // Simulate an install snaphost to a follower. 
+ leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall( + leaderActor.getReplicatedLog().last(), -1, "member1"); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); // Trimming log in this scenario is a no-op assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(-1, leader.getReplicatedToAllIndex()); }}; @@ -882,7 +883,7 @@ public class RaftActorTest extends AbstractActorTest { // Trimming log in this scenario is a no-op assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex()); - assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); + assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing()); assertEquals(3, leader.getReplicatedToAllIndex()); }}; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index 6c258d33c8..89d69886ed 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -38,15 +38,7 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; */ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest { - private MockPayload payload9; - private MockPayload payload11; - private MockPayload payload12; - private 
MockPayload payload13; - - @Test - public void runTest() throws Exception { - testLog.info("testReplicationAndSnapshotsWithLaggingFollower starting"); - + private void setup() { leaderId = factory.generateActorId("leader"); follower1Id = factory.generateActorId("follower"); follower2Id = factory.generateActorId("follower"); @@ -87,32 +79,17 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); testLog.info("Leader created and elected"); - - testInitialReplications(); - - testSubsequentReplicationsAndSnapshots(); - - testLeaderSnapshotTriggeredByMemoryThresholdExceeded(); - - testInstallSnapshotToLaggingFollower(); - - verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot(); - - testFinalReplicationsAndSnapshot(); - - testLeaderReinstatement(); - - testLog.info("testReplicationAndSnapshotsWithLaggingFollower ending"); } /** - * Send 3 payload instances with follower 2 temporarily lagging. - * - * @throws Exception + * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets + * caught up via AppendEntries. */ - private void testInitialReplications() throws Exception { + @Test + public void testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception { + testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries starting: sending 2 new payloads"); - testLog.info("testInitialReplications starting: sending 2 new payloads"); + setup(); // Simulate lagging by dropping AppendEntries messages in follower 2. follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); @@ -120,19 +97,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Send the payloads. 
MockPayload payload0 = sendPayloadData(leaderActor, "zero"); MockPayload payload1 = sendPayloadData(leaderActor, "one"); - MockPayload payload2 = sendPayloadData(leaderActor, "two"); // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond. - List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3); + List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2); verifyApplyState(applyStates.get(0), leaderCollectorActor, payload0.toString(), currentTerm, 0, payload0); verifyApplyState(applyStates.get(1), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1); - verifyApplyState(applyStates.get(2), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2); // Verify follower 1 applies each log entry. - applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3); + applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2); verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0); verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1); - verifyApplyState(applyStates.get(2), null, null, currentTerm, 2, payload2); // Ensure there's at least 1 more heartbeat. MessageCollectorActor.clearMessages(leaderCollectorActor); @@ -142,37 +116,28 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // been replicated to follower 2. 
assertEquals("Leader snapshot term", -1, leaderContext.getReplicatedLog().getSnapshotTerm()); assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 2, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 2, leaderContext.getLastApplied()); + assertEquals("Leader journal log size", 2, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 1, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 1, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 1, leaderContext.getLastApplied()); assertEquals("Leader replicatedToAllIndex", -1, leader.getReplicatedToAllIndex()); - testLog.info("Step 3: new entries applied - re-enabling follower {}", follower2Id); + testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries: new entries applied - resuming follower {}", follower2Id); // Now stop dropping AppendEntries in follower 2. follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); // Verify follower 2 applies each log entry. - applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3); + applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 2); verifyApplyState(applyStates.get(0), null, null, currentTerm, 0, payload0); verifyApplyState(applyStates.get(1), null, null, currentTerm, 1, payload1); - verifyApplyState(applyStates.get(2), null, null, currentTerm, 2, payload2); // Ensure there's at least 1 more heartbeat. 
MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class); // The leader should now have performed fake snapshots to trim the log. - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 2, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 2, leaderContext.getLastApplied()); - // Note - replicatedToAllIndex always lags 1 behind last applied since it trims the log up to the - // last applied index. The next entry successfully replicated to followers woild advance it. - assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex()); + verifyLeadersTrimmedLog(1); // Even though follower 2 lagged behind, the leader should not have tried to install a snapshot // to catch it up because no snapshotting was done so the follower's next index was present in the log. @@ -180,83 +145,91 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A InstallSnapshot.class); Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot); - MessageCollectorActor.clearMessages(leaderCollectorActor); - MessageCollectorActor.clearMessages(follower1CollectorActor); - MessageCollectorActor.clearMessages(follower2CollectorActor); - - expSnapshotState.add(payload0); - expSnapshotState.add(payload1); - expSnapshotState.add(payload2); - - testLog.info("testInitialReplications complete"); + testLog.info("testReplicationsWithLaggingFollowerCaughtUpViaAppendEntries complete"); } /** - * Send 5 more payloads with follower 2 lagging. 
Since the snapshotBatch count is 4, this should cause - * 2 leader snapshots and follower 2's log will be behind by 5 entries. + * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2 + * lagging but not enough for the leader to trim its log from the last applied index. Follower 2's log + * will be behind by several entries and, when it is resumed, it should be caught up via AppendEntries + * sent by the leader. * * @throws Exception */ - private void testSubsequentReplicationsAndSnapshots() throws Exception { - testLog.info("testSubsequentReplicationsAndSnapshots starting: sending first payload, replicatedToAllIndex: {}", - leader.getReplicatedToAllIndex()); + @Test + public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries() throws Exception { + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries starting"); + + setup(); + + sendInitialPayloadsReplicatedToAllFollowers("zero", "one"); + // Configure follower 2 to drop messages and lag. follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); - // Send the first payload - this should cause the first snapshot. + // Send the first payload and verify it gets applied by the leader and follower 1. + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + + ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); + verifyApplyState(applyState, leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2); + + applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class); + verifyApplyState(applyState, null, null, currentTerm, 2, payload2); + + expSnapshotState.add(payload2); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.clearMessages(follower1CollectorActor); + + // Send another payload - this should cause a snapshot due to snapshotBatchCount reached. 
MockPayload payload3 = sendPayloadData(leaderActor, "three"); MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - expSnapshotState.add(payload3); + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 2 more payloads"); - testLog.info("testSubsequentReplicationsAndSnapshots: sending 4 more payloads"); - - // Send the next 4. The last one should cause the second snapshot. + // Send 2 more payloads - not enough to trigger another snapshot. MockPayload payload4 = sendPayloadData(leaderActor, "four"); MockPayload payload5 = sendPayloadData(leaderActor, "five"); - MockPayload payload6 = sendPayloadData(leaderActor, "six"); - MockPayload payload7 = sendPayloadData(leaderActor, "seven"); // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond. - List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5); + List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3); verifyApplyState(applyStates.get(0), leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3); verifyApplyState(applyStates.get(1), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4); verifyApplyState(applyStates.get(2), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5); - verifyApplyState(applyStates.get(3), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6); - verifyApplyState(applyStates.get(4), leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7); // Verify follower 1 applies each log entry. 
- applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 5); + applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3); verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3); verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4); verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5); - verifyApplyState(applyStates.get(3), null, null, currentTerm, 6, payload6); - verifyApplyState(applyStates.get(4), null, null, currentTerm, 7, payload7); - // Wait for snapshot completion. - MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); - - // The first new entry applied should have caused the leader to advanced the snapshot index to the - // last previously applied index (2) that was replicated to all followers. + // The snapshot should have caused the leader to advanced the snapshot index to the + // last previously applied index (1) that was replicated to all followers at the time of capture. + // Note: since the log size (3) did not exceed the snapshot batch count (4), the leader should not + // have trimmed the log to the last index actually applied (5). 
assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 2, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 5, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 7, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 7, leaderContext.getLastApplied()); - assertEquals("Leader replicatedToAllIndex", 2, leader.getReplicatedToAllIndex()); + assertEquals("Leader snapshot index", 1, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", 4, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 5, leaderContext.getCommitIndex()); + assertEquals("Leader last applied", 5, leaderContext.getLastApplied()); + assertEquals("Leader replicatedToAllIndex", 1, leader.getReplicatedToAllIndex()); // Now stop dropping AppendEntries in follower 2. follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); - // Verify follower 2 applies each log entry. - applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 5); - verifyApplyState(applyStates.get(0), null, null, currentTerm, 3, payload3); - verifyApplyState(applyStates.get(1), null, null, currentTerm, 4, payload4); - verifyApplyState(applyStates.get(2), null, null, currentTerm, 5, payload5); - verifyApplyState(applyStates.get(3), null, null, currentTerm, 6, payload6); - verifyApplyState(applyStates.get(4), null, null, currentTerm, 7, payload7); + // Verify follower 2 applies each log entry. The leader should not install a snapshot b/c + // follower 2's next index (3) is still present in the log. 
+ applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 4); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2); + verifyApplyState(applyStates.get(1), null, null, currentTerm, 3, payload3); + verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4); + verifyApplyState(applyStates.get(3), null, null, currentTerm, 5, payload5); + + // Verify the leader did not try to install a snapshot to catch up follower 2. + InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class); + Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot); // Ensure there's at least 1 more heartbeat. MessageCollectorActor.clearMessages(leaderCollectorActor); @@ -264,72 +237,131 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // The leader should now have performed fake snapshots to advance the snapshot index and to trim // the log. In addition replicatedToAllIndex should've advanced. - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 6, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 7, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader replicatedToAllIndex", 6, leader.getReplicatedToAllIndex()); + verifyLeadersTrimmedLog(5); // Verify the leader's persisted snapshot. 
List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 3, currentTerm, 7); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 2, currentTerm, 3); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); - assertEquals("Persisted Snapshot getUnAppliedEntries size", 4, unAppliedEntry.size()); - verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); - verifyReplicatedLogEntry(unAppliedEntry.get(1), currentTerm, 5, payload5); - verifyReplicatedLogEntry(unAppliedEntry.get(2), currentTerm, 6, payload6); - verifyReplicatedLogEntry(unAppliedEntry.get(3), currentTerm, 7, payload7); - - // Even though follower 2's log was behind by 5 entries and 2 snapshots were done, the leader - // should not have tried to install a snapshot to catch it up because replicatedToAllIndex was also - // behind. Instead of installing a snapshot the leader would've sent AppendEntries with the log entries. - InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(follower2CollectorActor, InstallSnapshot.class); - Assert.assertNull("Follower 2 received unexpected InstallSnapshot", installSnapshot); + assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); + verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 3, payload3); // Verify follower 1's log and snapshot indexes. 
MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class); - assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 1 snapshot index", 6, follower1Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size()); - assertEquals("Follower 1 journal last index", 7, follower1Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 1 commit index", 7, follower1Context.getCommitIndex()); - assertEquals("Follower 1 last applied", 7, follower1Context.getLastApplied()); - assertEquals("Follower 1 replicatedToAllIndex", 6, follower1.getReplicatedToAllIndex()); + verifyFollowersTrimmedLog(1, follower1Actor, 5); // Verify follower 2's log and snapshot indexes. MessageCollectorActor.clearMessages(follower2CollectorActor); MessageCollectorActor.expectFirstMatching(follower2CollectorActor, AppendEntries.class); - assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 2 snapshot index", 6, follower2Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size()); - assertEquals("Follower 2 journal last index", 7, follower2Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 2 commit index", 7, follower2Context.getCommitIndex()); - assertEquals("Follower 2 last applied", 7, follower2Context.getLastApplied()); - assertEquals("Follower 2 replicatedToAllIndex", 6, follower2.getReplicatedToAllIndex()); + verifyFollowersTrimmedLog(2, follower2Actor, 5); MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + expSnapshotState.add(payload3); + 
expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries complete"); + } + + /** + * Send payloads to trigger a leader snapshot due to snapshotBatchCount reached with follower 2 + * lagging where the leader trims its log from the last applied index. Follower 2's log + * will be behind by several entries and, when it is resumed, it should be caught up via a snapshot + * installed by the leader. + * + * @throws Exception + */ + @Test + public void testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot() throws Exception { + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot starting"); + + setup(); + + sendInitialPayloadsReplicatedToAllFollowers("zero", "one"); + + // Configure follower 2 to drop messages and lag. + follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); + + // Send 5 payloads - the second should cause a leader snapshot. + MockPayload payload2 = sendPayloadData(leaderActor, "two"); + MockPayload payload3 = sendPayloadData(leaderActor, "three"); + MockPayload payload4 = sendPayloadData(leaderActor, "four"); + MockPayload payload5 = sendPayloadData(leaderActor, "five"); + MockPayload payload6 = sendPayloadData(leaderActor, "six"); + + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond. 
+ List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5); + verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2); + verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4); + verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot"); + + // Send another payload to trigger a second leader snapshot. + MockPayload payload7 = sendPayloadData(leaderActor, "seven"); + + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); + verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7); + + // Verify follower 1 applies each log entry. + applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2); + verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4); + verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7); + + // The snapshot should have caused the leader to advance the snapshot index to the leader's last + // applied index (6) since the log size should have exceeded the snapshot batch count (4). + // replicatedToAllIndex should remain at 1 since follower 2 is lagging. + verifyLeadersTrimmedLog(7, 1); + + expSnapshotState.add(payload2); + expSnapshotState.add(payload3); expSnapshotState.add(payload4); expSnapshotState.add(payload5); expSnapshotState.add(payload6); + + // Verify the leader's persisted snapshot.
+ List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); + assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 6, currentTerm, 7); + List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); + assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); + verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 7, payload7); + expSnapshotState.add(payload7); - testLog.info("testSubsequentReplicationsAndSnapshots complete"); + verifyInstallSnapshotToLaggingFollower(7); + + testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete"); } /** - * Send a couple more payloads with follower 2 lagging. The last payload will have a large enough size - * to trigger a leader snapshot. + * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a + * leader snapshot such that the leader trims its log from the last applied index. Follower 2's log will + * be behind by several entries and, when it is resumed, it should be caught up via a snapshot installed + * by the leader.
* * @throws Exception */ - private void testLeaderSnapshotTriggeredByMemoryThresholdExceeded() throws Exception { - testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded starting: sending 3 payloads, replicatedToAllIndex: {}", - leader.getReplicatedToAllIndex()); + @Test + public void testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower() throws Exception { + testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceededWithLaggingFollower starting"); + + snapshotBatchCount = 5; + setup(); + + sendInitialPayloadsReplicatedToAllFollowers("zero"); leaderActor.underlyingActor().setMockTotalMemory(1000); @@ -339,11 +371,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A follower2Actor.underlyingActor().startDropMessages(AppendEntries.class); // Send a payload with a large relative size but not enough to trigger a snapshot. - MockPayload payload8 = sendPayloadData(leaderActor, "eight", 500); + MockPayload payload1 = sendPayloadData(leaderActor, "one", 500); // Verify the leader got consensus and applies the first log entry even though follower 2 didn't respond. 
List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 1); - verifyApplyState(applyStates.get(0), leaderCollectorActor, payload8.toString(), currentTerm, 8, payload8); + verifyApplyState(applyStates.get(0), leaderCollectorActor, payload1.toString(), currentTerm, 1, payload1); // Wait for all the ReplicatedLogImplEntry and ApplyJournalEntries messages to be added to the journal // before the snapshot so the snapshot sequence # will be higher to ensure the snapshot gets @@ -354,20 +386,20 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A CaptureSnapshot captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class); Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot); - expSnapshotState.add(payload8); + expSnapshotState.add(payload1); // Send another payload with a large enough relative size in combination with the last payload // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot. - payload9 = sendPayloadData(leaderActor, "nine", 201); + MockPayload payload2 = sendPayloadData(leaderActor, "two", 201); // Verify the leader applies the last log entry. applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2); - verifyApplyState(applyStates.get(1), leaderCollectorActor, payload9.toString(), currentTerm, 9, payload9); + verifyApplyState(applyStates.get(1), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2); // Verify follower 1 applies each log entry. 
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2); - verifyApplyState(applyStates.get(0), null, null, currentTerm, 8, payload8); - verifyApplyState(applyStates.get(1), null, null, currentTerm, 9, payload9); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 1, payload1); + verifyApplyState(applyStates.get(1), null, null, currentTerm, 2, payload2); // A snapshot should've occurred - wait for it to complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); @@ -375,23 +407,37 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Because the snapshot was triggered by exceeding the memory threshold the leader should've advanced // the snapshot index to the last applied index and trimmed the log even though the entries weren't // replicated to all followers. - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 8, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 9, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 9, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 9, leaderContext.getLastApplied()); - // Note: replicatedToAllIndex should not be advanced since log entries 8 and 9 haven't yet been - // replicated to follower 2. - assertEquals("Leader replicatedToAllIndex", 7, leader.getReplicatedToAllIndex()); + verifyLeadersTrimmedLog(2, 0); // Verify the leader's persisted snapshot. 
List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); - verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 8, currentTerm, 9); + verifySnapshot("Persisted", persistedSnapshots.get(0), currentTerm, 1, currentTerm, 2); List unAppliedEntry = persistedSnapshots.get(0).getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); - verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 9, payload9); + verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 2, payload2); + + expSnapshotState.add(payload2); + + verifyInstallSnapshotToLaggingFollower(2L); + + // Sends a payload with index 3. + verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot(); + + // Sends 3 payloads with indexes 4, 5 and 6. + verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot(); + + // Recover the leader from persistence and verify. + long leadersLastIndexOnRecovery = 6; + + // The leader's last snapshot was triggered by index 4 so the last applied index in the snapshot was 3. + long leadersSnapshotIndexOnRecovery = 3; + + // The recovered journal should have 3 entries starting at index 4. + long leadersFirstJournalEntryIndexOnRecovery = 4; + + verifyLeaderRecoveryAfterReinstatement(leadersLastIndexOnRecovery, leadersSnapshotIndexOnRecovery, + leadersFirstJournalEntryIndexOnRecovery); testLog.info("testLeaderSnapshotTriggeredByMemoryThresholdExceeded ending"); } @@ -406,49 +452,31 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A ApplyState applyState; CaptureSnapshot captureSnapshot; - MockPayload payload10 = sendPayloadData(leaderActor, "ten"); + MockPayload payload3 = sendPayloadData(leaderActor, "three"); // Verify the leader applies the state. 
applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); - verifyApplyState(applyState, leaderCollectorActor, payload10.toString(), currentTerm, 10, payload10); + verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3); captureSnapshot = MessageCollectorActor.getFirstMatching(leaderCollectorActor, CaptureSnapshot.class); Assert.assertNull("Leader received unexpected CaptureSnapshot", captureSnapshot); // Verify the follower 1 applies the state. applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class); - verifyApplyState(applyState, null, null, currentTerm, 10, payload10); + verifyApplyState(applyState, null, null, currentTerm, 3, payload3); // Verify the follower 2 applies the state. applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class); - verifyApplyState(applyState, null, null, currentTerm, 10, payload10); + verifyApplyState(applyState, null, null, currentTerm, 3, payload3); // Verify the leader's state. - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 9, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 10, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 10, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 10, leaderContext.getLastApplied()); - assertEquals("Leader replicatedToAllIndex", 9, leader.getReplicatedToAllIndex()); + verifyLeadersTrimmedLog(3); // Verify follower 1's state. 
- assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 1 snapshot index", 9, follower1Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size()); - assertEquals("Follower 1 journal last index", 10, follower1Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 1 commit index", 10, follower1Context.getCommitIndex()); - assertEquals("Follower 1 last applied", 10, follower1Context.getLastApplied()); - assertEquals("Follower 1 replicatedToAllIndex", 9, follower1.getReplicatedToAllIndex()); + verifyFollowersTrimmedLog(1, follower1Actor, 3); // Verify follower 2's state. - assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 2 snapshot index", 9, follower2Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size()); - assertEquals("Follower 2 journal last index", 10, follower2Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 2 commit index", 10, follower2Context.getCommitIndex()); - assertEquals("Follower 2 last applied", 10, follower2Context.getLastApplied()); - assertEquals("Follower 2 replicatedToAllIndex", 9, follower2.getReplicatedToAllIndex()); + verifyFollowersTrimmedLog(2, follower2Actor, 3); // Revert back to JVM total memory. 
leaderActor.underlyingActor().setMockTotalMemory(0); @@ -457,24 +485,22 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); - expSnapshotState.add(payload10); + expSnapshotState.add(payload3); } /** - * Following a snapshot due memory threshold exceeded, resume the lagging follower and verify it receives - * an install snapshot from the leader. + * Resume the lagging follower 2 and verify it receives an install snapshot from the leader. * * @throws Exception */ - private void testInstallSnapshotToLaggingFollower() throws Exception { + private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception { List persistedSnapshots; List unAppliedEntry; - ApplyState applyState; ApplySnapshot applySnapshot; InstallSnapshot installSnapshot; InstallSnapshotReply installSnapshotReply; - expSnapshotState.add(payload9); + testLog.info("testInstallSnapshotToLaggingFollower starting"); // Now stop dropping AppendEntries in follower 2. 
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class); @@ -485,7 +511,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex()); assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks()); assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm()); - assertEquals("InstallSnapshot getLastIncludedIndex", 9, installSnapshot.getLastIncludedIndex()); + assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex()); //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class); @@ -496,7 +522,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2 applies the snapshot. applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); - verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 9, currentTerm, 9); + verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size()); // Wait for the snapshot to complete. @@ -508,12 +534,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // The leader should now have performed fake snapshots to advance the snapshot index and to trim // the log. In addition replicatedToAllIndex should've advanced. 
- assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 8, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader commit index", 9, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 9, leaderContext.getLastApplied()); - assertEquals("Leader replicatedToAllIndex", 8, leader.getReplicatedToAllIndex()); + verifyLeadersTrimmedLog(lastAppliedIndex); // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from // the snapshot store because the second snapshot was initiated by the follower install snapshot and @@ -525,147 +546,186 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 9, currentTerm, 9); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex); unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size()); MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); + + testLog.info("testInstallSnapshotToLaggingFollower complete"); } /** * Do another round of payloads and snapshot to verify replicatedToAllIndex gets back on track and * snapshots works as expected after doing a follower snapshot. In this step we don't lag a follower. 
+ * * @throws Exception */ - private void testFinalReplicationsAndSnapshot() throws Exception { + private void verifyReplicationsAndSnapshotWithNoLaggingAfterInstallSnapshot() throws Exception { List applyStates; ApplyState applyState; - testLog.info("testFinalReplicationsAndSnapshot starting: replicatedToAllIndex: {}", leader.getReplicatedToAllIndex()); + testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot starting: replicatedToAllIndex: {}", + leader.getReplicatedToAllIndex()); // Send another payload - a snapshot should occur. - payload11 = sendPayloadData(leaderActor, "eleven"); + MockPayload payload4 = sendPayloadData(leaderActor, "four"); // Wait for the snapshot to complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); - verifyApplyState(applyState, leaderCollectorActor, payload11.toString(), currentTerm, 11, payload11); + verifyApplyState(applyState, leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4); // Verify the leader's last persisted snapshot (previous ones may not be purged yet). List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1); - verifySnapshot("Persisted", persistedSnapshot, currentTerm, 10, currentTerm, 11); + verifySnapshot("Persisted", persistedSnapshot, currentTerm, 3, currentTerm, 4); List unAppliedEntry = persistedSnapshot.getUnAppliedEntries(); assertEquals("Persisted Snapshot getUnAppliedEntries size", 1, unAppliedEntry.size()); - verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 11, payload11); + verifyReplicatedLogEntry(unAppliedEntry.get(0), currentTerm, 4, payload4); // Send a couple more payloads. 
- payload12 = sendPayloadData(leaderActor, "twelve"); - payload13 = sendPayloadData(leaderActor, "thirteen"); + MockPayload payload5 = sendPayloadData(leaderActor, "five"); + MockPayload payload6 = sendPayloadData(leaderActor, "six"); // Verify the leader applies the 2 log entries. applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3); - verifyApplyState(applyStates.get(1), leaderCollectorActor, payload12.toString(), currentTerm, 12, payload12); - verifyApplyState(applyStates.get(2), leaderCollectorActor, payload13.toString(), currentTerm, 13, payload13); + verifyApplyState(applyStates.get(1), leaderCollectorActor, payload5.toString(), currentTerm, 5, payload5); + verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6); // Verify the leader applies a log entry for at least the last entry index. - verifyApplyJournalEntries(leaderCollectorActor, 13); + verifyApplyJournalEntries(leaderCollectorActor, 6); // Ensure there's at least 1 more heartbeat to trim the log. MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class); - // Verify the leader's final snapshot index et al. - assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 12, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 1, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 13, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 13, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 13, leaderContext.getLastApplied()); - assertEquals("Leader replicatedToAllIndex", 12, leader.getReplicatedToAllIndex()); + // Verify the leader's final state. 
+ verifyLeadersTrimmedLog(6); InMemoryJournal.dumpJournal(leaderId); - // Verify the leaders's persisted journal log - should only contain the last 2 ReplicatedLogEntries + // Verify the leader's persisted journal log - it should only contain the last 2 ReplicatedLogEntries // added after the snapshot as the persisted journal should've been purged to the snapshot // sequence number. - verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(12, currentTerm, payload12), - new ReplicatedLogImplEntry(13, currentTerm, payload13))); + verifyPersistedJournal(leaderId, Arrays.asList(new ReplicatedLogImplEntry(5, currentTerm, payload5), + new ReplicatedLogImplEntry(6, currentTerm, payload6))); // Verify the leaders's persisted journal contains an ApplyJournalEntries for at least the last entry index. List persistedApplyJournalEntries = InMemoryJournal.get(leaderId, ApplyJournalEntries.class); boolean found = false; for(ApplyJournalEntries entry: persistedApplyJournalEntries) { - if(entry.getToIndex() == 13) { + if(entry.getToIndex() == 6) { found = true; break; } } - Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 13), found); + Assert.assertTrue(String.format("ApplyJournalEntries with index %d not found in leader's persisted journal", 6), found); - // Verify follower 1 applies the 2 log entries. + // Verify follower 1 applies the 3 log entries.
applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3); - verifyApplyState(applyStates.get(0), null, null, currentTerm, 11, payload11); - verifyApplyState(applyStates.get(1), null, null, currentTerm, 12, payload12); - verifyApplyState(applyStates.get(2), null, null, currentTerm, 13, payload13); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4); + verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5); + verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6); // Verify follower 1's log state. - assertEquals("Follower 1 snapshot term", currentTerm, follower1Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 1 snapshot index", 12, follower1Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 1 journal log size", 1, follower1Context.getReplicatedLog().size()); - assertEquals("Follower 1 journal last index", 13, follower1Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 1 commit index", 13, follower1Context.getCommitIndex()); - assertEquals("Follower 1 last applied", 13, follower1Context.getLastApplied()); - assertEquals("Follower 1 replicatedToAllIndex", 12, follower1.getReplicatedToAllIndex()); - - // Verify follower 2 applies the 2 log entries. + verifyFollowersTrimmedLog(1, follower1Actor, 6); + + // Verify follower 2 applies the 3 log entries. 
applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3); - verifyApplyState(applyStates.get(0), null, null, currentTerm, 11, payload11); - verifyApplyState(applyStates.get(1), null, null, currentTerm, 12, payload12); - verifyApplyState(applyStates.get(2), null, null, currentTerm, 13, payload13); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 4, payload4); + verifyApplyState(applyStates.get(1), null, null, currentTerm, 5, payload5); + verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6); // Verify follower 2's log state. - assertEquals("Follower 2 snapshot term", currentTerm, follower2Context.getReplicatedLog().getSnapshotTerm()); - assertEquals("Follower 2 snapshot index", 12, follower2Context.getReplicatedLog().getSnapshotIndex()); - assertEquals("Follower 2 journal log size", 1, follower2Context.getReplicatedLog().size()); - assertEquals("Follower 2 journal last index", 13, follower2Context.getReplicatedLog().lastIndex()); - assertEquals("Follower 2 commit index", 13, follower2Context.getCommitIndex()); - assertEquals("Follower 2 last applied", 13, follower2Context.getLastApplied()); - assertEquals("Follower 2 replicatedToAllIndex", 12, follower2.getReplicatedToAllIndex()); - - testLog.info("testFinalReplicationsAndSnapshot ending"); + verifyFollowersTrimmedLog(2, follower2Actor, 6); + + expSnapshotState.add(payload4); + expSnapshotState.add(payload5); + expSnapshotState.add(payload6); + + testLog.info("testReplicationsAndSnapshotAfterInstallSnapshot ending"); } /** * Kill the leader actor, reinstate it and verify the recovered journal. 
*/ - private void testLeaderReinstatement() { + private void verifyLeaderRecoveryAfterReinstatement(long lastIndex, long snapshotIndex, long firstJournalEntryIndex) { testLog.info("testLeaderReinstatement starting"); killActor(leaderActor); leaderActor = newTestRaftActor(leaderId, peerAddresses, leaderConfigParams); + TestRaftActor testRaftActor = leaderActor.underlyingActor(); - leaderActor.underlyingActor().startDropMessages(RequestVoteReply.class); + testRaftActor.startDropMessages(RequestVoteReply.class); - leaderContext = leaderActor.underlyingActor().getRaftActorContext(); + leaderContext = testRaftActor.getRaftActorContext(); - leaderActor.underlyingActor().waitForRecoveryComplete(); + testRaftActor.waitForRecoveryComplete(); + int logSize = (int) (expSnapshotState.size() - firstJournalEntryIndex); assertEquals("Leader snapshot term", currentTerm, leaderContext.getReplicatedLog().getSnapshotTerm()); - assertEquals("Leader snapshot index", 10, leaderContext.getReplicatedLog().getSnapshotIndex()); - assertEquals("Leader journal log size", 3, leaderContext.getReplicatedLog().size()); - assertEquals("Leader journal last index", 13, leaderContext.getReplicatedLog().lastIndex()); - assertEquals("Leader commit index", 13, leaderContext.getCommitIndex()); - assertEquals("Leader last applied", 13, leaderContext.getLastApplied()); - verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(11), currentTerm, 11, payload11); - verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(12), currentTerm, 12, payload12); - verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(13), currentTerm, 13, payload13); + assertEquals("Leader snapshot index", snapshotIndex, leaderContext.getReplicatedLog().getSnapshotIndex()); + assertEquals("Leader journal log size", logSize, leaderContext.getReplicatedLog().size()); + assertEquals("Leader journal last index", lastIndex, leaderContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", lastIndex, 
leaderContext.getCommitIndex()); + assertEquals("Leader last applied", lastIndex, leaderContext.getLastApplied()); + + for(long i = firstJournalEntryIndex; i < expSnapshotState.size(); i++) { + verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(i), currentTerm, i, + expSnapshotState.get((int) i)); + } + + assertEquals("Leader applied state", expSnapshotState, testRaftActor.getState()); testLog.info("testLeaderReinstatement ending"); } + + private void sendInitialPayloadsReplicatedToAllFollowers(String... data) { + + // Send the payloads. + for(String d: data) { + expSnapshotState.add(sendPayloadData(leaderActor, d)); + } + + int nEntries = data.length; + + // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond. + List applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, nEntries); + for(int i = 0; i < expSnapshotState.size(); i++) { + MockPayload payload = expSnapshotState.get(i); + verifyApplyState(applyStates.get(i), leaderCollectorActor, payload.toString(), currentTerm, i, payload); + } + + // Verify follower 1 applies each log entry. + applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, nEntries); + for(int i = 0; i < expSnapshotState.size(); i++) { + MockPayload payload = expSnapshotState.get(i); + verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload); + } + + // Verify follower 2 applies each log entry. + applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, nEntries); + for(int i = 0; i < expSnapshotState.size(); i++) { + MockPayload payload = expSnapshotState.get(i); + verifyApplyState(applyStates.get(i), null, null, currentTerm, i, payload); + } + + // Ensure there's at least 1 more heartbeat. 
+ MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, AppendEntriesReply.class); + + // The leader should have performed fake snapshots to trim the log to the last index replicated to + // all followers. + verifyLeadersTrimmedLog(nEntries - 1); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.clearMessages(follower1CollectorActor); + MessageCollectorActor.clearMessages(follower2CollectorActor); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index 1829a2d570..023f60909e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -67,6 +67,7 @@ public class SnapshotManagerTest extends AbstractActorTest { doReturn(new HashMap<>()).when(mockRaftActorContext).getPeerAddresses(); doReturn(mockConfigParams).when(mockRaftActorContext).getConfigParams(); doReturn(10L).when(mockConfigParams).getSnapshotBatchCount(); + doReturn(70).when(mockConfigParams).getSnapshotDataThresholdPercentage(); doReturn(mockReplicatedLog).when(mockRaftActorContext).getReplicatedLog(); doReturn("123").when(mockRaftActorContext).getId(); doReturn(mockDataPersistenceProvider).when(mockRaftActorContext).getPersistenceProvider(); @@ -257,7 +258,6 @@ public class SnapshotManagerTest extends AbstractActorTest { verify(mockRaftActorBehavior).setReplicatedToAllIndex(9); } - @Test public void testPersistWhenReplicatedLogDataSizeGreaterThanThreshold(){ doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize(); @@ -271,6 +271,34 @@ public class SnapshotManagerTest extends AbstractActorTest { 
verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior, never()).setReplicatedToAllIndex(anyLong()); + } + + @Test + public void testPersistWhenReplicatedLogSizeExceedsSnapshotBatchCount() { + doReturn(10L).when(mockReplicatedLog).size(); // matches snapshotBatchCount + doReturn(100).when(mockReplicatedLog).dataSize(); + + doReturn(5L).when(mockReplicatedLog).getSnapshotIndex(); + doReturn(5L).when(mockReplicatedLog).getSnapshotTerm(); + + long replicatedToAllIndex = 1; + ReplicatedLogEntry replicatedLogEntry = mock(ReplicatedLogEntry.class); + doReturn(replicatedLogEntry).when(mockReplicatedLog).get(replicatedToAllIndex); + doReturn(6L).when(replicatedLogEntry).getTerm(); + doReturn(replicatedToAllIndex).when(replicatedLogEntry).getIndex(); + + snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(6, 9, + new MockRaftActorContext.MockPayload()), replicatedToAllIndex); + + snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, 2000000L); + + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); + + verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); + + verify(mockRaftActorBehavior).setReplicatedToAllIndex(replicatedToAllIndex); } @Test @@ -287,6 +315,8 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.persist(bytes, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + assertEquals(true, snapshotManager.isCapturing()); + verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class)); verify(mockReplicatedLog).snapshotPreCommit(9L, 6L); @@ -340,8 +370,12 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.persist(new byte[]{}, mockRaftActorBehavior, Runtime.getRuntime().totalMemory()); + assertEquals(true, snapshotManager.isCapturing()); + snapshotManager.commit(100L, mockRaftActorBehavior); + assertEquals(false, snapshotManager.isCapturing()); + 
verify(mockReplicatedLog).snapshotCommit(); verify(mockDataPersistenceProvider).deleteMessages(50L); -- 2.36.6