From ed7166c5ba3a378f46640bb479464580c2b167ba Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 10 Jun 2015 02:17:21 -0400 Subject: [PATCH] Bug 3570: Use SnapShot lastAppliedIndex for install snapshot Follow-up patch for https://git.opendaylight.org/gerrit/#/c/21904/ to use the captured Snapshot's lastAppliedIndex for the lastIncludedIndex field in the InstallSnapshot message and the follower's matchIndex/nextIndex once the install completes. This is in lieu of using the leader's snapshotIndex which typically lags behind the lastAppliedIndex by 1 due to the trimming of the in-memory log. This avoids the leader sending its last log entry redundantly after the install completes as the last entry was included in the snapshot. Change-Id: Ie821078b4316641b67e1b853b9264353dde6bfae Signed-off-by: Tom Pantelis (cherry picked from commit 1dfb0b9105e9eb352ff2263434e79a5433e59e91) --- .../cluster/raft/SnapshotManager.java | 10 +- .../base/messages/SendInstallSnapshot.java | 8 +- .../raft/behaviors/AbstractLeader.java | 53 ++++++--- .../cluster/raft/RecoveryIntegrationTest.java | 12 +-- ...otsWithLaggingFollowerIntegrationTest.java | 8 +- .../cluster/raft/SnapshotManagerTest.java | 4 +- .../cluster/raft/behaviors/LeaderTest.java | 102 +++++++++++------- 7 files changed, 122 insertions(+), 75 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index f1881f5b0f..77875a567e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft; import akka.japi.Procedure; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; import java.util.List; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -281,14 +280,14 @@ public class SnapshotManager implements SnapshotState { // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. - Snapshot sn = Snapshot.create(snapshotBytes, + Snapshot snapshot = Snapshot.create(snapshotBytes, captureSnapshot.getUnAppliedEntries(), captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - context.getPersistenceProvider().saveSnapshot(sn); + context.getPersistenceProvider().saveSnapshot(snapshot); - LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage()); + LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), snapshot.getLogMessage()); long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100; @@ -334,8 +333,7 @@ public class SnapshotManager implements SnapshotState { if (context.getId().equals(currentBehavior.getLeaderId()) && captureSnapshot.isInstallSnapshotInitiated()) { // this would be call straight to the leader and won't initiate in serialization - currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot( - ByteString.copyFrom(snapshotBytes))); + currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot)); } captureSnapshot = null; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java index 83c85d9135..21c65e6037 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java @@ -8,16 +8,16 @@ package org.opendaylight.controller.cluster.raft.base.messages; -import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.raft.Snapshot; public class SendInstallSnapshot { - private ByteString snapshot; + private final Snapshot snapshot; - public SendInstallSnapshot(ByteString snapshot) { + public SendInstallSnapshot(Snapshot snapshot) { this.snapshot = snapshot; } - public ByteString getSnapshot() { + public Snapshot getSnapshot() { return snapshot; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index de1e1d11ff..94fa507b47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -88,7 +90,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected final int minIsolatedLeaderPeerCount; - private Optional snapshot; + private Optional snapshot; public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); @@ -140,8 +142,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override @@ -339,7 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if(message instanceof SendInstallSnapshot) { // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); } else if (message instanceof Replicate) { @@ -382,10 +388,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", @@ -395,7 +400,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } wasLastChunk = true; @@ -618,7 +623,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -626,8 +631,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), @@ -871,4 +876,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public int followerLogSize() { return followerToLog.size(); } + + private static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java index 9fe03f9673..b264739bd1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java @@ -20,7 +20,6 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -182,16 +181,11 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { // Wait for the follower to persist the snapshot. MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class); - // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent - // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's - // snapshotIndex and not the actual last index included in the snapshot. - // FIXME? - is this OK? - MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class); - List expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2); + List expFollowerState = Arrays.asList(payload0, payload1, payload2); assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); - assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex()); assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); killActor(follower2Actor); @@ -205,7 +199,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest { assertEquals("Follower commit index", 2, follower2Context.getCommitIndex()); assertEquals("Follower last applied", 2, follower2Context.getLastApplied()); - assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex()); + assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex()); assertEquals("Follower state", expFollowerState, follower2Underlying.getState()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index 0a6082d60d..6c258d33c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -485,7 +485,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex()); assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks()); assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm()); - assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex()); + assertEquals("InstallSnapshot getLastIncludedIndex", 9, installSnapshot.getLastIncludedIndex()); //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray()); installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class); @@ -496,13 +496,9 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // Verify follower 2 applies the snapshot. applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class); - verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8); + verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 9, currentTerm, 9); assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size()); - // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot. - applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class); - verifyApplyState(applyState, null, null, currentTerm, 9, payload9); - // Wait for the snapshot to complete. MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index d94eb6b041..1829a2d570 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -298,7 +298,7 @@ public class SnapshotManagerTest extends AbstractActorTest { SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue(); - assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray())); + assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState())); } @Test @@ -609,4 +609,4 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals("getTerm", -1L, reader.getTerm()); assertEquals("getIndex", -1L, reader.getIndex()); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 07548d6a92..89b83b3b36 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -9,10 +9,10 @@ import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -439,8 +440,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int newEntryIndex = 4; final int snapshotTerm = 1; final int currentTerm = 2; @@ -448,12 +449,15 @@ public class LeaderTest extends AbstractLeaderTest { // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); //set follower timeout to 2 mins, helps during debugging actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -463,7 +467,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); @@ -489,7 +494,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); - assertEquals(snapshotIndex, is.getLastIncludedIndex()); + assertEquals(commitIndex, is.getLastIncludedIndex()); } @Test @@ -573,7 +578,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(Optional.absent()); + leader.setSnapshot(null); // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -616,8 +621,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int lastAppliedIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -625,15 +630,22 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); leader = new Leader(actorContext); - // Ignore initial heartbeat. + // Initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new SendInstallSnapshot(toByteString(leadersSnapshot))); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); assertTrue(raftBehavior instanceof Leader); @@ -642,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertNotNull(installSnapshot.getData()); - assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); assertEquals(currentTerm, installSnapshot.getTerm()); @@ -654,15 +666,18 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // Ignore initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -678,7 +693,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); while(!fts.isLastChunk(fts.getChunkIndex())) { @@ -698,9 +714,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex + 1, fli.getNextIndex()); + assertEquals(commitIndex, fli.getMatchIndex()); + assertEquals(commitIndex + 1, fli.getNextIndex()); } @Test @@ -709,8 +724,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -724,10 +739,13 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); actorContext.setConfigParams(configParams); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -739,9 +757,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -780,8 +800,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -792,10 +812,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -807,10 +830,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -839,8 +864,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -851,10 +876,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -866,9 +894,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); -- 2.36.6