private static final int SYNC_THRESHOLD = 10;
public Follower(RaftActorContext context) {
+ this(context, null);
+ }
+
+ public Follower(RaftActorContext context, String initialLeaderId) {
super(context, RaftState.Follower);
+ leaderId = initialLeaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(context.getRaftPolicy().automaticElectionsEnabled()) {
- if (context.getPeerIds().isEmpty()) {
+ if(canStartElection()) {
+ if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
- && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
- !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+ && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
}
if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
+ if(canStartElection()) {
+ LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ return this;
+ }
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
+
+ leaderId = installSnapshot.getLeaderId();
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor(),
- context.getPeerServerInfo());
+ context.getPeerServerInfo(true));
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override