From: Moiz Raja Date: Wed, 16 Jul 2014 12:35:58 +0000 (-0700) Subject: Test handling of the Replicate message by the Leader X-Git-Tag: release/helium~436 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=53e9fc9521e5fc0d49e9b1cb8763f8508101984b Test handling of the Replicate message by the Leader Refactored some code, added comments and a test Change-Id: I0719dbdb4f1f6da611b090faa6cabc84b85e50df Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 304b2fdbab..d12789205e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -98,6 +98,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { AppendEntries appendEntries, RaftState suggestedState); + /** + * appendEntries first processes the AppendEntries message and then + * delegates handling to a specific behavior + * + * @param sender + * @param appendEntries + * @param raftState + * @return + */ protected RaftState appendEntries(ActorRef sender, AppendEntries appendEntries, RaftState raftState) { @@ -137,10 +146,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * AppendEntriesReply message * @return */ - protected abstract RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState); + /** + * requestVote handles the RequestVote message. This logic is common + * for all behaviors + * + * @param sender + * @param requestVote + * @param suggestedState + * @return + */ protected RaftState requestVote(ActorRef sender, RequestVote requestVote, RaftState suggestedState) { @@ -198,24 +215,35 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * message * @return */ - protected abstract RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState); + /** + * Creates a random election duration + * + * @return + */ protected FiniteDuration electionDuration() { long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); } + /** + * stop the scheduled election + */ protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { electionCancel.cancel(); } } + /** + * schedule a new election + * + * @param interval + */ protected void scheduleElection(FiniteDuration interval) { - stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout @@ -226,30 +254,90 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { context.getActorSystem().dispatcher(), context.getActor()); } + /** + * Get the current term + * @return + */ protected long currentTerm() { return context.getTermInformation().getCurrentTerm(); } + /** + * Get the candidate for whom we voted in the current term + * @return + */ protected String votedFor() { return context.getTermInformation().getVotedFor(); } + /** + * Get the actor associated with this behavior + * @return + */ protected ActorRef actor() { return context.getActor(); } + /** + * Get the term from the last entry in the log + * + * @return + */ protected long lastTerm() { return context.getReplicatedLog().lastTerm(); } + /** + * Get the index from the last entry in the log + * + * @return + */ protected long lastIndex() { return context.getReplicatedLog().lastIndex(); } + /** + * Find the client request tracker for a specific logIndex + * + * @param logIndex + * @return + */ protected ClientRequestTracker findClientRequestTracker(long logIndex) { return null; } + /** + * Find the log index from the previous to last entry in the log + * + * @return + */ + protected long prevLogIndex(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getIndex(); + } + return -1; + } + + /** + * Find the log term from the previous to last entry in the log + * @return + */ + protected long prevLogTerm(long index){ + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(index - 1); + if (prevEntry != null) { + return prevEntry.getTerm(); + } + return -1; + } + + /** + * Apply the provided index to the state machine + * + * @param index a log index that is known to be committed + */ protected void applyLogToStateMachine(long index) { // Now maybe we apply to the state machine for (long i = context.getLastApplied() + 1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index c06ee9bd2b..ec4551ac4f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -73,7 +73,7 @@ public class Leader extends AbstractRaftActorBehavior { public Leader(RaftActorContext context) { super(context); - if(lastIndex() >= 0) { + if (lastIndex() >= 0) { context.setCommitIndex(lastIndex()); } @@ -148,7 +148,7 @@ public class Leader extends AbstractRaftActorBehavior { } } - if (replicatedCount >= minReplicationCount){ + if (replicatedCount >= minReplicationCount) { ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N); if (replicatedLogEntry != null @@ -161,7 +161,8 @@ public class Leader extends AbstractRaftActorBehavior { } } - if(context.getCommitIndex() > context.getLastApplied()){ + // Apply the change to the state machine + if (context.getCommitIndex() > context.getLastApplied()) { applyLogToStateMachine(context.getCommitIndex()); } @@ -194,56 +195,7 @@ public class Leader extends AbstractRaftActorBehavior { if (message instanceof SendHeartBeat) { return sendHeartBeat(); } else if (message instanceof Replicate) { - - Replicate replicate = (Replicate) message; - long logIndex = replicate.getReplicatedLogEntry().getIndex(); - - context.getLogger().debug("Replicate message " + logIndex); - - if (followerToActor.size() == 0) { - context.setCommitIndex( - replicate.getReplicatedLogEntry().getIndex()); - - context.getActor() - .tell(new ApplyState(replicate.getClientActor(), - replicate.getIdentifier(), - replicate.getReplicatedLogEntry()), - context.getActor() - ); - } else { - - trackerList.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); - - ReplicatedLogEntry prevEntry = - context.getReplicatedLog().get(lastIndex() - 1); - long prevLogIndex = -1; - long prevLogTerm = -1; - if (prevEntry != null) { - prevLogIndex = prevEntry.getIndex(); - prevLogTerm = prevEntry.getTerm(); - } - // Send an AppendEntries to all followers - for (String followerId : followerToActor.keySet()) { - ActorSelection followerActor = - followerToActor.get(followerId); - FollowerLogInformation followerLogInformation = - followerToLog.get(followerId); - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex, prevLogTerm, - context.getReplicatedLog().getFrom( - followerLogInformation.getNextIndex() - .get() - ), context.getCommitIndex() - ), - actor() - ); - } - } + replicate((Replicate) message); } } finally { scheduleHeartBeat(HEART_BEAT_INTERVAL); @@ -252,28 +204,62 @@ public class Leader extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } - private RaftState sendHeartBeat() { - if (followerToActor.size() > 0) { - for (String follower : followerToActor.keySet()) { - - FollowerLogInformation followerLogInformation = - followerToLog.get(follower); + private void replicate(Replicate replicate) { + long logIndex = replicate.getReplicatedLogEntry().getIndex(); - AtomicLong nextIndex = - followerLogInformation.getNextIndex(); + context.getLogger().debug("Replicate message " + logIndex); - List entries = - context.getReplicatedLog().getFrom(nextIndex.get()); + if (followerToActor.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); - followerToActor.get(follower).tell(new AppendEntries( - context.getTermInformation().getCurrentTerm(), - context.getId(), - context.getReplicatedLog().lastIndex(), - context.getReplicatedLog().lastTerm(), - entries, context.getCommitIndex()), + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), context.getActor() ); - } + } else { + + // Create a tracker entry we will use this later to notify the + // client actor + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + sendAppendEntries(); + } + } + + private void sendAppendEntries() { + // Send an AppendEntries to all followers + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + + long nextIndex = followerLogInformation.getNextIndex().get(); + + List entries = + context.getReplicatedLog().getFrom(nextIndex); + + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(nextIndex), prevLogTerm(nextIndex), + entries, context.getCommitIndex() + ), + actor() + ); + } + } + + private RaftState sendHeartBeat() { + if (followerToActor.size() > 0) { + sendAppendEntries(); } return state(); } @@ -285,6 +271,12 @@ public class Leader extends AbstractRaftActorBehavior { } private void scheduleHeartBeat(FiniteDuration interval) { + if(followerToActor.keySet().size() == 0){ + // Optimization - do not bother scheduling a heartbeat as there are + // no followers + return; + } + stopHeartBeat(); // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index e5e54d5944..35bf6f15b4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -8,6 +8,8 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; @@ -19,8 +21,10 @@ import static org.junit.Assert.assertEquals; public class LeaderTest extends AbstractRaftActorBehaviorTest { - private ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); - private ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class)); + private ActorRef leaderActor = + getSystem().actorOf(Props.create(DoNothingActor.class)); + private ActorRef senderActor = + getSystem().actorOf(Props.create(DoNothingActor.class)); @Test public void testHandleMessageForUnknownMessage() throws Exception { @@ -37,7 +41,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { @Test - public void testThatLeaderSendsAHeartbeatMessageToAllFollowers(){ + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { @@ -45,31 +49,35 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ActorRef followerActor = getTestActor(); - MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); Map peerAddresses = new HashMap(); - peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); leader.handleMessage(senderActor, new SendHeartBeat()); - final String out = new ExpectMsg(duration("1 seconds"), "match hint") { - // do not put code outside this method, will run afterwards - protected String match(Object in) { - if (in instanceof AppendEntries) { - if (((AppendEntries) in).getTerm() - == 0) { - return "match"; + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof AppendEntries) { + if (((AppendEntries) in).getTerm() + == 0) { + return "match"; + } + return null; + } else { + throw noMatch(); } - return null; - } else { - throw noMatch(); } - } - }.get(); // this extracts the received message + }.get(); // this extracts the received message assertEquals("match", out); @@ -80,7 +88,114 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } - @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + @Test + public void testHandleReplicateMessageSendAppendEntriesToFollower() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef followerActor = getTestActor(); + + MockRaftActorContext actorContext = + (MockRaftActorContext) createActorContext(); + + Map peerAddresses = new HashMap(); + + peerAddresses.put(followerActor.path().toString(), + followerActor.path().toString()); + + actorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(actorContext); + RaftState raftState = leader + .handleMessage(senderActor, new Replicate(null, null, + new MockRaftActorContext.MockReplicatedLogEntry(1, + 100, + "foo") + )); + + // State should not change + assertEquals(RaftState.Leader, raftState); + + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof AppendEntries) { + if (((AppendEntries) in).getTerm() + == 0) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + } + + + }; + }}; + } + + @Test + public void testHandleReplicateMessageWhenThereAreNoFollowers() { + new JavaTestKit(getSystem()) {{ + + new Within(duration("1 seconds")) { + protected void run() { + + ActorRef raftActor = getTestActor(); + + MockRaftActorContext actorContext = + new MockRaftActorContext("test", getSystem(), raftActor); + + Leader leader = new Leader(actorContext); + RaftState raftState = leader + .handleMessage(senderActor, new Replicate(null, "state-id", + new MockRaftActorContext.MockReplicatedLogEntry(1, + 100, + "foo") + )); + + // State should not change + assertEquals(RaftState.Leader, raftState); + + assertEquals(100, actorContext.getCommitIndex()); + + final String out = + new ExpectMsg(duration("1 seconds"), + "match hint") { + // do not put code outside this method, will run afterwards + protected String match(Object in) { + if (in instanceof ApplyState) { + if (((ApplyState) in).getIdentifier().equals("state-id")) { + return "match"; + } + return null; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertEquals("match", out); + + } + + + }; + }}; + } + + @Override protected RaftActorBehavior createBehavior( + RaftActorContext actorContext) { return new Leader(actorContext); }