X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=2a44e8b7a5c3adeecd1c534de0664d8deb290934;hp=5c37455be9420db37a4e7ce1dbdc82a479600fe7;hb=475d28f717bae92b2cc10b0589131771fcc62242;hpb=97222f19035815199200e727f43960513073eb9e diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 5c37455be9..2a44e8b7a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,21 +12,31 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; +import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; -import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -54,105 +64,364 @@ import java.util.concurrent.atomic.AtomicLong; */ public class Leader extends AbstractRaftActorBehavior { - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - public static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); private final Map followerToLog = new HashMap(); - private final Map followerToActor = new HashMap<>(); + private final Set followers; + + private Cancellable heartbeatSchedule = null; + private Cancellable appendEntriesSchedule = null; + private Cancellable installSnapshotSchedule = null; - private Cancellable heartbeatCancel = null; + private List trackerList = new ArrayList<>(); - public Leader(RaftActorContext context, List followePaths) { + private final int minReplicationCount; + + public Leader(RaftActorContext context) { super(context); - for (String followerPath : followePaths) { + if (lastIndex() >= 0) { + context.setCommitIndex(lastIndex()); + } + + followers = context.getPeerAddresses().keySet(); + + for (String followerId : followers) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerPath, - new AtomicLong(0), - new AtomicLong(0)); + new FollowerLogInformationImpl(followerId, + new AtomicLong(lastIndex()), + new AtomicLong(-1)); + + followerToLog.put(followerId, followerLogInformation); + } - followerToActor.put(followerPath, - context.actorSelection(followerLogInformation.getId())); - followerToLog.put(followerPath, followerLogInformation); + context.getLogger().debug("Election:Leader has following peers:"+ followers); + if (followers.size() > 0) { + minReplicationCount = (followers.size() + 1) / 2 + 1; + } else { + minReplicationCount = 0; } + // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + scheduleInstallSnapshotCheck( + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, + context.getConfigParams().getHeartBeatInterval().unit()) + ); } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { - return suggestedState; + AppendEntries appendEntries) { + + context.getLogger().info("Leader: Received {}", appendEntries.toString()); + + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - return suggestedState; + AppendEntriesReply appendEntriesReply) { + + if(! appendEntriesReply.isSuccess()) { + context.getLogger() + .info("Leader: Received {}", appendEntriesReply.toString()); + } + + // Update the FollowerLogInformation + String followerId = appendEntriesReply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + if(followerLogInformation == null){ + context.getLogger().error("Unknown follower {}", followerId); + return state(); + } + + if (appendEntriesReply.isSuccess()) { + followerLogInformation + .setMatchIndex(appendEntriesReply.getLogLastIndex()); + followerLogInformation + .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + + followerLogInformation.decrNextIndex(); + } + + // Now figure out if this reply warrants a change in the commitIndex + // If there exists an N such that N > commitIndex, a majority + // of matchIndex[i] ≥ N, and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long N = context.getCommitIndex() + 1; ; N++) { + int replicatedCount = 1; + + for (FollowerLogInformation info : followerToLog.values()) { + if (info.getMatchIndex().get() >= N) { + replicatedCount++; + } + } + + if (replicatedCount >= minReplicationCount) { + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(N); + if (replicatedLogEntry != null + && replicatedLogEntry.getTerm() + == currentTerm()) { + context.setCommitIndex(N); + } + } else { + break; + } + } + + // Apply the change to the state machine + if (context.getCommitIndex() > context.getLastApplied()) { + applyLogToStateMachine(context.getCommitIndex()); + } + + return state(); } - @Override protected RaftState handleRequestVote(ActorRef sender, - RequestVote requestVote, RaftState suggestedState) { - return suggestedState; + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + for (ClientRequestTracker tracker : trackerList) { + if (tracker.getIndex() == logIndex) { + return tracker; + } + } + + return null; } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + RequestVoteReply requestVoteReply) { + return state(); } - @Override protected RaftState state() { + @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { + @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); - scheduleHeartBeat(HEART_BEAT_INTERVAL); - - if (message instanceof SendHeartBeat) { - for (ActorSelection follower : followerToActor.values()) { - follower.tell(new AppendEntries( - context.getTermInformation().getCurrentTerm().get(), - context.getId(), - context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm(), - Collections.EMPTY_LIST, context.getCommitIndex().get()), - context.getActor()); + Object message = fromSerializableMessage(originalMessage); + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + return RaftState.Follower; } - return state(); } + + try { + if (message instanceof SendHeartBeat) { + return sendHeartBeat(); + } else if(message instanceof SendInstallSnapshot) { + installSnapshotIfNeeded(); + } else if (message instanceof Replicate) { + replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply( + (InstallSnapshotReply) message); + } + } finally { + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + } + return super.handleMessage(sender, message); } + private void handleInstallSnapshotReply(InstallSnapshotReply message) { + InstallSnapshotReply reply = message; + String followerId = reply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + followerLogInformation + .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation + .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } + + private void replicate(Replicate replicate) { + long logIndex = replicate.getReplicatedLogEntry().getIndex(); + + context.getLogger().debug("Replicate message " + logIndex); + + if (followers.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); + + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), + context.getActor() + ); + } else { + + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + sendAppendEntries(); + } + } + + private void sendAppendEntries() { + // Send an AppendEntries to all followers + for (String followerId : followers) { + ActorSelection followerActor = + context.getPeerActorSelection(followerId); + + if (followerActor != null) { + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + List entries = Collections.emptyList(); + + if (context.getReplicatedLog().isPresent(nextIndex)) { + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot + entries = + context.getReplicatedLog().getFrom(nextIndex); + } + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(nextIndex), + prevLogTerm(nextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } + } + } + + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ + private void installSnapshotIfNeeded(){ + for (String followerId : followers) { + ActorSelection followerActor = + context.getPeerActorSelection(followerId); + + if(followerActor != null) { + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if (!context.getReplicatedLog().isPresent(nextIndex) && context + .getReplicatedLog().isInSnapshot(nextIndex)) { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); + } + } + } + } + + private RaftState sendHeartBeat() { + if (followers.size() > 0) { + sendAppendEntries(); + } + return state(); + } + + private void stopHeartBeat() { + if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { + heartbeatSchedule.cancel(); + } + } + + private void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); + } + } + private void scheduleHeartBeat(FiniteDuration interval) { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); + if(followers.size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; } + stopHeartBeat(); + // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat // message is sent to itself. // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. - heartbeatCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, + heartbeatSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + + private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if(followers.size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + + stopInstallSnapshotSchedule(); + + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + + @Override public void close() throws Exception { + stopHeartBeat(); + } + + @Override public String getLeaderId() { + return context.getId(); + } + }