// 1. Reply false if term < currentTerm (ยง5.1)
if (appendEntries.getTerm() < currentTerm()) {
- log.debug("{}: Cannot append entries because sender term {} is less than {}", logName(),
+ log.info("{}: Cannot append entries because sender's term {} is less than {}", logName(),
appendEntries.getTerm(), currentTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
*/
protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
- log.debug("{}: In requestVote: {}", logName(), requestVote);
+ log.debug("{}: In requestVote: {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName(),
+ requestVote, currentTerm(), votedFor(), lastIndex(), lastTerm());
boolean grantVote = canGrantVote(requestVote);
* @param index the log index
*/
protected void applyLogToStateMachine(final long index) {
- long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- final ApplyState msg;
+ final ApplyState applyState;
final ClientRequestTracker tracker = removeClientRequestTracker(i);
if (tracker != null) {
- msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+ applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
} else {
- msg = new ApplyState(null, null, replicatedLogEntry);
+ applyState = new ApplyState(null, null, replicatedLogEntry);
}
- actor().tell(msg, actor());
- newLastApplied = i;
+ log.debug("{}: Setting last applied to {}", logName(), i);
+
+ context.setLastApplied(i);
+ context.getApplyStateConsumer().accept(applyState);
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
}
}
- log.debug("{}: Setting last applied to {}", logName(), newLastApplied);
-
- context.setLastApplied(newLastApplied);
-
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
// will be used during recovery
//in case if the above code throws an error and this message is not sent, it would be fine
return this;
}
- log.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
+ log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(),
+ newBehavior.state(), context.getTermInformation().getCurrentTerm());
try {
close();
} catch (RuntimeException e) {