+ public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+
+ Object message = fromSerializableMessage(originalMessage);
+
+ if (message instanceof RaftRPC) {
+
+ RaftRPC rpc = (RaftRPC) message;
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
+ context.getTermInformation().getCurrentTerm());
+ }
+
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+ return switchBehavior(new Follower(context));
+ }
+ }
+
+ if (message instanceof ElectionTimeout) {
+ LOG.debug("{}: Received ElectionTimeout", logName());
+
+ if (votesRequired == 0) {
+ // If there are no peers then we should be a Leader
+ // We wait for the election timeout to occur before declare
+ // ourselves the leader. This gives enough time for a leader
+ // who we do not know about (as a peer)
+ // to send a message to the candidate
+
+ return switchBehavior(new Leader(context));
+ }
+ startNewTerm();
+ scheduleElection(electionDuration());
+ return this;
+ }
+
+ return super.handleMessage(sender, message);
+ }
+
+
+ private void startNewTerm() {
+
+
+ // set voteCount back to 1 (that is voting for self)
+ voteCount = 1;
+
+ // Increment the election term and vote for self
+ long currentTerm = context.getTermInformation().getCurrentTerm();
+ long newTerm = currentTerm + 1;
+ context.getTermInformation().updateAndPersist(newTerm, context.getId());
+
+ LOG.debug("{}: Starting new term {}", logName(), newTerm);
+
+ // Request for a vote
+ // TODO: Retry request for vote if replies do not arrive in a reasonable
+ // amount of time TBD
+ for (String peerId : peers) {
+ ActorSelection peerActor = context.getPeerActorSelection(peerId);
+ if(peerActor != null) {
+ RequestVote requestVote = new RequestVote(
+ context.getTermInformation().getCurrentTerm(),
+ context.getId(),
+ context.getReplicatedLog().lastIndex(),
+ context.getReplicatedLog().lastTerm());
+
+ LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
+
+ peerActor.tell(requestVote, context.getActor());
+ }
+ }
+ }
+
+ @Override public void close() throws Exception {
+ stopElection();