startNewTerm();
if(votingPeers.isEmpty()){
- actor().tell(ELECTION_TIMEOUT, actor());
+ actor().tell(ElectionTimeout.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
+ }
+ @Override
+ public final String getLeaderId() {
+ return null;
+ }
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return -1;
}
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
// Some other candidate for the same term became a leader and sent us an append entry
if(currentTerm() == appendEntries.getTerm()){
- LOG.debug("{}: New Leader sent an append entry to Candidate for term {} will switch to Follower",
- logName(), currentTerm());
+ LOG.info("{}: New Leader {} sent an AppendEntries to Candidate for term {} - will switch to Follower",
+ logName(), appendEntries.getLeaderId(), currentTerm());
return switchBehavior(new Follower(context));
}
return this;
}
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
return this;
}
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
-
- LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
- voteCount);
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+ LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
voteCount++;
}
if (voteCount >= votesRequired) {
- return internalSwitchBehavior(RaftState.Leader);
+ if(context.getLastApplied() < context.getReplicatedLog().lastIndex()) {
+ LOG.info("{}: LastApplied index {} is behind last index {} - switching to PreLeader",
+ logName(), context.getLastApplied(), context.getReplicatedLog().lastIndex());
+ return internalSwitchBehavior(RaftState.PreLeader);
+ } else {
+ return internalSwitchBehavior(RaftState.Leader);
+ }
}
return this;
}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ if (message instanceof ElectionTimeout) {
+ LOG.debug("{}: Received ElectionTimeout", logName());
+
+ if (votesRequired == 0) {
+ // If there are no peers then we should be a Leader
+ // We wait for the election timeout to occur before declare
+ // ourselves the leader. This gives enough time for a leader
+ // who we do not know about (as a peer)
+ // to send a message to the candidate
- Object message = fromSerializableMessage(originalMessage);
+ return internalSwitchBehavior(RaftState.Leader);
+ }
+
+ startNewTerm();
+ scheduleElection(electionDuration());
+ return this;
+ }
if (message instanceof RaftRPC) {
// set currentTerm = T, convert to follower (ยง5.1)
// This applies to all RPC messages and responses
if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ LOG.info("{}: Term {} in \"{}\" message is greater than Candidate's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
- return internalSwitchBehavior(RaftState.Follower);
- }
- }
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout", logName());
+ // The raft paper does not say whether or not a Candidate can/should process a RequestVote in
+ // this case but doing so gains quicker convergence when the sender's log is more up-to-date.
+ if (message instanceof RequestVote) {
+ super.handleMessage(sender, message);
+ }
- if (votesRequired == 0) {
- // If there are no peers then we should be a Leader
- // We wait for the election timeout to occur before declare
- // ourselves the leader. This gives enough time for a leader
- // who we do not know about (as a peer)
- // to send a message to the candidate
-
- return internalSwitchBehavior(RaftState.Leader);
+ return internalSwitchBehavior(RaftState.Follower);
}
- startNewTerm();
- scheduleElection(electionDuration());
- return this;
}
return super.handleMessage(sender, message);
long newTerm = currentTerm + 1;
context.getTermInformation().updateAndPersist(newTerm, context.getId());
- LOG.debug("{}: Starting new term {}", logName(), newTerm);
+ LOG.info("{}: Starting new election term {}", logName(), newTerm);
// Request for a vote
// TODO: Retry request for vote if replies do not arrive in a reasonable
}
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() {
stopElection();
}
}