public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- scheduleElection(electionDuration());
-
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
}
private boolean isLogEntryPresent(long index){
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ }
+ sender.tell(reply, actor());
+
+ return this;
+ }
+
// 1. Reply false if term < currentTerm (ยง5.1)
// This is handled in the appendEntries method of the base class
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
return snapshotTracker;
}
- private static class InitialSyncStatusTracker {
+ private class InitialSyncStatusTracker {
private static final long INVALID_LOG_INDEX = -2L;
private long initialLeaderCommit = INVALID_LOG_INDEX;
if(!initialSyncUpDone){
if(initialLeaderCommit == INVALID_LOG_INDEX){
- actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender());
+ actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender());
initialLeaderCommit = leaderCommit;
} else if(commitIndex >= initialLeaderCommit){
- actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender());
+ actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender());
initialSyncUpDone = true;
}
}