Test handling of the Replicate message by the Leader 86/9086/4
authorMoiz Raja <moraja@cisco.com>
Wed, 16 Jul 2014 12:35:58 +0000 (05:35 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:21:10 +0000 (15:21 -0700)
Refactored some code, added comments and a test

Change-Id: I0719dbdb4f1f6da611b090faa6cabc84b85e50df
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 304b2fdbab0b8c242ae01e4768547cd8d5861a46..d12789205e4540482e98b5d2ca9144218abd7154 100644 (file)
@@ -98,6 +98,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         AppendEntries appendEntries, RaftState suggestedState);
 
 
         AppendEntries appendEntries, RaftState suggestedState);
 
 
+    /**
+     * appendEntries first processes the AppendEntries message and then
+     * delegates handling to a specific behavior
+     *
+     * @param sender
+     * @param appendEntries
+     * @param raftState
+     * @return
+     */
     protected RaftState appendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState raftState) {
 
     protected RaftState appendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState raftState) {
 
@@ -137,10 +146,18 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      *                           AppendEntriesReply message
      * @return
      */
      *                           AppendEntriesReply message
      * @return
      */
-
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
+    /**
+     * requestVote handles the RequestVote message. This logic is common
+     * for all behaviors
+     *
+     * @param sender
+     * @param requestVote
+     * @param suggestedState
+     * @return
+     */
     protected RaftState requestVote(ActorRef sender,
         RequestVote requestVote, RaftState suggestedState) {
 
     protected RaftState requestVote(ActorRef sender,
         RequestVote requestVote, RaftState suggestedState) {
 
@@ -198,24 +215,35 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      *                         message
      * @return
      */
      *                         message
      * @return
      */
-
     protected abstract RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState);
 
     protected abstract RaftState handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply, RaftState suggestedState);
 
+    /**
+     * Creates a random election duration
+     *
+     * @return
+     */
     protected FiniteDuration electionDuration() {
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
         return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
             TimeUnit.MILLISECONDS);
     }
 
     protected FiniteDuration electionDuration() {
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
         return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
             TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * stop the scheduled election
+     */
     protected void stopElection() {
         if (electionCancel != null && !electionCancel.isCancelled()) {
             electionCancel.cancel();
         }
     }
 
     protected void stopElection() {
         if (electionCancel != null && !electionCancel.isCancelled()) {
             electionCancel.cancel();
         }
     }
 
+    /**
+     * schedule a new election
+     *
+     * @param interval
+     */
     protected void scheduleElection(FiniteDuration interval) {
     protected void scheduleElection(FiniteDuration interval) {
-
         stopElection();
 
         // Schedule an election. When the scheduler triggers an ElectionTimeout
         stopElection();
 
         // Schedule an election. When the scheduler triggers an ElectionTimeout
@@ -226,30 +254,90 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
+    /**
+     * Get the current term
+     * @return
+     */
     protected long currentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
     protected long currentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
+    /**
+     * Get the candidate for whom we voted in the current term
+     * @return
+     */
     protected String votedFor() {
         return context.getTermInformation().getVotedFor();
     }
 
     protected String votedFor() {
         return context.getTermInformation().getVotedFor();
     }
 
+    /**
+     * Get the actor associated with this behavior
+     * @return
+     */
     protected ActorRef actor() {
         return context.getActor();
     }
 
     protected ActorRef actor() {
         return context.getActor();
     }
 
+    /**
+     * Get the term from the last entry in the log
+     *
+     * @return
+     */
     protected long lastTerm() {
         return context.getReplicatedLog().lastTerm();
     }
 
     protected long lastTerm() {
         return context.getReplicatedLog().lastTerm();
     }
 
+    /**
+     * Get the index from the last entry in the log
+     *
+     * @return
+     */
     protected long lastIndex() {
         return context.getReplicatedLog().lastIndex();
     }
 
     protected long lastIndex() {
         return context.getReplicatedLog().lastIndex();
     }
 
+    /**
+     * Find the client request tracker for a specific logIndex
+     *
+     * @param logIndex
+     * @return
+     */
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         return null;
     }
 
     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
         return null;
     }
 
+    /**
+     * Find the log index from the previous to last entry in the log
+     *
+     * @return
+     */
+    protected long prevLogIndex(long index){
+        ReplicatedLogEntry prevEntry =
+            context.getReplicatedLog().get(index - 1);
+        if (prevEntry != null) {
+            return prevEntry.getIndex();
+        }
+        return -1;
+    }
+
+    /**
+     * Find the log term from the previous to last entry in the log
+     * @return
+     */
+    protected long prevLogTerm(long index){
+        ReplicatedLogEntry prevEntry =
+            context.getReplicatedLog().get(index - 1);
+        if (prevEntry != null) {
+            return prevEntry.getTerm();
+        }
+        return -1;
+    }
+
+    /**
+     * Apply the provided index to the state machine
+     *
+     * @param index a log index that is known to be committed
+     */
     protected void applyLogToStateMachine(long index) {
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
     protected void applyLogToStateMachine(long index) {
         // Now maybe we apply to the state machine
         for (long i = context.getLastApplied() + 1;
index c06ee9bd2b836c784c4adb75e7f318875ebf22c3..ec4551ac4ffb7cae84c9ce9fd82a652499017d98 100644 (file)
@@ -73,7 +73,7 @@ public class Leader extends AbstractRaftActorBehavior {
     public Leader(RaftActorContext context) {
         super(context);
 
     public Leader(RaftActorContext context) {
         super(context);
 
-        if(lastIndex() >= 0) {
+        if (lastIndex() >= 0) {
             context.setCommitIndex(lastIndex());
         }
 
             context.setCommitIndex(lastIndex());
         }
 
@@ -148,7 +148,7 @@ public class Leader extends AbstractRaftActorBehavior {
                 }
             }
 
                 }
             }
 
-            if (replicatedCount >= minReplicationCount){
+            if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry =
                     context.getReplicatedLog().get(N);
                 if (replicatedLogEntry != null
                 ReplicatedLogEntry replicatedLogEntry =
                     context.getReplicatedLog().get(N);
                 if (replicatedLogEntry != null
@@ -161,7 +161,8 @@ public class Leader extends AbstractRaftActorBehavior {
             }
         }
 
             }
         }
 
-        if(context.getCommitIndex() > context.getLastApplied()){
+        // Apply the change to the state machine
+        if (context.getCommitIndex() > context.getLastApplied()) {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
             applyLogToStateMachine(context.getCommitIndex());
         }
 
@@ -194,56 +195,7 @@ public class Leader extends AbstractRaftActorBehavior {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
             } else if (message instanceof Replicate) {
             if (message instanceof SendHeartBeat) {
                 return sendHeartBeat();
             } else if (message instanceof Replicate) {
-
-                Replicate replicate = (Replicate) message;
-                long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
-                context.getLogger().debug("Replicate message " + logIndex);
-
-                if (followerToActor.size() == 0) {
-                    context.setCommitIndex(
-                        replicate.getReplicatedLogEntry().getIndex());
-
-                    context.getActor()
-                        .tell(new ApplyState(replicate.getClientActor(),
-                                replicate.getIdentifier(),
-                                replicate.getReplicatedLogEntry()),
-                            context.getActor()
-                        );
-                } else {
-
-                    trackerList.add(
-                        new ClientRequestTrackerImpl(replicate.getClientActor(),
-                            replicate.getIdentifier(),
-                            logIndex)
-                    );
-
-                    ReplicatedLogEntry prevEntry =
-                        context.getReplicatedLog().get(lastIndex() - 1);
-                    long prevLogIndex = -1;
-                    long prevLogTerm = -1;
-                    if (prevEntry != null) {
-                        prevLogIndex = prevEntry.getIndex();
-                        prevLogTerm = prevEntry.getTerm();
-                    }
-                    // Send an AppendEntries to all followers
-                    for (String followerId : followerToActor.keySet()) {
-                        ActorSelection followerActor =
-                            followerToActor.get(followerId);
-                        FollowerLogInformation followerLogInformation =
-                            followerToLog.get(followerId);
-                        followerActor.tell(
-                            new AppendEntries(currentTerm(), context.getId(),
-                                prevLogIndex, prevLogTerm,
-                                context.getReplicatedLog().getFrom(
-                                    followerLogInformation.getNextIndex()
-                                        .get()
-                                ), context.getCommitIndex()
-                            ),
-                            actor()
-                        );
-                    }
-                }
+                replicate((Replicate) message);
             }
         } finally {
             scheduleHeartBeat(HEART_BEAT_INTERVAL);
             }
         } finally {
             scheduleHeartBeat(HEART_BEAT_INTERVAL);
@@ -252,28 +204,62 @@ public class Leader extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
         return super.handleMessage(sender, message);
     }
 
-    private RaftState sendHeartBeat() {
-        if (followerToActor.size() > 0) {
-            for (String follower : followerToActor.keySet()) {
-
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(follower);
+    private void replicate(Replicate replicate) {
+        long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
 
-                AtomicLong nextIndex =
-                    followerLogInformation.getNextIndex();
+        context.getLogger().debug("Replicate message " + logIndex);
 
 
-                List<ReplicatedLogEntry> entries =
-                    context.getReplicatedLog().getFrom(nextIndex.get());
+        if (followerToActor.size() == 0) {
+            context.setCommitIndex(
+                replicate.getReplicatedLogEntry().getIndex());
 
 
-                followerToActor.get(follower).tell(new AppendEntries(
-                        context.getTermInformation().getCurrentTerm(),
-                        context.getId(),
-                        context.getReplicatedLog().lastIndex(),
-                        context.getReplicatedLog().lastTerm(),
-                        entries, context.getCommitIndex()),
+            context.getActor()
+                .tell(new ApplyState(replicate.getClientActor(),
+                        replicate.getIdentifier(),
+                        replicate.getReplicatedLogEntry()),
                     context.getActor()
                 );
                     context.getActor()
                 );
-            }
+        } else {
+
+            // Create a tracker entry we will use this later to notify the
+            // client actor
+            trackerList.add(
+                new ClientRequestTrackerImpl(replicate.getClientActor(),
+                    replicate.getIdentifier(),
+                    logIndex)
+            );
+
+            sendAppendEntries();
+        }
+    }
+
+    private void sendAppendEntries() {
+        // Send an AppendEntries to all followers
+        for (String followerId : followerToActor.keySet()) {
+            ActorSelection followerActor =
+                followerToActor.get(followerId);
+
+            FollowerLogInformation followerLogInformation =
+                followerToLog.get(followerId);
+
+            long nextIndex = followerLogInformation.getNextIndex().get();
+
+            List<ReplicatedLogEntry> entries =
+                context.getReplicatedLog().getFrom(nextIndex);
+
+            followerActor.tell(
+                new AppendEntries(currentTerm(), context.getId(),
+                    prevLogIndex(nextIndex), prevLogTerm(nextIndex),
+                    entries, context.getCommitIndex()
+                ),
+                actor()
+            );
+        }
+    }
+
+    private RaftState sendHeartBeat() {
+        if (followerToActor.size() > 0) {
+            sendAppendEntries();
         }
         return state();
     }
         }
         return state();
     }
@@ -285,6 +271,12 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
+        if(followerToActor.keySet().size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
+
         stopHeartBeat();
 
         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
         stopHeartBeat();
 
         // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
index e5e54d5944bf424b6acc46ea637eefe07e17093a..35bf6f15b42b2e5c2b81b6ef9b10e21baeb7ec9f 100644 (file)
@@ -8,6 +8,8 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
@@ -19,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private ActorRef leaderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
-    private ActorRef senderActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+    private ActorRef leaderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
+    private ActorRef senderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
 
     @Test
     public void testHandleMessageForUnknownMessage() throws Exception {
 
     @Test
     public void testHandleMessageForUnknownMessage() throws Exception {
@@ -37,7 +41,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
 
     @Test
 
 
     @Test
-    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers(){
+    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
@@ -45,31 +49,35 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     ActorRef followerActor = getTestActor();
 
 
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap();
 
 
                     Map<String, String> peerAddresses = new HashMap();
 
-                    peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
 
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                        // do not put code outside this method, will run afterwards
-                        protected String match(Object in) {
-                            if (in instanceof AppendEntries) {
-                                if (((AppendEntries) in).getTerm()
-                                    == 0) {
-                                    return "match";
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"),
+                            "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof AppendEntries) {
+                                    if (((AppendEntries) in).getTerm()
+                                        == 0) {
+                                        return "match";
+                                    }
+                                    return null;
+                                } else {
+                                    throw noMatch();
                                 }
                                 }
-                                return null;
-                            } else {
-                                throw noMatch();
                             }
                             }
-                        }
-                    }.get(); // this extracts the received message
+                        }.get(); // this extracts the received message
 
                     assertEquals("match", out);
 
 
                     assertEquals("match", out);
 
@@ -80,7 +88,114 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
         }};
     }
 
-    @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+    @Test
+    public void testHandleReplicateMessageSendAppendEntriesToFollower() {
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef followerActor = getTestActor();
+
+                    MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                    Map<String, String> peerAddresses = new HashMap();
+
+                    peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                    actorContext.setPeerAddresses(peerAddresses);
+
+                    Leader leader = new Leader(actorContext);
+                    RaftState raftState = leader
+                        .handleMessage(senderActor, new Replicate(null, null,
+                            new MockRaftActorContext.MockReplicatedLogEntry(1,
+                                100,
+                                "foo")
+                        ));
+
+                    // State should not change
+                    assertEquals(RaftState.Leader, raftState);
+
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"),
+                            "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof AppendEntries) {
+                                    if (((AppendEntries) in).getTerm()
+                                        == 0) {
+                                        return "match";
+                                    }
+                                    return null;
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testHandleReplicateMessageWhenThereAreNoFollowers() {
+        new JavaTestKit(getSystem()) {{
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    ActorRef raftActor = getTestActor();
+
+                    MockRaftActorContext actorContext =
+                        new MockRaftActorContext("test", getSystem(), raftActor);
+
+                    Leader leader = new Leader(actorContext);
+                    RaftState raftState = leader
+                        .handleMessage(senderActor, new Replicate(null, "state-id",
+                            new MockRaftActorContext.MockReplicatedLogEntry(1,
+                                100,
+                                "foo")
+                        ));
+
+                    // State should not change
+                    assertEquals(RaftState.Leader, raftState);
+
+                    assertEquals(100, actorContext.getCommitIndex());
+
+                    final String out =
+                        new ExpectMsg<String>(duration("1 seconds"),
+                            "match hint") {
+                            // do not put code outside this method, will run afterwards
+                            protected String match(Object in) {
+                                if (in instanceof ApplyState) {
+                                    if (((ApplyState) in).getIdentifier().equals("state-id")) {
+                                        return "match";
+                                    }
+                                    return null;
+                                } else {
+                                    throw noMatch();
+                                }
+                            }
+                        }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+
+                }
+
+
+            };
+        }};
+    }
+
+    @Override protected RaftActorBehavior createBehavior(
+        RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
         return new Leader(actorContext);
     }