Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index dd2f19b13760810b9b80253f495781eeca6ddbd9..74069a18e1f879e71040a171eef8497889fd0f8c 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeou
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
@@ -37,7 +38,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
+        AppendEntries appendEntries) {
 
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
@@ -121,7 +122,7 @@ public class Follower extends AbstractRaftActorBehavior {
                                 + matchEntry.getIndex()
                         );
                         context.getReplicatedLog()
-                            .removeFrom(matchEntry.getIndex());
+                            .removeFromAndPersist(matchEntry.getIndex());
                         break;
                     }
                 }
@@ -170,17 +171,17 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
-        return suggestedState;
+        return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
+        AppendEntriesReply appendEntriesReply) {
+        return state();
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        RequestVoteReply requestVoteReply) {
+        return state();
     }
 
     @Override public RaftState state() {
@@ -188,6 +189,16 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (ยง5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+            }
+        }
+
         if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
         } else if (message instanceof InstallSnapshot) {