import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
}
@Override
// NOTE(review): this file is unified-diff residue ('-' = removed, '+' = added), not compilable
// Java; context between hunks is elided. Comments below annotate the post-patch version
// ('+' plus unmarked lines) of handleAppendEntriesReply, which processes a follower's reply
// to an AppendEntries message and updates that follower's tracked log state.
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
if(LOG.isTraceEnabled()) {
LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
}
// Update the FollowerLogInformation
String followerId = appendEntriesReply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
if(followerLogInformation == null){
LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
// NOTE(review): lines are elided by the diff here — presumably the unknown-follower branch
// returns before the dereference below; as literally shown, followerLogInformation would be
// null at the next line. Verify against the full file.
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
// Hoisted by this patch so both the success and failure paths can compare the follower's
// reported last log term against the term the leader has for that same index.
+ long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+ long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
boolean updated = false;
if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
// The follower's log is actually ahead of the leader's log. Normally this doesn't happen
// Reset nextIndex and install a snapshot to force the follower back in sync.
followerLogInformation.setNextIndex(-1);
initiateCaptureSnapshot(followerId);
+
updated = true;
} else if (appendEntriesReply.isSuccess()) {
// New in this patch: even on a successful reply, detect a same-index/different-term
// conflict at the follower's last entry and back nextIndex up instead of advancing.
- updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
+ followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+ // The follower's last entry is present in the leader's journal but the terms don't match so the
+ // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
+ // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
+ // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
+ // index reported by the follower. For the former case, the leader will send all entries starting with
+ // the previous follower's index and the follower will remove and replace the conflicting entries as
+ // needed. For the latter, the leader will initiate an install snapshot.
+
+ followerLogInformation.setNextIndex(followerLastLogIndex - 1);
+ updated = true;
+
+ LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
+ "leader's {} - set the follower's next index to {}",
+ logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+ followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+ } else {
+ updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+ }
} else {
LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
// These locals were moved to the top of the method by this patch (and renamed
// followersLastLogTerm -> followersLastLogTermInLeadersLog).
- long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
- long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
if(appendEntriesReply.isForceInstallSnapshot()) {
// Reset the followers match and next index. This is to signal that this follower has nothing
// in common with this Leader and so would require a snapshot to be installed
// Force initiate a snapshot capture
initiateCaptureSnapshot(followerId);
// NOTE(review): the '+' form drops the explicit parentheses; since '&&' binds tighter than
// '||' this is equivalent to the '-' form: lastIndex < 0 || (term >= 0 && term == replyTerm).
- } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
- followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
+ } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
+ followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
// The follower's log is empty or the last entry is present in the leader's journal
// and the terms match so the follower is just behind the leader's journal from
// the last snapshot, if any. We'll catch up the follower quickly by starting at the
// NOTE(review): diff elision — the body of the matching-term branch is missing here; the
// comment below belongs to the following (conflicting-term) else branch in the full file.
// The follower's log conflicts with leader's log so decrement follower's next index by 1
// in an attempt to find where the logs match.
- LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
- logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
-
followerLogInformation.decrNextIndex();
+ updated = true;
+
+ LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+ logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+ followerLogInformation.getNextIndex());
}
}
// NOTE(review): fragment of a different method (per-follower update dispatch — likely
// sendUpdatesToFollower or similar); its opening lines are elided by the diff. The patch
// threads FollowerLogInformation through instead of the bare followerId String.
} else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if(followerActor != null) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInformation);
}
}
if (installSnapshotState != null) {
// if install snapshot is in process , then sent next chunk if possible
if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInformation);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
}
if(sendAppendEntries) {
// Call site updated for the new sendAppendEntriesToFollower signature: nextIndex and
// followerId are now derived from followerLogInformation inside the callee.
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- entries, followerId);
+ sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
}
}
}
// Builds and logs the AppendEntries message for one follower. Signature change in this patch:
// nextIndex and the follower id are read from FollowerLogInformation rather than passed
// separately, simplifying call sites.
- private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries, String followerId) {
+ private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
+ FollowerLogInformation followerLogInformation) {
+ // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+ // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+ // term that weren't replicated to a majority, which would be a violation of raft.
+ // - if the follower isn't active. In this case we don't know the state of the follower and we send an
+ // empty AppendEntries as a heart beat to prevent election.
+ // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+ // need to send AppendEntries to prevent election.
+ boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
+ long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
+ context.getCommitIndex();
+
+ long followerNextIndex = followerLogInformation.getNextIndex();
// prevLogIndex/prevLogTerm are derived from the entry just before the follower's nextIndex.
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
getLogEntryIndex(followerNextIndex - 1),
getLogEntryTerm(followerNextIndex - 1), entries,
- context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
+ leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
if(!entries.isEmpty() || LOG.isTraceEnabled()) {
- LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
+ LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
appendEntries);
}
// NOTE(review): the tell(...) that actually dispatches appendEntries to the follower actor
// is elided by the diff — confirm it follows in the full file.
}
/**
+ * Initiates a snapshot capture to install on a follower.
+ *
 * Install Snapshot works as follows
- * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 4. On complete, Follower sends back a InstallSnapshotReply.
- * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
- * then send the existing snapshot in chunks to the follower.
- * @param followerId
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
+ *
+ * @param followerId the id of the follower.
+ * @return true if capture was initiated, false otherwise.
 */
