X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=a50666233c31f30b2e94cbf4c49d53a95cca93f4;hb=ed6019236d78a69577888f60064c3714eaa80f6a;hp=cf63f4d1e28303feca19e68c9d0a8939fd4f01ea;hpb=cf5be659d906cc80d52647cb516bbab435156742;p=controller.git
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
index cf63f4d1e2..97ecef370f 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
@@ -9,23 +9,17 @@
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftReplicator;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
+import scala.concurrent.duration.FiniteDuration;
/**
* The behavior of a RaftActor when it is in the Leader state
- *
+ *
* Leaders:
*
* - Upon election: send initial empty AppendEntries RPCs
@@ -45,42 +39,83 @@ import java.util.concurrent.atomic.AtomicLong;
* of matchIndex[i] ⥠N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
-
- private final Map followerToReplicator = new HashMap<>();
+public class Leader extends AbstractLeader {
+ private Cancellable installSnapshotSchedule = null;
+ private Cancellable isolatedLeaderCheckSchedule = null;
- public Leader(RaftActorContext context, List followers){
+ public Leader(RaftActorContext context) {
super(context);
- for(String follower : followers) {
+ scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
- ActorRef replicator = context.actorOf(
- RaftReplicator.props(
- new FollowerLogInformationImpl(follower,
- new AtomicLong(0),
- new AtomicLong(0)),
- context.getActor()
- )
- );
+ scheduleIsolatedLeaderCheck(
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+ context.getConfigParams().getHeartBeatInterval().unit()));
+ }
- // Create a replicator for each follower
- followerToReplicator.put(follower, replicator);
+ @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ minIsolatedLeaderPeerCount, leaderId);
+ return switchBehavior(new IsolatedLeader(context));
+ }
}
+ return super.handleMessage(sender, originalMessage);
}
- @Override public RaftState handleMessage(ActorRef sender, Object message) {
- Preconditions.checkNotNull(sender, "sender should not be null");
+ protected void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
+ }
+ }
- if(message instanceof SendHeartBeat) {
- sender.tell(new AppendEntries(
- context.getTermInformation().getCurrentTerm().get() , context.getId(),
- context.getReplicatedLog().last().getIndex(),
- context.getReplicatedLog().last().getTerm(),
- Collections.EMPTY_LIST), context.getActor());
+ protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if(followers.size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
}
- return RaftState.Leader;
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new InitiateInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+ protected void stopIsolatedLeaderCheckSchedule() {
+ if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+ isolatedLeaderCheckSchedule.cancel();
+ }
+ }
+
+ protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+ isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+ context.getActor(), new IsolatedLeaderCheck(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+ @Override public void close() throws Exception {
+ stopInstallSnapshotSchedule();
+ stopIsolatedLeaderCheckSchedule();
+ super.close();
+ }
+
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ followerToLog.get(followerId).markFollowerActive();
+ }
+
+ @VisibleForTesting
+ void markFollowerInActive(String followerId) {
+ followerToLog.get(followerId).markFollowerInActive();
}
}