@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
-
- Object message = fromSerializableMessage(originalMessage);
-
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- }
- }
-
- if (message instanceof ElectionTimeout) {
- if(canStartElection()) {
+ if (originalMessage instanceof ElectionTimeout) {
+ if (canStartElection()) {
LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
return internalSwitchBehavior(RaftState.Candidate);
} else {
return this;
}
+ }
- } else if (message instanceof InstallSnapshot) {
- InstallSnapshot installSnapshot = (InstallSnapshot) message;
+ final Object message = fromSerializableMessage(originalMessage);
+ if (!(message instanceof RaftRPC)) {
+ // The rest of the processing requires the message to be a RaftRPC
+ return null;
+ }
+
+ final RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+ }
+
+ if (rpc instanceof InstallSnapshot) {
+ InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
handleInstallSnapshot(sender, installSnapshot);
+ scheduleElection(electionDuration());
+ return this;
}
- if(message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+ if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
scheduleElection(electionDuration());
}
- return super.handleMessage(sender, message);
+ return super.handleMessage(sender, rpc);
}
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {