- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+ boolean sendHeartbeat, boolean isHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ } else if(sendHeartbeat) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntries = true;
+ }
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+
+ if((!isHeartbeat && LOG.isDebugEnabled()) || LOG.isTraceEnabled()) {
+ LOG.debug("{}: Checking sendAppendEntries for follower {}: active: {}, followerNextIndex: {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+ logName(), followerId, isFollowerActive, followerNextIndex, leaderLastIndex, leaderSnapShotIndex);
+ }
+
+ if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+ LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+ followerNextIndex, followerId);
+
+ if(followerLogInformation.okToReplicate()) {
+ // Try to send all the entries in the journal but not exceeding the max data size
+ // for a single AppendEntries message.
+ int maxEntries = (int) context.getReplicatedLog().size();
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries,
+ context.getConfigParams().getSnapshotChunkSize());
+ sendAppendEntries = true;
+ }
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
+ "follower-nextIndex: %d, leader-snapshot-index: %d, " +
+ "leader-last-index: %d", logName(), followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex));