Merge "Fixed unneeded wrapping of guava for karaf"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index 90edf7da9a8dc5660f13f1eb783543f1bc8c17c1..fb8be8b891a315b420c6778f1691ba4712a069d7 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -93,9 +94,10 @@ public class Leader extends AbstractRaftActorBehavior {
                 context.actorSelection(context.getPeerAddress(followerId)));
 
             followerToLog.put(followerId, followerLogInformation);
-
         }
 
+        context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
+
         if (followerToActor.size() > 0) {
             minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
         } else {
@@ -117,20 +119,13 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-
-        context.getLogger()
-            .error("An unexpected AppendEntries received in state " + state());
+        AppendEntries appendEntries) {
 
-        return suggestedState;
+        return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-
-        // Do not take any other action since a behavior change is coming
-        if (suggestedState != state())
-            return suggestedState;
+        AppendEntriesReply appendEntriesReply) {
 
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
@@ -142,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior {
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
             followerLogInformation.decrNextIndex();
         }
 
@@ -176,7 +178,7 @@ public class Leader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        return suggestedState;
+        return state();
     }
 
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
@@ -190,8 +192,8 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        RequestVoteReply requestVoteReply) {
+        return state();
     }
 
     @Override public RaftState state() {
@@ -201,6 +203,17 @@ public class Leader extends AbstractRaftActorBehavior {
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (ยง5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+                return RaftState.Follower;
+            }
+        }
+
         try {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
@@ -209,7 +222,6 @@ public class Leader extends AbstractRaftActorBehavior {
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
-                // FIXME : Should I be checking the term here too?
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
@@ -275,6 +287,12 @@ public class Leader extends AbstractRaftActorBehavior {
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if(context.getReplicatedLog().isPresent(nextIndex)){
+                // TODO: Instead of sending all entries from nextIndex
+                // only send a fixed number of entries to each follower
+                // This is to avoid the situation where there are a lot of
+                // entries to install for a fresh follower or to a follower
+                // that has fallen too far behind with the log but yet is not
+                // eligible to receive a snapshot
                 entries =
                     context.getReplicatedLog().getFrom(nextIndex);
             }
@@ -289,6 +307,11 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     */
     private void installSnapshotIfNeeded(){
         for (String followerId : followerToActor.keySet()) {
             ActorSelection followerActor =