long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
- if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+ if(appendEntriesReply.isForceInstallSnapshot()) {
+ // Reset the followers match and next index. This is to signal that this follower has nothing
+ // in common with this Leader and so would require a snapshot to be installed
+ followerLogInformation.setMatchIndex(-1);
+ followerLogInformation.setNextIndex(-1);
+
+ // Force initiate a snapshot capture
+ initiateCaptureSnapshot(followerId);
+ } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// Send heartbeat to follower whenever install snapshot is initiated.
sendAppendEntries = true;
- initiateCaptureSnapshot(followerId, followerNextIndex);
+ if (canInstallSnapshot(followerNextIndex)) {
+ initiateCaptureSnapshot(followerId);
+ }
} else if(sendHeartbeat) {
// we send an AppendEntries, even if the follower is inactive
* 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
* then send the existing snapshot in chunks to the follower.
* @param followerId
- * @param followerNextIndex
*/
- private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
- if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
- context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
-
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
- final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+ private void initiateCaptureSnapshot(String followerId) {
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot.
+ // This could happen if another follower needs an install when one is going on.
+ final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ sendSnapshotChunk(followerActor, followerId);
- } else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
- }
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
+ private boolean canInstallSnapshot(long nextIndex){
+ // If the follower's nextIndex is -1 then we might as well send it a snapshot
+ // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+ // in the snapshot
+ return (nextIndex == -1 ||
+ (!context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex)));
+
+ }
+
private void sendInstallSnapshot() {
LOG.debug("{}: sendInstallSnapshot", logName());
if (followerActor != null) {
long nextIndex = e.getValue().getNextIndex();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ if (canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, e.getKey());
}
}
int addEntriesFrom = 0;
if (context.getReplicatedLog().size() > 0) {
- // Find the entry up until which the one that is not in the follower's log
+ // Find the entry up until the one that is not in the follower's log
for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
continue;
}
- LOG.debug("{}: Removing entries from log starting at {}", logName(),
+ if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+
+ LOG.debug("{}: Removing entries from log starting at {}", logName(),
matchEntry.getIndex());
- // Entries do not match so remove all subsequent entries
- context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
- break;
+ // Entries do not match so remove all subsequent entries
+ context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
+ break;
+ } else {
+ sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+ lastTerm(), context.getPayloadVersion(), true), actor());
+ return this;
+ }
}
}
private final short payloadVersion;
+ private final boolean forceInstallSnapshot;
+
public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
short payloadVersion) {
+ this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false);
+ }
+
+ public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+ short payloadVersion, boolean forceInstallSnapshot) {
super(term);
this.followerId = followerId;
this.logLastIndex = logLastIndex;
this.logLastTerm = logLastTerm;
this.payloadVersion = payloadVersion;
+ this.forceInstallSnapshot = forceInstallSnapshot;
}
+
@Override
public long getTerm() {
return term;
StringBuilder builder = new StringBuilder();
builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
.append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
- .append(", payloadVersion=").append(payloadVersion).append("]");
+ .append(", payloadVersion=").append(", forceInstallSnapshot=").append(forceInstallSnapshot)
+ .append(payloadVersion).append("]");
return builder.toString();
}
+
+ public boolean isForceInstallSnapshot() {
+ return forceInstallSnapshot;
+ }
}
expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
}
+ @Test
+ public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+ logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(newReplicatedLogEntry(1, 0, "zero"));
+ log.append(newReplicatedLogEntry(1, 1, "one"));
+ log.append(newReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+ entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
+
+ context.setRaftPolicy(createRaftPolicy(false, true));
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+ }
+
@Test
public void testHandleAppendEntriesPreviousLogEntryMissing(){
logStart("testHandleAppendEntriesPreviousLogEntryMissing");
private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+ expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+ }
+
+ private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+ String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+ boolean expForceInstallSnapshot) {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
AppendEntriesReply.class);
assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+ assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
}
+
private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
new MockRaftActorContext.MockPayload(data));
MockRaftActorContext actorContext = createActorContextWithFollower();
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
//clears leaders log
actorContext.getReplicatedLog().removeFrom(0);
Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
+ @Test
+ public void testInitiateForceInstallSnapshot() throws Exception {
+ logStart("testInitiateForceInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = -1;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(null);
+
+ for(int i=0;i<4;i++) {
+ actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+ new MockRaftActorContext.MockPayload("X" + i)));
+ }
+
+ // new entry
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
+
+ actorContext.getReplicatedLog().append(entry);
+
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
+
+ // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+ // installed with a SendInstallSnapshot
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
+
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
+
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ }
+
+
@Test
public void testInstallSnapshot() throws Exception {
logStart("testInstallSnapshot");
assertEquals(currentTerm, installSnapshot.getTerm());
}
+ @Test
+ public void testForceInstallSnapshot() throws Exception {
+ logStart("testForceInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = -1;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
+
+ leader = new Leader(actorContext);
+
+ // Initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+
+ assertTrue(raftBehavior instanceof Leader);
+
+ // check if installsnapshot gets called with the correct values.
+
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertNotNull(installSnapshot.getData());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+ assertEquals(currentTerm, installSnapshot.getTerm());
+ }
+
@Test
public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
logStart("testHandleInstallSnapshotReplyLastChunk");