- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
-
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ /**
+ * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+ * sending next snapshot chunk, and initiating a snapshot.
+ *
+ * @return true if any update is sent, false otherwise
+ */
+ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+ boolean sendHeartbeat, boolean isHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+ if (installSnapshotState != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerLogInformation);
+ } else if (sendHeartbeat) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntries = true;
+ }
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+
+ if (!isHeartbeat && log.isDebugEnabled() || log.isTraceEnabled()) {
+ log.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, "
+ + "leaderLastIndex: {}, leaderSnapShotIndex: {}", logName(), followerId, isFollowerActive,
+ followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ }
+
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+ log.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ followerNextIndex, followerId);
+
+ if (followerLogInformation.okToReplicate()) {
+ // Try to send all the entries in the journal but not exceeding the max data size
+ // for a single AppendEntries message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+ context.getConfigParams().getSnapshotChunkSize());
+ sendAppendEntries = true;
+ }
+ } else if (isFollowerActive && followerNextIndex >= 0
+ && leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
+ + "follower-nextIndex: %d, leader-snapshot-index: %d, "
+ + "leader-last-index: %d", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));