Adding the wrongly removed module back in pom.xml
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index a9882a664767ad726c651611c6cff8ac078f6d1a..26beed2f7a69b5191407f230c839edd1e4178ade 100644 (file)
@@ -137,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior {
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
             followerLogInformation.decrNextIndex();
         }
 
@@ -193,9 +200,11 @@ public class Leader extends AbstractRaftActorBehavior {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        Object message = fromSerializableMessage(originalMessage);
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -215,7 +224,6 @@ public class Leader extends AbstractRaftActorBehavior {
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
-                // FIXME : Should I be checking the term here too?
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
@@ -281,20 +289,28 @@ public class Leader extends AbstractRaftActorBehavior {
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if(context.getReplicatedLog().isPresent(nextIndex)){
+                // TODO: Instead of sending all entries from nextIndex
+                // only send a fixed number of entries to each follower
+                // This is to avoid the situation where there are a lot of
+                // entries to install for a fresh follower or to a follower
+                // that has fallen too far behind with the log but yet is not
+                // eligible to receive a snapshot
                 entries =
                     context.getReplicatedLog().getFrom(nextIndex);
             }
 
             followerActor.tell(
-                new AppendEntries(currentTerm(), context.getId(),
-                    prevLogIndex(nextIndex), prevLogTerm(nextIndex),
-                    entries, context.getCommitIndex()
-                ),
-                actor()
-            );
+                new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex),
+                    prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(),
+                actor());
         }
     }
 
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     */
     private void installSnapshotIfNeeded(){
         for (String followerId : followerToActor.keySet()) {
             ActorSelection followerActor =