Merge changes Ic434bf4a,I05a3fb18,I47a3783d,I8234bbfd
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index 5c37455be9420db37a4e7ce1dbdc82a479600fe7..0dd39001136a25416c241eb8fb69085266d3752c 100644 (file)
@@ -9,27 +9,14 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The behavior of a RaftActor when it is in the Leader state
  * <p/>
@@ -52,107 +39,78 @@ import java.util.concurrent.atomic.AtomicLong;
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  */
-public class Leader extends AbstractRaftActorBehavior {
-
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     */
-    public static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-    private final Map<String, FollowerLogInformation> followerToLog =
-        new HashMap();
-
-    private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+public class Leader extends AbstractLeader {
+    private Cancellable installSnapshotSchedule = null;
+    private Cancellable isolatedLeaderCheckSchedule = null;
 
-    private Cancellable heartbeatCancel = null;
-
-    public Leader(RaftActorContext context, List<String> followePaths) {
+    public Leader(RaftActorContext context) {
         super(context);
 
-        for (String followerPath : followePaths) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerPath,
-                    new AtomicLong(0),
-                    new AtomicLong(0));
-
-            followerToActor.put(followerPath,
-                context.actorSelection(followerLogInformation.getId()));
-            followerToLog.put(followerPath, followerLogInformation);
+        scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
 
-        }
-
-        // Immediately schedule a heartbeat
-        // Upon election: send initial empty AppendEntries RPCs
-        // (heartbeat) to each server; repeat during idle periods to
-        // prevent election timeouts (§5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+        scheduleIsolatedLeaderCheck(
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+                context.getConfigParams().getHeartBeatInterval().unit()));
+    }
 
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+        Preconditions.checkNotNull(sender, "sender should not be null");
 
-    }
+        if (originalMessage instanceof IsolatedLeaderCheck) {
+            if (isLeaderIsolated()) {
+                LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                    minIsolatedLeaderPeerCount, leaderId);
+                return switchBehavior(new IsolatedLeader(context));
+            }
+        }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-        return suggestedState;
+        return super.handleMessage(sender, originalMessage);
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
+    protected void stopInstallSnapshotSchedule() {
+        if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+            installSnapshotSchedule.cancel();
+        }
     }
 
-    @Override protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState) {
-        return suggestedState;
-    }
+    protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+        if(followers.size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
-    }
+        stopInstallSnapshotSchedule();
 
-    @Override protected RaftState state() {
-        return RaftState.Leader;
+        // Schedule a message to send append entries to followers that can
+        // accept an append entries with some data in it
+        installSnapshotSchedule =
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
+                context.getActor(), new InitiateInstallSnapshot(),
+                context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        Preconditions.checkNotNull(sender, "sender should not be null");
-
-        scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
-        if (message instanceof SendHeartBeat) {
-            for (ActorSelection follower : followerToActor.values()) {
-                follower.tell(new AppendEntries(
-                    context.getTermInformation().getCurrentTerm().get(),
-                    context.getId(),
-                    context.getReplicatedLog().last().getIndex(),
-                    context.getReplicatedLog().last().getTerm(),
-                    Collections.EMPTY_LIST, context.getCommitIndex().get()),
-                    context.getActor());
-            }
-            return state();
+    protected void stopIsolatedLeaderCheckSchedule() {
+        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+            isolatedLeaderCheckSchedule.cancel();
         }
-        return super.handleMessage(sender, message);
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
-        if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
-            heartbeatCancel.cancel();
-        }
+    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+            context.getActor(), new IsolatedLeaderCheck(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
 
-        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
-        // message is sent to itself.
-        // Scheduling the heartbeat only once here because heartbeats do not
-        // need to be sent if there are other messages being sent to the remote
-        // actor.
-        heartbeatCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
-                context.getActor(), new SendHeartBeat(),
-                context.getActorSystem().dispatcher(), context.getActor());
+    @Override public void close() throws Exception {
+        stopInstallSnapshotSchedule();
+        stopIsolatedLeaderCheckSchedule();
+        super.close();
     }
 
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        followerToLog.get(followerId).markFollowerActive();
+    }
 }