Merge "Added requuired-capabilities to the impl/.../config/default-config.xml and...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
index cf63f4d1e28303feca19e68c9d0a8939fd4f01ea..fcfaee36033f3eba278f1f351d3c8cb3e974feb1 100644 (file)
@@ -5,27 +5,20 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
- * <p>
+ * <p/>
  * Leaders:
  * <ul>
  * <li> Upon election: send initial empty AppendEntries RPCs
@@ -45,42 +38,84 @@ import java.util.concurrent.atomic.AtomicLong;
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  */
-public class Leader extends AbstractRaftActorBehavior {
-
+public class Leader extends AbstractLeader {
+    private Cancellable installSnapshotSchedule = null;
+    private Cancellable isolatedLeaderCheckSchedule = null;
 
-    private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
-
-    public Leader(RaftActorContext context, List<String> followers){
+    public Leader(RaftActorContext context) {
         super(context);
 
-        for(String follower : followers) {
+        scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
 
-            ActorRef replicator = context.actorOf(
-                RaftReplicator.props(
-                    new FollowerLogInformationImpl(follower,
-                        new AtomicLong(0),
-                        new AtomicLong(0)),
-                    context.getActor()
-                )
-            );
+        scheduleIsolatedLeaderCheck(
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+                context.getConfigParams().getHeartBeatInterval().unit()));
+    }
 
-            // Create a replicator for each follower
-            followerToReplicator.put(follower, replicator);
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+        Preconditions.checkNotNull(sender, "sender should not be null");
 
+        if (originalMessage instanceof IsolatedLeaderCheck) {
+            if (isLeaderIsolated()) {
+                LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                        context.getId(), minIsolatedLeaderPeerCount, leaderId);
+                return switchBehavior(new IsolatedLeader(context));
+            }
         }
 
+        return super.handleMessage(sender, originalMessage);
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        Preconditions.checkNotNull(sender, "sender should not be null");
+    protected void stopInstallSnapshotSchedule() {
+        if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+            installSnapshotSchedule.cancel();
+        }
+    }
+
+    protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+        if (getFollowerIds().isEmpty()) {
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
 
-        if(message instanceof SendHeartBeat) {
-            sender.tell(new AppendEntries(
-                context.getTermInformation().getCurrentTerm().get() , context.getId(),
-                context.getReplicatedLog().last().getIndex(),
-                context.getReplicatedLog().last().getTerm(),
-                Collections.EMPTY_LIST), context.getActor());
+        stopInstallSnapshotSchedule();
+
+        // Schedule a message to send append entries to followers that can
+        // accept an append entries with some data in it
+        installSnapshotSchedule =
+            context.getActorSystem().scheduler().scheduleOnce(
+                interval,
+                context.getActor(), new InitiateInstallSnapshot(),
+                context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    protected void stopIsolatedLeaderCheckSchedule() {
+        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+            isolatedLeaderCheckSchedule.cancel();
         }
-        return RaftState.Leader;
+    }
+
+    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+            context.getActor(), new IsolatedLeaderCheck(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    @Override
+    public void close() throws Exception {
+        stopInstallSnapshotSchedule();
+        stopIsolatedLeaderCheckSchedule();
+        super.close();
+    }
+
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        getFollower(followerId).markFollowerActive();
+    }
+
+    @VisibleForTesting
+    void markFollowerInActive(String followerId) {
+        getFollower(followerId).markFollowerInActive();
     }
 }