}
final Object message = fromSerializableMessage(originalMessage);
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- }
+ if (!(message instanceof RaftRPC)) {
+ // The rest of the processing requires the message to be a RaftRPC
+ return null;
+ }
+
+ final RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
}
- if (message instanceof InstallSnapshot) {
- InstallSnapshot installSnapshot = (InstallSnapshot) message;
+ if (rpc instanceof InstallSnapshot) {
+ InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
handleInstallSnapshot(sender, installSnapshot);
+ scheduleElection(electionDuration());
+ return this;
}
- if (message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+ if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
scheduleElection(electionDuration());
}
- return super.handleMessage(sender, message);
+ return super.handleMessage(sender, rpc);
}
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {