Merge "Fix RaftActorTest.testRaftRoleChangeNotifier failure"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index 0498d7fc8b17c52f60cd2b5b0cc249853a5daca7..ebcdcd40fb078ebcc16439ec2feaa87b6f62eca4 100644 (file)
@@ -5,29 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
@@ -51,102 +37,51 @@ import java.util.concurrent.atomic.AtomicLong;
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  */
-public class Leader extends AbstractRaftActorBehavior {
-
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     */
-    public static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-    private final Map<String, FollowerLogInformation> followerToLog =
-        new HashMap();
-
-    private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+public class Leader extends AbstractLeader {
+    private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+    private final Stopwatch isolatedLeaderCheck;
 
-    private Cancellable heartbeatCancel = null;
-
-    public Leader(RaftActorContext context, List<String> followePaths) {
+    public Leader(RaftActorContext context) {
         super(context);
+        isolatedLeaderCheck = Stopwatch.createStarted();
+    }
 
-        for (String followerPath : followePaths) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerPath,
-                    new AtomicLong(0),
-                    new AtomicLong(0));
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+        Preconditions.checkNotNull(sender, "sender should not be null");
 
-            followerToActor.put(followerPath,
-                context.actorSelection(followerLogInformation.getId()));
-            followerToLog.put(followerPath, followerLogInformation);
+        if (originalMessage instanceof IsolatedLeaderCheck) {
+            if (isLeaderIsolated()) {
+                LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                        context.getId(), minIsolatedLeaderPeerCount, leaderId);
 
+                return switchBehavior(new IsolatedLeader(context));
+            }
         }
 
-        // Immediately schedule a heartbeat
-        // Upon election: send initial empty AppendEntries RPCs
-        // (heartbeat) to each server; repeat during idle periods to
-        // prevent election timeouts (§5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-
-
+        return super.handleMessage(sender, originalMessage);
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries, RaftState suggestedState) {
-        return suggestedState;
-    }
-
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
-        return suggestedState;
-    }
+    @Override
+    protected void beforeSendHeartbeat(){
+        if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+            context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+            isolatedLeaderCheck.reset().start();
+        }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply, RaftState suggestedState) {
-        return suggestedState;
     }
 
-    @Override protected RaftState state() {
-        return RaftState.Leader;
+    @Override
+    public void close() throws Exception {
+        super.close();
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        Preconditions.checkNotNull(sender, "sender should not be null");
-
-        scheduleHeartBeat(HEART_BEAT_INTERVAL);
-
-        if (message instanceof SendHeartBeat) {
-            for (ActorSelection follower : followerToActor.values()) {
-                follower.tell(new AppendEntries(
-                    context.getTermInformation().getCurrentTerm(),
-                    context.getId(),
-                    context.getReplicatedLog().last().getIndex(),
-                    context.getReplicatedLog().last().getTerm(),
-                    Collections.EMPTY_LIST, context.getCommitIndex()),
-                    context.getActor());
-            }
-            return state();
-        }
-        return super.handleMessage(sender, message);
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        getFollower(followerId).markFollowerActive();
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
-        if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
-            heartbeatCancel.cancel();
-        }
-
-        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
-        // message is sent to itself.
-        // Scheduling the heartbeat only once here because heartbeats do not
-        // need to be sent if there are other messages being sent to the remote
-        // actor.
-        heartbeatCancel =
-            context.getActorSystem().scheduler().scheduleOnce(interval,
-                context.getActor(), new SendHeartBeat(),
-                context.getActorSystem().dispatcher(), context.getActor());
+    @VisibleForTesting
+    void markFollowerInActive(String followerId) {
+        getFollower(followerId).markFollowerInActive();
     }
-
 }