public boolean initiateCaptureSnapshot(String followerId) {
// NOTE(review): followerLogInfo is dereferenced below without a null check; presumably callers
// only pass ids present in followerToLog (handleAppendEntriesReply returns early for unknown
// followers) — verify, otherwise an unknown id would NPE at setLeaderInstallSnapshotState.
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
+ // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+ // snapshot. This could happen if another follower needs an install when one is going on.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
-- sendSnapshotChunk(followerActor, followerId);
+
+ // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+ sendSnapshotChunk(followerActor, followerLogInfo);
return true;
} else {
// No snapshot cached yet — ask the SnapshotManager to capture one for installation and,
// if the capture starts, pre-create this follower's chunking state.
- return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
this.getReplicatedToAllIndex(), followerId);
+ if(captureInitiated) {
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
+ }
+
+ return captureInitiated;
}
}
if (followerActor != null) {
long nextIndex = followerLogInfo.getNextIndex();
- if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
+ if (followerLogInfo.getInstallSnapshotState() != null ||
+ context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
canInstallSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, followerLogInfo);
}
}
}
 * Sends a snapshot chunk to a given follower
 * InstallSnapshot should qualify as a heartbeat too.
 */
// Rewritten in this patch: takes FollowerLogInformation directly, creates the per-follower
// chunking state lazily (absorbing the removed getNextSnapshotChunk helper below), and drops
// the IOException try/catch — NOTE(review): presumably the new LeaderInstallSnapshotState
// chunking API no longer throws IOException; verify against its declaration.
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
-
- // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
- // followerId to the followerToSnapshot map.
- LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-
- int nextChunkIndex = installSnapshotState.incrementChunkIndex();
- Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
- if(installSnapshotState.isLastChunk(nextChunkIndex)) {
- serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
- }
-
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
- actor()
- );
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
- installSnapshotState.getTotalChunks());
- }
+ private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
+ if (snapshot.isPresent()) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ if (installSnapshotState == null) {
// Lazily create the chunking state for this follower (initiateCaptureSnapshot may have
// created it already when a capture was initiated).
+ installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+ logName());
+ followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
- } catch (IOException e) {
- LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
- }
- }
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
- if (installSnapshotState == null) {
- installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
- logName());
- followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
- }
- byte[] nextChunk = installSnapshotState.getNextChunk();
// NOTE(review): "no-op" presumably means a no-op when the bytes are already set — confirm
// setSnapshotBytes's contract.
+ // Ensure the snapshot bytes are set - this is a no-op.
+ installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+ LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
+
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
// The server configuration payload rides along only on the final chunk.
+ Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+ if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+ serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+ }
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshot.get().getLastIncludedIndex(),
+ snapshot.get().getLastIncludedTerm(),
+ nextSnapshotChunk,
+ nextChunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
- return nextChunk;
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+ installSnapshotState.getTotalChunks());
+ }
+ }
}
private void sendHeartBeat() {