X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeader.java;h=53e47c2f842f17ac0b2766811745a7f1ec6eabde;hp=a9882a664767ad726c651611c6cff8ac078f6d1a;hb=0eb621d29daaf08979c356e2148e99c48458e169;hpb=3019650e87a3fc05f80e8f6359e01ca5f1c5f197 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index a9882a6647..53e47c2f84 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -19,10 +19,10 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; -import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -67,7 +68,7 @@ public class Leader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap(); - private final Map followerToActor = new HashMap<>(); + private final Set followers; private Cancellable heartbeatSchedule = null; private Cancellable appendEntriesSchedule = null; @@ -84,22 +85,21 @@ public class Leader extends AbstractRaftActorBehavior { context.setCommitIndex(lastIndex()); } - for (String followerId : context.getPeerAddresses().keySet()) { + followers = context.getPeerAddresses().keySet(); + + for (String followerId : followers) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, new AtomicLong(lastIndex()), new AtomicLong(-1)); - followerToActor.put(followerId, - context.actorSelection(context.getPeerAddress(followerId))); - followerToLog.put(followerId, followerLogInformation); } - context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet()); + context.getLogger().debug("Election:Leader has following peers:"+ followers); - if (followerToActor.size() > 0) { - minReplicationCount = (followerToActor.size() + 1) / 2 + 1; + if (followers.size() > 0) { + minReplicationCount = (followers.size() + 1) / 2 + 1; } else { minReplicationCount = 0; } @@ -121,22 +121,42 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { + context.getLogger().info("Leader: Received {}", appendEntries.toString()); + return state(); } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { + if(! appendEntriesReply.isSuccess()) { + context.getLogger() + .info("Leader: Received {}", appendEntriesReply.toString()); + } + // Update the FollowerLogInformation String followerId = appendEntriesReply.getFollowerId(); FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + + if(followerLogInformation == null){ + context.getLogger().error("Unknown follower {}", followerId); + return state(); + } + if (appendEntriesReply.isSuccess()) { followerLogInformation .setMatchIndex(appendEntriesReply.getLogLastIndex()); followerLogInformation .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + followerLogInformation.decrNextIndex(); } @@ -193,9 +213,11 @@ public class Leader extends AbstractRaftActorBehavior { return RaftState.Leader; } - @Override public RaftState handleMessage(ActorRef sender, Object message) { + @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) { Preconditions.checkNotNull(sender, "sender should not be null"); + Object message = fromSerializableMessage(originalMessage); + if (message instanceof RaftRPC) { RaftRPC rpc = (RaftRPC) message; // If RPC request or response contains term T > currentTerm: @@ -215,7 +237,6 @@ public class Leader extends AbstractRaftActorBehavior { } else if (message instanceof Replicate) { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply){ - // FIXME : Should I be checking the term here too? handleInstallSnapshotReply( (InstallSnapshotReply) message); } @@ -243,7 +264,7 @@ public class Leader extends AbstractRaftActorBehavior { context.getLogger().debug("Replicate message " + logIndex); - if (followerToActor.size() == 0) { + if (followers.size() == 0) { context.setCommitIndex( replicate.getReplicatedLogEntry().getIndex()); @@ -269,57 +290,73 @@ public class Leader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers - for (String followerId : followerToActor.keySet()) { + for (String followerId : followers) { ActorSelection followerActor = - followerToActor.get(followerId); + context.getPeerActorSelection(followerId); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); + if (followerActor != null) { + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); - long nextIndex = followerLogInformation.getNextIndex().get(); + long nextIndex = followerLogInformation.getNextIndex().get(); - List entries = Collections.emptyList(); + List entries = Collections.emptyList(); - if(context.getReplicatedLog().isPresent(nextIndex)){ - entries = - context.getReplicatedLog().getFrom(nextIndex); - } + if (context.getReplicatedLog().isPresent(nextIndex)) { + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot + entries = + context.getReplicatedLog().getFrom(nextIndex); + } - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(nextIndex), prevLogTerm(nextIndex), - entries, context.getCommitIndex() - ), - actor() - ); + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(nextIndex), + prevLogTerm(nextIndex), entries, + context.getCommitIndex()).toSerializable(), + actor() + ); + } } } + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ private void installSnapshotIfNeeded(){ - for (String followerId : followerToActor.keySet()) { + for (String followerId : followers) { ActorSelection followerActor = - followerToActor.get(followerId); - - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - - long nextIndex = followerLogInformation.getNextIndex().get(); - - if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){ - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), - context.getReplicatedLog().getSnapshot() - ), - actor() - ); + context.getPeerActorSelection(followerId); + + if(followerActor != null) { + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + if (!context.getReplicatedLog().isPresent(nextIndex) && context + .getReplicatedLog().isInSnapshot(nextIndex)) { + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + context.getReplicatedLog().getSnapshotIndex(), + context.getReplicatedLog().getSnapshotTerm(), + context.getReplicatedLog().getSnapshot() + ), + actor() + ); + } } } } private RaftState sendHeartBeat() { - if (followerToActor.size() > 0) { + if (followers.size() > 0) { sendAppendEntries(); } return state(); @@ -338,7 +375,7 @@ public class Leader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { - if(followerToActor.keySet().size() == 0){ + if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are // no followers return; @@ -360,7 +397,7 @@ public class Leader extends AbstractRaftActorBehavior { private void scheduleInstallSnapshotCheck(FiniteDuration interval) { - if(followerToActor.keySet().size() == 0){ + if(followers.size() == 0){ // Optimization - do not bother scheduling a heartbeat as there are // no followers return;