Merge "added feature topology manager shell"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index 90edf7da9a8dc5660f13f1eb783543f1bc8c17c1..2a44e8b7a5c3adeecd1c534de0664d8deb290934 100644 (file)
@@ -19,14 +19,15 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -66,7 +68,7 @@ public class Leader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
 
-    private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+    private final Set<String> followers;
 
     private Cancellable heartbeatSchedule = null;
     private Cancellable appendEntriesSchedule = null;
@@ -83,21 +85,21 @@ public class Leader extends AbstractRaftActorBehavior {
             context.setCommitIndex(lastIndex());
         }
 
-        for (String followerId : context.getPeerAddresses().keySet()) {
+        followers = context.getPeerAddresses().keySet();
+
+        for (String followerId : followers) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
                     new AtomicLong(lastIndex()),
                     new AtomicLong(-1));
 
-            followerToActor.put(followerId,
-                context.actorSelection(context.getPeerAddress(followerId)));
-
             followerToLog.put(followerId, followerLogInformation);
-
         }
 
-        if (followerToActor.size() > 0) {
-            minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+        context.getLogger().debug("Election:Leader has following peers:"+ followers);
+
+        if (followers.size() > 0) {
+            minReplicationCount = (followers.size() + 1) / 2 + 1;
         } else {
             minReplicationCount = 0;
         }
@@ -110,38 +112,51 @@ public class Leader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
 
         scheduleInstallSnapshotCheck(
-            new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
-                HEART_BEAT_INTERVAL.unit())
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
+                context.getConfigParams().getHeartBeatInterval().unit())
         );
 
     }
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
+        AppendEntries appendEntries) {
 
-        context.getLogger()
-            .error("An unexpected AppendEntries received in state " + state());
+        context.getLogger().info("Leader: Received {}", appendEntries.toString());
 
-        return suggestedState;
+        return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
+        AppendEntriesReply appendEntriesReply) {
 
-        // Do not take any other action since a behavior change is coming
-        if (suggestedState != state())
-            return suggestedState;
+        if(! appendEntriesReply.isSuccess()) {
+            context.getLogger()
+                .info("Leader: Received {}", appendEntriesReply.toString());
+        }
 
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
         FollowerLogInformation followerLogInformation =
             followerToLog.get(followerId);
+
+        if(followerLogInformation == null){
+            context.getLogger().error("Unknown follower {}", followerId);
+            return state();
+        }
+
         if (appendEntriesReply.isSuccess()) {
             followerLogInformation
                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
             followerLogInformation.decrNextIndex();
         }
 
@@ -176,7 +191,7 @@ public class Leader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        return suggestedState;
+        return state();
     }
 
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
@@ -190,17 +205,30 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
+        RequestVoteReply requestVoteReply) {
+        return state();
     }
 
     @Override public RaftState state() {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        Object message = fromSerializableMessage(originalMessage);
+
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (ยง5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+                return RaftState.Follower;
+            }
+        }
+
         try {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
@@ -209,12 +237,11 @@ public class Leader extends AbstractRaftActorBehavior {
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
-                // FIXME : Should I be checking the term here too?
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
         } finally {
-            scheduleHeartBeat(HEART_BEAT_INTERVAL);
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         }
 
         return super.handleMessage(sender, message);
@@ -237,7 +264,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
         context.getLogger().debug("Replicate message " + logIndex);
 
-        if (followerToActor.size() == 0) {
+        if (followers.size() == 0) {
             context.setCommitIndex(
                 replicate.getReplicatedLogEntry().getIndex());
 
@@ -263,57 +290,73 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followerToActor.keySet()) {
+        for (String followerId : followers) {
             ActorSelection followerActor =
-                followerToActor.get(followerId);
+                context.getPeerActorSelection(followerId);
 
-            FollowerLogInformation followerLogInformation =
-                followerToLog.get(followerId);
+            if (followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
 
-            long nextIndex = followerLogInformation.getNextIndex().get();
+                long nextIndex = followerLogInformation.getNextIndex().get();
 
-            List<ReplicatedLogEntry> entries = Collections.emptyList();
+                List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-            if(context.getReplicatedLog().isPresent(nextIndex)){
-                entries =
-                    context.getReplicatedLog().getFrom(nextIndex);
-            }
+                if (context.getReplicatedLog().isPresent(nextIndex)) {
+                    // TODO: Instead of sending all entries from nextIndex
+                    // only send a fixed number of entries to each follower
+                    // This is to avoid the situation where there are a lot of
+                    // entries to install for a fresh follower or to a follower
+                    // that has fallen too far behind with the log but yet is not
+                    // eligible to receive a snapshot
+                    entries =
+                        context.getReplicatedLog().getFrom(nextIndex);
+                }
 
-            followerActor.tell(
-                new AppendEntries(currentTerm(), context.getId(),
-                    prevLogIndex(nextIndex), prevLogTerm(nextIndex),
-                    entries, context.getCommitIndex()
-                ),
-                actor()
-            );
+                followerActor.tell(
+                    new AppendEntries(currentTerm(), context.getId(),
+                        prevLogIndex(nextIndex),
+                        prevLogTerm(nextIndex), entries,
+                        context.getCommitIndex()).toSerializable(),
+                    actor()
+                );
+            }
         }
     }
 
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     */
     private void installSnapshotIfNeeded(){
-        for (String followerId : followerToActor.keySet()) {
+        for (String followerId : followers) {
             ActorSelection followerActor =
-                followerToActor.get(followerId);
-
-            FollowerLogInformation followerLogInformation =
-                followerToLog.get(followerId);
-
-            long nextIndex = followerLogInformation.getNextIndex().get();
-
-            if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        context.getReplicatedLog().getSnapshotIndex(),
-                        context.getReplicatedLog().getSnapshotTerm(),
-                        context.getReplicatedLog().getSnapshot()
-                    ),
-                    actor()
-                );
+                context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
+
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) && context
+                    .getReplicatedLog().isInSnapshot(nextIndex)) {
+                    followerActor.tell(
+                        new InstallSnapshot(currentTerm(), context.getId(),
+                            context.getReplicatedLog().getSnapshotIndex(),
+                            context.getReplicatedLog().getSnapshotTerm(),
+                            context.getReplicatedLog().getSnapshot()
+                        ),
+                        actor()
+                    );
+                }
             }
         }
     }
 
     private RaftState sendHeartBeat() {
-        if (followerToActor.size() > 0) {
+        if (followers.size() > 0) {
             sendAppendEntries();
         }
         return state();
@@ -332,7 +375,7 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followerToActor.keySet().size() == 0){
+        if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -354,7 +397,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
 
     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
-        if(followerToActor.keySet().size() == 0){
+        if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;