X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=26beed2f7a69b5191407f230c839edd1e4178ade;hb=8ddcccfc1045eec79f6a22dc68250d95fc23a017;hp=857c87f0aca82daaf425fcdc064978c9debf6fb1;hpb=b725909c696c8d40006a6297dc54a467fddcf6b3;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 857c87f0ac..26beed2f7a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; @@ -118,20 +119,13 @@ public class Leader extends AbstractRaftActorBehavior { } @Override protected RaftState handleAppendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState suggestedState) { + AppendEntries appendEntries) { - context.getLogger() - .error("An unexpected AppendEntries received in state " + state()); - - return suggestedState; + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, - AppendEntriesReply appendEntriesReply, RaftState suggestedState) { - - // Do not take any other action since a behavior change is coming - if (suggestedState != state()) - return suggestedState; + AppendEntriesReply appendEntriesReply) { // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); @@ -143,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior { followerLogInformation .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + followerLogInformation.decrNextIndex(); } @@ -177,7 +178,7 @@ public class Leader extends AbstractRaftActorBehavior { applyLogToStateMachine(context.getCommitIndex()); } - return suggestedState; + return state(); } protected ClientRequestTracker findClientRequestTracker(long logIndex) { @@ -191,17 +192,30 @@ public class Leader extends AbstractRaftActorBehavior { } @Override protected RaftState handleRequestVoteReply(ActorRef sender, - RequestVoteReply requestVoteReply, RaftState suggestedState) { - return suggestedState; + RequestVoteReply requestVoteReply) { + return state(); } @Override public RaftState state() { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { + @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); + Object message = fromSerializableMessage(originalMessage); + + if (message instanceof RaftRPC) { + RaftRPC rpc = (RaftRPC) message; + // If RPC request or response contains term T > currentTerm: + // set currentTerm = T, convert to follower (§5.1) + // This applies to all RPC messages and responses + if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); + return RaftState.Follower; + } + } + try { if (message instanceof SendHeartBeat) { return sendHeartBeat(); @@ -210,7 +224,6 @@ public class Leader extends AbstractRaftActorBehavior { } else if (message instanceof Replicate) { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply){ - // FIXME : Should I be checking the term here too? handleInstallSnapshotReply( (InstallSnapshotReply) message); } @@ -276,20 +289,28 @@ public class Leader extends AbstractRaftActorBehavior { List entries = Collections.emptyList(); if(context.getReplicatedLog().isPresent(nextIndex)){ + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot entries = context.getReplicatedLog().getFrom(nextIndex); } followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), prevLogTerm(nextIndex), - entries, context.getCommitIndex() - ), - actor() - ); + new AppendEntries(currentTerm(), context.getId(), prevLogIndex(nextIndex), + prevLogTerm(nextIndex), entries, context.getCommitIndex()).toSerializable(), + actor()); } } + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ private void installSnapshotIfNeeded(){ for (String followerId : followerToActor.keySet()) { ActorSelection followerActor =