X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=8b95e8b7a6bd57e9581d02226f99b2d44bf5cb51;hb=77d55c2a5a0311aac06707d71e199ba30271b48c;hp=6c3eee5415a1efc0654ee0ef6a4aa66bca81fff6;hpb=0032979e6c27ffdc879eabc9bb9dee2ca75ee2d8;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 6c3eee5415..fcfaee3603 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -5,31 +5,17 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RequestVote; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import scala.concurrent.duration.FiniteDuration; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The behavior of a RaftActor when it is in the Leader state *

@@ -52,110 +38,84 @@ import java.util.concurrent.atomic.AtomicLong; * of matchIndex[i] ≥ N, and log[N].term == currentTerm: * set commitIndex = N (§5.3, §5.4). */ -public class Leader extends AbstractRaftActorBehavior { - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - private static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - private final Map followerToReplicator = new HashMap<>(); - - private final Map followerToLog = - new HashMap(); - - private final Map followerToActor = new HashMap<>(); - - private Cancellable heartbeatCancel = null; +public class Leader extends AbstractLeader { + private Cancellable installSnapshotSchedule = null; + private Cancellable isolatedLeaderCheckSchedule = null; - public Leader(RaftActorContext context, List followers) { + public Leader(RaftActorContext context) { super(context); - for (String follower : followers) { + scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval()); - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(follower, - new AtomicLong(0), - new AtomicLong(0)); + scheduleIsolatedLeaderCheck( + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10, + context.getConfigParams().getHeartBeatInterval().unit())); + } - followerToActor.put(follower, - context.actorSelection(followerLogInformation.getId())); - followerToLog.put(follower, followerLogInformation); + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + Preconditions.checkNotNull(sender, "sender should not be null"); + if (originalMessage instanceof IsolatedLeaderCheck) { + if (isLeaderIsolated()) { + LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + context.getId(), minIsolatedLeaderPeerCount, leaderId); + return switchBehavior(new IsolatedLeader(context)); + } } - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); - - + return super.handleMessage(sender, originalMessage); } - @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { - return suggestedState; + protected void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); + } } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - return suggestedState; - } + protected void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if (getFollowerIds().isEmpty()) { + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } - @Override protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { - return suggestedState; - } + stopInstallSnapshotSchedule(); - @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new InitiateInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); } - @Override protected RaftState state() { - return RaftState.Leader; + protected void stopIsolatedLeaderCheckSchedule() { + if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) { + isolatedLeaderCheckSchedule.cancel(); + } } - @Override public RaftState handleMessage(ActorRef sender, Object message) { - Preconditions.checkNotNull(sender, "sender should not be null"); - - scheduleHeartBeat(HEART_BEAT_INTERVAL); - - if (message instanceof SendHeartBeat) { - for (ActorSelection follower : followerToActor.values()) { - follower.tell(new AppendEntries( - context.getTermInformation().getCurrentTerm().get(), - context.getId(), - context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm(), - Collections.EMPTY_LIST, context.getCommitIndex().get()), - context.getActor()); - } - return state(); - } - return super.handleMessage(sender, message); + protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) { + isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval, + context.getActor(), new IsolatedLeaderCheck(), + context.getActorSystem().dispatcher(), context.getActor()); } - private void scheduleHeartBeat(FiniteDuration interval) { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); - } + @Override + public void close() throws Exception { + stopInstallSnapshotSchedule(); + stopIsolatedLeaderCheckSchedule(); + super.close(); + } - // Schedule a heartbeat. When the scheduler triggers the replicator - // will let the RaftActor (leader) know that a new heartbeat needs to be sent - // Scheduling the heartbeat only once here because heartbeats do not - // need to be sent if there are other messages being sent to the remote - // actor. - heartbeatCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + @VisibleForTesting + void markFollowerActive(String followerId) { + getFollower(followerId).markFollowerActive(); } + @VisibleForTesting + void markFollowerInActive(String followerId) { + getFollower(followerId).markFollowerInActive(); + } }