X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=26beed2f7a69b5191407f230c839edd1e4178ade;hb=8ddcccfc1045eec79f6a22dc68250d95fc23a017;hp=c06ee9bd2b836c784c4adb75e7f318875ebf22c3;hpb=fdab53ef9033fc83c812f7d3d6d3327d3d176f0f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index c06ee9bd2b..26beed2f7a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -22,12 +22,17 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; +import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,7 +69,9 @@ public class Leader extends AbstractRaftActorBehavior { private final Map followerToActor = new HashMap<>(); - private Cancellable heartbeatCancel = null; + private Cancellable heartbeatSchedule = null; + private Cancellable appendEntriesSchedule = null; + private Cancellable installSnapshotSchedule = null; private List trackerList = new ArrayList<>(); @@ -73,7 +80,7 @@ public class Leader extends AbstractRaftActorBehavior { public Leader(RaftActorContext context) { super(context); - if(lastIndex() >= 0) { + if (lastIndex() >= 0) { context.setCommitIndex(lastIndex()); } @@ -87,9 +94,10 @@ public class Leader extends AbstractRaftActorBehavior { context.actorSelection(context.getPeerAddress(followerId))); followerToLog.put(followerId, followerLogInformation); - } + context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet()); + if (followerToActor.size() > 0) { minReplicationCount = (followerToActor.size() + 1) / 2 + 1; } else { @@ -103,24 +111,21 @@ public class Leader extends AbstractRaftActorBehavior { // prevent election timeouts (§5.2) scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + scheduleInstallSnapshotCheck( + new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, + HEART_BEAT_INTERVAL.unit()) + ); } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { - - context.getLogger() - .error("An unexpected AppendEntries received in state " + state()); + AppendEntries appendEntries) { - return suggestedState; + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - - // Do not take any other action since a behavior change is coming - if (suggestedState != state()) - return suggestedState; + AppendEntriesReply appendEntriesReply) { // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); @@ -132,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior { followerLogInformation .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + followerLogInformation.decrNextIndex(); } @@ -148,7 +160,7 @@ public class Leader extends AbstractRaftActorBehavior { } } - if (replicatedCount >= minReplicationCount){ + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); if (replicatedLogEntry != null @@ -161,11 +173,12 @@ public class Leader extends AbstractRaftActorBehavior { } } - if(context.getCommitIndex() > context.getLastApplied()){ + // Apply the change to the state machine + if (context.getCommitIndex() > context.getLastApplied()) { applyLogToStateMachine(context.getCommitIndex()); } - return suggestedState; + return state(); } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -179,71 +192,40 @@ public class Leader extends AbstractRaftActorBehavior { } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + RequestVoteReply requestVoteReply) { + return state(); } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { + @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); + Object message = fromSerializableMessage(originalMessage); + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + return RaftState.Follower; + } + } + try { if (message instanceof SendHeartBeat) { return sendHeartBeat(); + } else if(message instanceof SendInstallSnapshot) { + installSnapshotIfNeeded(); } else if (message instanceof Replicate) { - - Replicate replicate = (Replicate) message; - long logIndex = replicate.getReplicatedLogEntry().getIndex(); - - context.getLogger().debug("Replicate message " + logIndex); - - if (followerToActor.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); - - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); - } else { - - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - - ReplicatedLogEntry prevEntry = - context.getReplicatedLog().get(lastIndex() - 1); - long prevLogIndex = -1; - long prevLogTerm = -1; - if (prevEntry != null) { - prevLogIndex = prevEntry.getIndex(); - prevLogTerm = prevEntry.getTerm(); - } - // Send an AppendEntries to all followers - for (String followerId : followerToActor.keySet()) { - ActorSelection followerActor = - followerToActor.get(followerId); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex, prevLogTerm, - context.getReplicatedLog().getFrom( - followerLogInformation.getNextIndex() - .get() - ), context.getCommitIndex() - ), - actor() - ); - } - } + replicate((Replicate) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply( + (InstallSnapshotReply) message); } } finally { scheduleHeartBeat(HEART_BEAT_INTERVAL); @@ -252,39 +234,132 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private RaftState sendHeartBeat() { - if (followerToActor.size() > 0) { - for (String follower : followerToActor.keySet()) { + private void handleInstallSnapshotReply(InstallSnapshotReply message) { + InstallSnapshotReply reply = message; + String followerId = reply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + followerLogInformation + .setMatchIndex(context.getReplicatedLog().getSnapshotIndex()); + followerLogInformation + .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1); + } - FollowerLogInformation followerLogInformation = - followerToLog.get(follower); + private void replicate(Replicate replicate) { + long logIndex = replicate.getReplicatedLogEntry().getIndex(); - AtomicLong nextIndex = - followerLogInformation.getNextIndex(); + context.getLogger().debug("Replicate message " + logIndex); - List entries = - context.getReplicatedLog().getFrom(nextIndex.get()); + if (followerToActor.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); - followerToActor.get(follower).tell(new AppendEntries( - context.getTermInformation().getCurrentTerm(), - context.getId(), - context.getReplicatedLog().lastIndex(), - context.getReplicatedLog().lastTerm(), - entries, context.getCommitIndex()), + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), context.getActor() ); + } else { + + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + sendAppendEntries(); + } + } + + private void sendAppendEntries() { + // Send an AppendEntries to all followers + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + List entries = Collections.emptyList(); + + if(context.getReplicatedLog().isPresent(nextIndex)){ + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot + entries = + context.getReplicatedLog().getFrom(nextIndex); + } + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex), + prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(), + actor()); + } + } + + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ + private void installSnapshotIfNeeded(){ + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){ + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); } } + } + + private RaftState sendHeartBeat() { + if (followerToActor.size() > 0) { + sendAppendEntries(); + } return state(); } private void stopHeartBeat() { - if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { - heartbeatCancel.cancel(); + if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) { + heartbeatSchedule.cancel(); + } + } + + private void stopInstallSnapshotSchedule() { + if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) { + installSnapshotSchedule.cancel(); } } private void scheduleHeartBeat(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + stopHeartBeat(); // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat @@ -292,13 +367,34 @@ public class Leader extends AbstractRaftActorBehavior { // Scheduling the heartbeat only once here because heartbeats do not // need to be sent if there are other messages being sent to the remote // actor. - heartbeatCancel = + heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + + private void scheduleInstallSnapshotCheck(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + + stopInstallSnapshotSchedule(); + + // Schedule a message to send append entries to followers that can + // accept an append entries with some data in it + installSnapshotSchedule = + context.getActorSystem().scheduler().scheduleOnce( + interval, + context.getActor(), new SendInstallSnapshot(), + context.getActorSystem().dispatcher(), context.getActor()); + } + + + @Override public void close() throws Exception { stopHeartBeat(); }