+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
+ if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
+ return handleElectionTimeout(message);
+ }
+
+ if (!(message instanceof RaftRPC)) {
+ // The rest of the processing requires the message to be a RaftRPC
+ return null;
+ }
+
+ final RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ log.info("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ }
+
+ if (rpc instanceof InstallSnapshot) {
+ handleInstallSnapshot(sender, (InstallSnapshot) rpc);
+ restartLastLeaderMessageTimer();
+ scheduleElection(electionDuration());
+ return this;
+ }
+
+ if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
+ restartLastLeaderMessageTimer();
+ scheduleElection(electionDuration());
+ }
+
+ return super.handleMessage(sender, rpc);
+ }
+
+ private RaftActorBehavior handleElectionTimeout(final Object message) {
+ // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader
+ // during the election timeout interval. It may that the election timer expired b/c this actor
+ // was busy and messages got delayed, in which case leader messages would be backed up in the
+ // queue but would be processed before the ElectionTimeout message and thus would restart the
+ // lastLeaderMessageTimer.
+ long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
+ long electionTimeoutInMillis = context.getConfigParams().getElectionTimeOutInterval().toMillis();
+ boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning()
+ || lastLeaderMessageInterval >= electionTimeoutInMillis;
+
+ if (canStartElection()) {
+ if (message instanceof TimeoutNow) {
+ log.debug("{}: Received TimeoutNow - switching to Candidate", logName());