X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=8b95e8b7a6bd57e9581d02226f99b2d44bf5cb51;hb=refs%2Fchanges%2F45%2F14445%2F3;hp=c06ee9bd2b836c784c4adb75e7f318875ebf22c3;hpb=fdab53ef9033fc83c812f7d3d6d3327d3d176f0f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index c06ee9bd2b..ee3cc65ddd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -5,35 +5,17 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.raft.ClientRequestTracker; -import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; -import org.opendaylight.controller.cluster.raft.FollowerLogInformation; -import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.messages.AppendEntries; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import scala.concurrent.duration.FiniteDuration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The behavior of a RaftActor when it is in the Leader state *

@@ -56,255 +38,84 @@ import java.util.concurrent.atomic.AtomicLong; * of matchIndex[i] ≥ N, and log[N].term == currentTerm: * set commitIndex = N (§5.3, §5.4). */ -public class Leader extends AbstractRaftActorBehavior { - - - private final Map followerToLog = - new HashMap(); - - private final Map followerToActor = new HashMap<>(); - - private Cancellable heartbeatCancel = null; - - private List trackerList = new ArrayList<>(); - - private final int minReplicationCount; +public class Leader extends AbstractLeader { + private Cancellable installSnapshotSchedule = null; + private Cancellable isolatedLeaderCheckSchedule = null; public Leader(RaftActorContext context) { super(context); - if(lastIndex() >= 0) { - context.setCommitIndex(lastIndex()); - } - - for (String followerId : context.getPeerAddresses().keySet()) { - FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerId, - new AtomicLong(lastIndex()), - new AtomicLong(-1)); - - followerToActor.put(followerId, - context.actorSelection(context.getPeerAddress(followerId))); - - followerToLog.put(followerId, followerLogInformation); - - } - - if (followerToActor.size() > 0) { - minReplicationCount = (followerToActor.size() + 1) / 2 + 1; - } else { - minReplicationCount = 0; - } + scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval()); - - // Immediately schedule a heartbeat - // Upon election: send initial empty AppendEntries RPCs - // (heartbeat) to each server; repeat during idle periods to - // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); - - - } - - @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { - - context.getLogger() - .error("An unexpected AppendEntries received in state " + state()); - - return suggestedState; + scheduleIsolatedLeaderCheck( + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10, + context.getConfigParams().getHeartBeatInterval().unit())); } - @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - - // Do not take any other action since a behavior change is coming - if (suggestedState != state()) - return suggestedState; - - // Update the FollowerLogInformation - String followerId = appendEntriesReply.getFollowerId(); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - if (appendEntriesReply.isSuccess()) { - followerLogInformation - .setMatchIndex(appendEntriesReply.getLogLastIndex()); - followerLogInformation - .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); - } else { - followerLogInformation.decrNextIndex(); - } - - // Now figure out if this reply warrants a change in the commitIndex - // If there exists an N such that N > commitIndex, a majority - // of matchIndex[i] ≥ N, and log[N].term == currentTerm: - // set commitIndex = N (§5.3, §5.4). - for (long N = context.getCommitIndex() + 1; ; N++) { - int replicatedCount = 1; - - for (FollowerLogInformation info : followerToLog.values()) { - if (info.getMatchIndex().get() >= N) { - replicatedCount++; - } - } + @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { + Preconditions.checkNotNull(sender, "sender should not be null"); - if (replicatedCount >= minReplicationCount){ - ReplicatedLogEntry replicatedLogEntry = - context.getReplicatedLog().get(N); - if (replicatedLogEntry != null - && replicatedLogEntry.getTerm() - == currentTerm()) { - context.setCommitIndex(N); - } - } else { - break; + if (originalMessage instanceof IsolatedLeaderCheck) { + if (isLeaderIsolated()) { + LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", + minIsolatedLeaderPeerCount, leaderId); + return switchBehavior(new IsolatedLeader(context)); } } - if(context.getCommitIndex() > context.getLastApplied()){ - applyLogToStateMachine(context.getCommitIndex()); - } - - return suggestedState; + return super.handleMessage(sender, originalMessage); } - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } + protected void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); } - - return null; - } - - @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; - } - - @Override public RaftState state() { - return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { - Preconditions.checkNotNull(sender, "sender should not be null"); - - try { - if (message instanceof SendHeartBeat) { - return sendHeartBeat(); - } else if (message instanceof Replicate) { - - Replicate replicate = (Replicate) message; - long logIndex = replicate.getReplicatedLogEntry().getIndex(); - - context.getLogger().debug("Replicate message " + logIndex); - - if (followerToActor.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); - - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); - } else { - - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - - ReplicatedLogEntry prevEntry = - context.getReplicatedLog().get(lastIndex() - 1); - long prevLogIndex = -1; - long prevLogTerm = -1; - if (prevEntry != null) { - prevLogIndex = prevEntry.getIndex(); - prevLogTerm = prevEntry.getTerm(); - } - // Send an AppendEntries to all followers - for (String followerId : followerToActor.keySet()) { - ActorSelection followerActor = - followerToActor.get(followerId); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex, prevLogTerm, - context.getReplicatedLog().getFrom( - followerLogInformation.getNextIndex() - .get() - ), context.getCommitIndex() - ), - actor() - ); - } - } - } - } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + protected void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if (getFollowerIds().isEmpty()) { + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; } - return super.handleMessage(sender, message); - } - - private RaftState sendHeartBeat() { - if (followerToActor.size() > 0) { - for (String follower : followerToActor.keySet()) { - - FollowerLogInformation followerLogInformation = - followerToLog.get(follower); - - AtomicLong nextIndex = - followerLogInformation.getNextIndex(); + stopInstallSnapshotSchedule(); - List entries = - context.getReplicatedLog().getFrom(nextIndex.get()); - - followerToActor.get(follower).tell(new AppendEntries( - context.getTermInformation().getCurrentTerm(), - context.getId(), - context.getReplicatedLog().lastIndex(), - context.getReplicatedLog().lastTerm(), - entries, context.getCommitIndex()), - context.getActor() - ); - } - } - return state(); + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new InitiateInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); } - private void stopHeartBeat() { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); + protected void stopIsolatedLeaderCheckSchedule() { + if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) { + isolatedLeaderCheckSchedule.cancel(); } } - private void scheduleHeartBeat(FiniteDuration interval) { - stopHeartBeat(); - - // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat - // message is sent to itself. - // Scheduling the heartbeat only once here because heartbeats do not - // need to be sent if there are other messages being sent to the remote - // actor. - heartbeatCancel = - context.getActorSystem().scheduler().scheduleOnce( - interval, - context.getActor(), new SendHeartBeat(), - context.getActorSystem().dispatcher(), context.getActor()); + protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) { + isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval, + context.getActor(), new IsolatedLeaderCheck(), + context.getActorSystem().dispatcher(), context.getActor()); } - @Override public void close() throws Exception { - stopHeartBeat(); + @Override + public void close() throws Exception { + stopInstallSnapshotSchedule(); + stopIsolatedLeaderCheckSchedule(); + super.close(); } - @Override public String getLeaderId() { - return context.getId(); + @VisibleForTesting + void markFollowerActive(String followerId) { + getFollower(followerId).markFollowerActive(); } + @VisibleForTesting + void markFollowerInActive(String followerId) { + getFollower(followerId).markFollowerInActive(); + } }