entries,
+ final FollowerLogInformation followerLogInformation) {
+ // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+ // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+ // term that weren't replicated to a majority, which would be a violation of raft.
+ // - if the follower isn't active. In this case we don't know the state of the follower and we send an
+ // empty AppendEntries as a heart beat to prevent election.
+ // - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+ // need to send AppendEntries to prevent election.
+ // - if we're in the process of slicing an AppendEntries with a large log entry payload. In this case we
+ // need to send an empty AppendEntries to prevent election.
+ boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
+ long leaderCommitIndex = isInstallingSnaphot || followerLogInformation.isLogEntrySlicingInProgress()
+ || !followerLogInformation.isFollowerActive() ? -1 : context.getCommitIndex();
+
+ long followerNextIndex = followerLogInformation.getNextIndex();
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
+ getLogEntryIndex(followerNextIndex - 1),
+ getLogEntryTerm(followerNextIndex - 1), entries,
+ leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+ followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
- if(!entries.isEmpty() || LOG.isTraceEnabled()) {
- LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
+ if (!entries.isEmpty() || log.isTraceEnabled()) {
+ log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
appendEntries);
}
- followerActor.tell(appendEntries.toSerializable(), actor());
+ followerLogInformation.setSentCommitIndex(leaderCommitIndex);
+ followerActor.tell(appendEntries, actor());
}
/**
+ * Initiates a snapshot capture to install on a follower, or reuses the snapshot already held in memory.
+ *
+ *
* Install Snapshot works as follows
- * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 4. On complete, Follower sends back a InstallSnapshotReply.
- * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
- * then send the existing snapshot in chunks to the follower.
- * @param followerId
+ * 1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+ * 2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+ * the Leader's handleMessage with a SendInstallSnapshot message.
+ * 3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+ * the Follower via InstallSnapshot messages.
+ * 4. For each chunk, the Follower sends back an InstallSnapshotReply.
+ * 5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+ * follower.
+ * 6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+ * then send the existing snapshot in chunks to the follower.
+ *
+ * @param followerId the id of the follower.
+ * @return true if a held snapshot was passed to sendSnapshotChunk or a new capture was initiated, false otherwise.
*/
- public void initiateCaptureSnapshot(String followerId) {
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot.
- // This could happen if another follower needs an install when one is going on.
+ public boolean initiateCaptureSnapshot(final String followerId) {
+ FollowerLogInformation followerLogInfo = followerToLog.get(followerId); // NOTE(review): used unchecked below - confirm followerId is always a tracked follower
+ if (snapshotHolder.isPresent()) {
+ // If a snapshot is already held in memory, most likely another install is in progress, so there is no need
+ // to capture a new one. This can happen when another follower needs an install while one is under way.
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
- sendSnapshotChunk(followerActor, followerId);
+ // Note: sendSnapshotChunk creates the LeaderInstallSnapshotState if the follower does not have one yet.
+ sendSnapshotChunk(followerActor, followerLogInfo);
+ return true;
+ }
- } else {
- context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
- this.getReplicatedToAllIndex(), followerId);
+ boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
+ if (captureInitiated) { // record per-follower install state, created with the configured snapshot chunk size
+ followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+ context.getConfigParams().getSnapshotChunkSize(), logName()));
}
+
+ return captureInitiated;
}
- private boolean canInstallSnapshot(long nextIndex){
+ private boolean canInstallSnapshot(final long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
- (!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ return nextIndex == -1 || !context.getReplicatedLog().isPresent(nextIndex)
+ && context.getReplicatedLog().isInSnapshot(nextIndex); // && binds tighter than ||: -1, or (absent from the log and in the snapshot)
}
private void sendInstallSnapshot() {
- LOG.debug("{}: sendInstallSnapshot", logName());
+ log.debug("{}: sendInstallSnapshot", logName());
for (Entry e : followerToLog.entrySet()) {
- ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ String followerId = e.getKey();
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex();
- if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
- canInstallSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, e.getKey());
+ long nextIndex = followerLogInfo.getNextIndex();
+ if (followerLogInfo.getInstallSnapshotState() != null
+ || context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED
+ || canInstallSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerLogInfo);
}
}
}
@@ -667,59 +945,94 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
* Sends a snapshot chunk to a given follower
* InstallSnapshot should qualify as a heartbeat too.
*/
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
-
- // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
- // followerId to the followerToSnapshot map.
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshot.get().getLastIncludedIndex(),
- snapshot.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- followerToSnapshot.incrementChunkIndex(),
- followerToSnapshot.getTotalChunks(),
- Optional.of(followerToSnapshot.getLastChunkHashCode())
- ).toSerializable(),
- actor()
- );
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
- logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
- followerToSnapshot.getTotalChunks());
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo) {
+ if (snapshotHolder.isPresent()) { // without a held snapshot this method does nothing
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ if (installSnapshotState == null) {
+ installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+ logName());
+ followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
+ }
+
+ try {
+ // Ensure the snapshot bytes are set - presumably a no-op when they already are (TODO confirm).
+ installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+
+ if (!installSnapshotState.canSendNextChunk()) { // presumably the previous chunk still awaits its reply - verify
+ return;
+ }
+
+ byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+ log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+ nextSnapshotChunk.length);
+
+ int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+ Optional serverConfig = Optional.empty();
+ if (installSnapshotState.isLastChunk(nextChunkIndex)) { // server config accompanies only the final chunk
+ serverConfig = Optional.ofNullable(context.getPeerServerInfo(true));
}
+
+ sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
+
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+
+ } catch (IOException e) { // NOTE(review): e is not passed to the logger below, so its stack trace is not logged
+ log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
+ installSnapshotState);
+ installSnapshotState.reset();
}
- } catch (IOException e) {
- LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
}
}
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+ final byte[] snapshotChunk, final int chunkIndex,
+ final Optional serverConfig) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+ installSnapshotState.startChunkTimer(); // timer is started just before the chunk is handed to the follower actor
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ snapshotChunk,
+ chunkIndex,
+ installSnapshotState.getTotalChunks(),
+ OptionalInt.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()), // serialized for the follower's raft version
+ actor()
+ );
+ }
+
+ private boolean resendSnapshotChunk(final ActorSelection followerActor,
+ final FollowerLogInformation followerLogInfo) {
+ if (!snapshotHolder.isPresent()) {
+ // We should never hit this case, but if we do, clear the follower's install state so that the install
+ // can restart from the next AppendEntries.
+ log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+ followerLogInfo.clearLeaderInstallSnapshotState();
+ return false;
}
- ByteString nextChunk = followerToSnapshot.getNextChunk();
- LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); // NOTE(review): assumed non-null (install in progress) - confirm at call sites
+ // We are resending, so the chunk timer needs to be reset.
+ installSnapshotState.resetChunkTimer();
+ installSnapshotState.markSendStatus(false); // presumably flags the previous send as failed so the same chunk goes out again - confirm
+
+ sendSnapshotChunk(followerActor, followerLogInfo);
- return nextChunk;
+ return true;
}
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
- LOG.trace("{}: Sending heartbeat", logName());
- sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
+ log.trace("{}: Sending heartbeat", logName());
+ sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toNanos(), true); // heartbeat interval is passed in nanoseconds
+
+ appendEntriesMessageSlicer.checkExpiredSlicedMessageState(); // run on every heartbeat that has followers
}
}
@@ -729,7 +1042,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
}
}
- private void scheduleHeartBeat(FiniteDuration interval) {
+ private void scheduleHeartBeat(final FiniteDuration interval) {
if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
@@ -744,147 +1057,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
// need to be sent if there are other messages being sent to the remote
// actor.
heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
+ interval, context.getActor(), SendHeartBeat.INSTANCE,
context.getActorSystem().dispatcher(), context.getActor());
}
@Override
- public void close() throws Exception {
+ public void close() { // stops the heartbeat, then closes the AppendEntries message slicer
stopHeartBeat();
+ appendEntriesMessageSlicer.close();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() { // a leader reports its own context id as the leader id
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() { // a leader reports its own context payload version
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
- int minPresent = minIsolatedLeaderPeerCount;
+ int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if (peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { // only known, voting, active followers count
--minPresent;
if (minPresent == 0) {
- break;
- }
- }
- }
- return (minPresent != 0);
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- logName(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public ByteString getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
+ return false; // the required number of active voting followers was reached
}
}
-
-
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
}
+ return minPresent != 0; // the countdown never reached zero inside the loop
}
// called from example-actor for printing the follower-states
@@ -905,34 +1109,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
}
@VisibleForTesting
- public FollowerLogInformation getFollower(String followerId) {
+ public FollowerLogInformation getFollower(final String followerId) { // test hook: looks the id up in followerToLog
return followerToLog.get(followerId);
}
- @VisibleForTesting
- protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
- mapFollowerToSnapshot.put(followerId, snapshot);
- }
-
- @VisibleForTesting
- public int followerSnapshotSize() {
- return mapFollowerToSnapshot.size();
- }
-
@VisibleForTesting
public int followerLogSize() {
return followerToLog.size();
}
- private static class SnapshotHolder {
+ static class SnapshotHolder {
private final long lastIncludedTerm;
private final long lastIncludedIndex;
- private final ByteString snapshotBytes;
+ private final ByteSource snapshotBytes;
- SnapshotHolder(Snapshot snapshot) {
+ SnapshotHolder(final Snapshot snapshot, final ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
- this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+ this.snapshotBytes = snapshotBytes;
}
long getLastIncludedTerm() {
@@ -943,7 +1137,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
return lastIncludedIndex;
}
- ByteString getSnapshotBytes() {
+ ByteSource getSnapshotBytes() {
return snapshotBytes;
}
}