if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex(), lastTerm(), context.getPayloadVersion());
+ lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion());
log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
sender.tell(reply, actor());
leaderId = appendEntries.getLeaderId();
leaderPayloadVersion = appendEntries.getPayloadVersion();
+ if (appendEntries.getLeaderAddress().isPresent()) {
+ final String address = appendEntries.getLeaderAddress().get();
+ log.debug("New leader address: {}", address);
+
+ context.setPeerAddress(leaderId, address);
+ context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+ }
+
// First check if the logs are in sync or not
if (isOutOfSync(appendEntries, sender)) {
updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex, lastTerm(), context.getPayloadVersion());
+ lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion());
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion(), true), actor());
+ lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion()), actor());
return false;
}
break;
} else {
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion(), true), actor());
+ lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion()), actor());
return false;
}
}
log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
appendEntries.getPrevLogIndex());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
} else if (appendEntries.getPrevLogIndex() != -1) {
+ "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
}
appendEntries.getReplicatedToAllIndex(), lastIndex,
context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
}
return false;
}
- private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) {
+ private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot,
+ short leaderRaftVersion) {
// We found that the log was out of sync so just send a negative reply.
final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
- lastTerm(), context.getPayloadVersion(), forceInstallSnapshot);
+ lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+ leaderRaftVersion);
log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
sender.tell(reply, actor());
}
+ private boolean needsLeaderAddress() {
+ return context.getPeerAddress(leaderId) == null;
+ }
+
@Override
protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
final AppendEntriesReply appendEntriesReply) {