Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 88558cac16f76f6f1cc4ddae32db7781a5b186c0..74069a18e1f879e71040a171eef8497889fd0f8c 100644 (file)
@@ -12,21 +12,23 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
  * The behavior of a RaftActor in the Follower state
- *
+ * <p/>
  * <ul>
  * <li> Respond to RPCs from candidates and leaders
  * <li> If election timeout elapses without receiving AppendEntries
  * RPC from current leader or granting vote to candidate:
  * convert to candidate
  * </ul>
- *
  */
 public class Follower extends AbstractRaftActorBehavior {
     public Follower(RaftActorContext context) {
@@ -36,7 +38,7 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
+        AppendEntries appendEntries) {
 
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
@@ -47,16 +49,36 @@ public class Follower extends AbstractRaftActorBehavior {
             .get(appendEntries.getPrevLogIndex());
 
 
-        if (lastIndex() > -1 && previousEntry != null
-            && previousEntry.getTerm() != appendEntries
-            .getPrevLogTerm()) {
+        boolean noMatchingTerms = true;
+
+        if (lastIndex() == -1
+            && appendEntries.getPrevLogIndex() != -1) {
+
+            context.getLogger().debug(
+                "The followers log is empty and the senders prevLogIndex is {}",
+                appendEntries.getPrevLogIndex());
+
+        } else if (lastIndex() > -1
+            && appendEntries.getPrevLogIndex() != -1
+            && previousEntry == null) {
 
             context.getLogger().debug(
-                "Cannot append entries because previous entry term "
-                    + previousEntry.getTerm()
-                    + " is not equal to append entries prevLogTerm "
-                    + appendEntries.getPrevLogTerm());
+                "The log is not empty but the prevLogIndex {} was not found in it",
+                appendEntries.getPrevLogIndex());
 
+        } else if (lastIndex() > -1
+            && previousEntry != null
+            && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+
+            context.getLogger().debug(
+                "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                , previousEntry.getTerm()
+                , appendEntries.getPrevLogTerm());
+        } else {
+            noMatchingTerms = false;
+        }
+
+        if (noMatchingTerms) {
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -68,7 +90,8 @@ public class Follower extends AbstractRaftActorBehavior {
             && appendEntries.getEntries().size() > 0) {
             context.getLogger().debug(
                 "Number of entries to be appended = " + appendEntries
-                    .getEntries().size());
+                    .getEntries().size()
+            );
 
             // 3. If an existing entry conflicts with a new one (same index
             // but different terms), delete the existing entry and all that
@@ -96,9 +119,10 @@ public class Follower extends AbstractRaftActorBehavior {
                         .getTerm()) {
                         context.getLogger().debug(
                             "Removing entries from log starting at "
-                                + matchEntry.getIndex());
+                                + matchEntry.getIndex()
+                        );
                         context.getReplicatedLog()
-                            .removeFrom(matchEntry.getIndex());
+                            .removeFromAndPersist(matchEntry.getIndex());
                         break;
                     }
                 }
@@ -106,14 +130,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
             context.getLogger().debug(
                 "After cleanup entries to be added from = " + (addEntriesFrom
-                    + lastIndex()));
+                    + lastIndex())
+            );
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
                 context.getLogger().debug(
-                    "Append entry to log " + appendEntries.getEntries().get(i)
-                        .toString());
+                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+                        .toString()
+                );
                 context.getReplicatedLog()
                     .appendAndPersist(appendEntries.getEntries().get(i));
             }
@@ -145,17 +171,17 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
-        return suggestedState;
+        return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
+        AppendEntriesReply appendEntriesReply) {
+        return state();
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        RequestVoteReply requestVoteReply) {
+        return state();
     }
 
     @Override public RaftState state() {
@@ -163,8 +189,21 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        if(message instanceof ElectionTimeout){
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (ยง5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+            }
+        }
+
+        if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
+        } else if (message instanceof InstallSnapshot) {
+            InstallSnapshot snapshot = (InstallSnapshot) message;
+            actor().tell(new ApplySnapshot(snapshot), actor());
         }
 
         scheduleElection(electionDuration());