+ @VisibleForTesting
+ protected final void setLeaderId(final @Nullable String leaderId) {
+ this.leaderId = leaderId;
+ }
+
+ @Override
+ public short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
+ }
+
+ @VisibleForTesting
+ protected final void setLeaderPayloadVersion(final short leaderPayloadVersion) {
+ this.leaderPayloadVersion = leaderPayloadVersion;
+ }
+
+ private void restartLastLeaderMessageTimer() {
+ if (lastLeaderMessageTimer.isRunning()) {
+ lastLeaderMessageTimer.reset();
+ }
+
+ lastLeaderMessageTimer.start();
+ }
+
+ private boolean isLogEntryPresent(final long index) {
+ if (context.getReplicatedLog().isInSnapshot(index)) {
+ return true;
+ }
+
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+ return entry != null;
+
+ }
+
+ private void updateInitialSyncStatus(final long currentLeaderCommit, final String newLeaderId) {
+ initialSyncStatusTracker.update(newLeaderId, currentLeaderCommit, context.getCommitIndex());
+ }
+
+ @Override
+ protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
+ int numLogEntries = appendEntries.getEntries().size();
+ if (log.isTraceEnabled()) {
+ log.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+ } else if (log.isDebugEnabled() && numLogEntries > 0) {
+ log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
+ }
+
+ if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
+ log.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the "
+ + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+ closeSnapshotTracker();
+ }
+
+ if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion());
+
+ log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ sender.tell(reply, actor());
+
+ return this;
+ }