X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=eb49abc17aea4a0ebefacde0a5925312082e1405;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hp=53e47c2f842f17ac0b2766811745a7f1ec6eabde;hpb=df13d272f2f74892f2decd54ece5d8286fb1c995;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 53e47c2f84..eb49abc17a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -5,44 +5,28 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; -import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; -import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; -import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; -import org.opendaylight.controller.cluster.raft.messages.RaftRPC; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import scala.concurrent.duration.FiniteDuration; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** - * The behavior of a RaftActor when it is in the Leader state - *

+ * The behavior of a RaftActor when it is in the Leader state. + * + *

* Leaders: *

*/ -public class Leader extends AbstractRaftActorBehavior { - - - private final Map followerToLog = - new HashMap(); - - private final Set followers; - - private Cancellable heartbeatSchedule = null; - private Cancellable appendEntriesSchedule = null; - private Cancellable installSnapshotSchedule = null; - - private List trackerList = new ArrayList<>(); - - private final int minReplicationCount; - - public Leader(RaftActorContext context) { - super(context); - - if (lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - - followers = context.getPeerAddresses().keySet(); - - for (String followerId : followers) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), - new AtomicLong(-1)); - - followerToLog.put(followerId, followerLogInformation); - } - - context.getLogger().debug("Election:Leader has following peers:"+ followers); - - if (followers.size() > 0) { - minReplicationCount = (followers.size() + 1) / 2 + 1; - } else { - minReplicationCount = 0; - } - - - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); +public class Leader extends AbstractLeader { + /** + * Internal message sent to periodically check if this leader has become isolated and should transition + * to {@link IsolatedLeader}. + */ + @VisibleForTesting + static final Object ISOLATED_LEADER_CHECK = new Object(); - scheduleInstallSnapshotCheck( - new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, - HEART_BEAT_INTERVAL.unit()) - ); + private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted(); + @Nullable private LeadershipTransferContext leadershipTransferContext; + Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) { + super(context, RaftState.Leader, initializeFromLeader); } - @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries) { - - context.getLogger().info("Leader: Received {}", appendEntries.toString()); - - return state(); + public Leader(RaftActorContext context) { + this(context, null); } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply) { - - if(! appendEntriesReply.isSuccess()) { - context.getLogger() - .info("Leader: Received {}", appendEntriesReply.toString()); - } - - // Update the FollowerLogInformation - String followerId = appendEntriesReply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - if(followerLogInformation == null){ - context.getLogger().error("Unknown follower {}", followerId); - return state(); - } - - if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); - } else { - - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about - - followerLogInformation.decrNextIndex(); - } - - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). - for (long N = context.getCommitIndex() + 1; ; N++) { - int replicatedCount = 1; - - for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { - replicatedCount++; - } - } + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + Preconditions.checkNotNull(sender, "sender should not be null"); - if (replicatedCount >= minReplicationCount) { - ReplicatedLogEntry replicatedLogEntry = - context.getReplicatedLog().get(N); - if (replicatedLogEntry != null - && replicatedLogEntry.getTerm() - == currentTerm()) { - context.setCommitIndex(N); - } + if (ISOLATED_LEADER_CHECK.equals(originalMessage)) { + if (isLeaderIsolated()) { + log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId()); + return internalSwitchBehavior(new IsolatedLeader(context, this)); } else { - break; + return this; } + } else { + return super.handleMessage(sender, originalMessage); } - - // Apply the change to the state machine - if (context.getCommitIndex() > context.getLastApplied()) { - applyLogToStateMachine(context.getCommitIndex()); - } - - return state(); } - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } + @Override + protected void beforeSendHeartbeat() { + if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) + > context.getConfigParams().getIsolatedCheckIntervalInMillis()) { + context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor()); + isolatedLeaderCheck.reset().start(); } - return null; + if (leadershipTransferContext != null && leadershipTransferContext.isExpired( + context.getConfigParams().getElectionTimeOutInterval().toMillis())) { + log.debug("{}: Leadership transfer expired", logName()); + leadershipTransferContext = null; + } } - @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply) { - return state(); + @Override + protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply); + tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId()); + return returnBehavior; } - @Override public RaftState state() { - return RaftState.Leader; - } + /** + * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows: + *
    + *
  • Start a timer (Stopwatch).
  • + *
  • Send an initial AppendEntries heartbeat to all followers.
  • + *
  • On AppendEntriesReply, check if the follower's new match Index matches the leader's last index
  • + *
  • If it matches, + *
      + *
    • Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.
    • + *
    • Send an ElectionTimeout to the follower to immediately start an election.
    • + *
    • Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.
    • + *
  • + *
  • Otherwise if the election time out period elapses, notify + * {@link RaftActorLeadershipTransferCohort#abortTransfer}.
  • + *
+ * + * @param leadershipTransferCohort the cohort participating in the leadership transfer + */ + public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) { + log.debug("{}: Attempting to transfer leadership", logName()); - @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { - Preconditions.checkNotNull(sender, "sender should not be null"); + leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort); - Object message = fromSerializableMessage(originalMessage); + // Send an immediate heart beat to the followers. + sendAppendEntries(0, false); + } - if (message instanceof RaftRPC) { - RaftRPC rpc = (RaftRPC) message; - // If RPC request or response contains term T > currentTerm: - // set currentTerm = T, convert to follower (§5.1) - // This applies to all RPC messages and responses - if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return RaftState.Follower; - } + private void tryToCompleteLeadershipTransfer(String followerId) { + if (leadershipTransferContext == null) { + return; } - try { - if (message instanceof SendHeartBeat) { - return sendHeartBeat(); - } else if(message instanceof SendInstallSnapshot) { - installSnapshotIfNeeded(); - } else if (message instanceof Replicate) { - replicate((Replicate) message); - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply( - (InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + final Optional requestedFollowerIdOptional + = leadershipTransferContext.transferCohort.getRequestedFollowerId(); + if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) { + // we want to transfer leadership to specific follower + return; } - return super.handleMessage(sender, message); - } - - private void handleInstallSnapshotReply(InstallSnapshotReply message) { - InstallSnapshotReply reply = message; - String followerId = reply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - followerLogInformation - .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation - .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); - } - - private void replicate(Replicate replicate) { - long logIndex = replicate.getReplicatedLogEntry().getIndex(); - - context.getLogger().debug("Replicate message " + logIndex); - - if (followers.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); - - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); - } else { - - // Create a tracker entry we will use this later to notify the - // client actor - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - - sendAppendEntries(); + FollowerLogInformation followerInfo = getFollower(followerId); + if (followerInfo == null) { + return; } - } - - private void sendAppendEntries() { - // Send an AppendEntries to all followers - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); - if (followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + long lastIndex = context.getReplicatedLog().lastIndex(); + boolean isVoting = context.getPeerInfo(followerId).isVoting(); - long nextIndex = followerLogInformation.getNextIndex().get(); + log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}", + logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting); - List entries = Collections.emptyList(); + if (isVoting && followerInfo.getMatchIndex() == lastIndex) { + log.debug("{}: Follower's log matches - sending ElectionTimeout", logName()); - if (context.getReplicatedLog().isPresent(nextIndex)) { - // TODO: Instead of sending all entries from nextIndex - // only send a fixed number of entries to each follower - // This is to avoid the situation where there are a lot of - // entries to install for a fresh follower or to a follower - // that has fallen too far behind with the log but yet is not - // eligible to receive a snapshot - entries = - context.getReplicatedLog().getFrom(nextIndex); - } + // We can't be sure if the follower has applied all its log entries to its state so send an + // additional AppendEntries with the latest commit index. + sendAppendEntries(0, false); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), - prevLogTerm(nextIndex), entries, - context.getCommitIndex()).toSerializable(), - actor() - ); - } - } - } - - /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. - */ - private void installSnapshotIfNeeded(){ - for (String followerId : followers) { - ActorSelection followerActor = - context.getPeerActorSelection(followerId); - - if(followerActor != null) { - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + // Now send a TimeoutNow message to the matching follower to immediately start an election. + ActorSelection followerActor = context.getPeerActorSelection(followerId); + followerActor.tell(TimeoutNow.INSTANCE, context.getActor()); - long nextIndex = followerLogInformation.getNextIndex().get(); + log.debug("{}: Leader transfer complete", logName()); - if (!context.getReplicatedLog().isPresent(nextIndex) && context - .getReplicatedLog().isInSnapshot(nextIndex)) { - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); - } - } + leadershipTransferContext.transferCohort.transferComplete(); + leadershipTransferContext = null; } } - private RaftState sendHeartBeat() { - if (followers.size() > 0) { - sendAppendEntries(); + @Override + public void close() { + if (leadershipTransferContext != null) { + LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext; + leadershipTransferContext = null; + localLeadershipTransferContext.transferCohort.abortTransfer(); } - return state(); - } - private void stopHeartBeat() { - if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { - heartbeatSchedule.cancel(); - } + super.close(); } - private void stopInstallSnapshotSchedule() { - if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { - installSnapshotSchedule.cancel(); - } + @VisibleForTesting + void markFollowerActive(String followerId) { + getFollower(followerId).markFollowerActive(); } - private void scheduleHeartBeat(FiniteDuration interval) { - if(followers.size() == 0){ - // Optimization - do not bother scheduling a heartbeat as there are - // no followers - return; - } - - stopHeartBeat(); - - // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat - // message is sent to itself. - // Scheduling the heartbeat only once here because heartbeats do not - // need to be sent if there are other messages being sent to the remote - // actor. - heartbeatSchedule = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + @VisibleForTesting + void markFollowerInActive(String followerId) { + getFollower(followerId).markFollowerInActive(); } + private static class LeadershipTransferContext { + RaftActorLeadershipTransferCohort transferCohort; + Stopwatch timer = Stopwatch.createStarted(); - private void scheduleInstallSnapshotCheck(FiniteDuration interval) { - if(followers.size() == 0){ - // Optimization - do not bother scheduling a heartbeat as there are - // no followers - return; + LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) { + this.transferCohort = transferCohort; } - stopInstallSnapshotSchedule(); - - // Schedule a message to send append entries to followers that can - // accept an append entries with some data in it - installSnapshotSchedule = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendInstallSnapshot(), - context.getActorSystem().dispatcher(), context.getActor()); - } - - - - @Override public void close() throws Exception { - stopHeartBeat(); - } + boolean isExpired(long timeout) { + if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) { + transferCohort.abortTransfer(); + return true; + } - @Override public String getLeaderId() { - return context.getId(); + return false; + } } - }