Implement handling of AppendEntries from a recipient perspective 73/8973/4
authorMoiz Raja <moraja@cisco.com>
Sun, 13 Jul 2014 01:46:15 +0000 (18:46 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:16:20 +0000 (15:16 -0700)
This commit includes everything that a recipient will do when it receives an AppendEntries
except applying of the state to the state machine

Change-Id: I9131b7b73624527efb1a7cb7f0c999fa03f90dfd
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java

index 5e628bdd1bb035342529b64702c707bf6a458696..186de02fed39dfe5f9485654e08f5acf7f3f22e0 100644 (file)
@@ -103,12 +103,20 @@ public abstract class RaftActor extends UntypedEventsourcedProcessor {
 
     private class ReplicatedLogImpl implements ReplicatedLog {
 
 
     private class ReplicatedLogImpl implements ReplicatedLog {
 
-        @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) {
-            throw new UnsupportedOperationException("getReplicatedLogEntry");
+        @Override public ReplicatedLogEntry get(long index) {
+            throw new UnsupportedOperationException("get");
         }
 
         @Override public ReplicatedLogEntry last() {
             throw new UnsupportedOperationException("last");
         }
         }
 
         @Override public ReplicatedLogEntry last() {
             throw new UnsupportedOperationException("last");
         }
+
+        @Override public void removeFrom(long index) {
+            throw new UnsupportedOperationException("removeFrom");
+        }
+
+        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
+            throw new UnsupportedOperationException("append");
+        }
     }
 }
     }
 }
index e2f4bdbeb6e235580dc7d0d6be3c8990997f0989..f12bc9af7357501a9670d72f3ddf45edbfd07def 100644 (file)
@@ -18,7 +18,7 @@ public interface ReplicatedLog {
      * @param index
      * @return
      */
      * @param index
      * @return
      */
-    ReplicatedLogEntry getReplicatedLogEntry(long index);
+    ReplicatedLogEntry get(long index);
 
 
     /**
 
 
     /**
@@ -27,4 +27,17 @@ public interface ReplicatedLog {
      * @return
      */
     ReplicatedLogEntry last();
      * @return
      */
     ReplicatedLogEntry last();
+
+    /**
+     * Remove all the entries from the logs >= index
+     *
+     * @param index
+     */
+    void removeFrom(long index);
+
+    /**
+     * Append an entry to the log
+     * @param replicatedLogEntry
+     */
+    void append(ReplicatedLogEntry replicatedLogEntry);
 }
 }
index ae1baec471ef1d08abdcbb6d9c56052b95a87ffa..167082711d82b46c61e90e8311473d051e06461c 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -45,10 +46,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
      */
     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
 
+    /**
+     * The interval at which a heart beat message will be sent to the remote
+     * RaftActor
+     * <p/>
+     * Since this is set to 100 milliseconds the Election timeout should be
+     * at least 200 milliseconds
+     */
+    protected static final FiniteDuration HEART_BEAT_INTERVAL =
+        new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
     /**
      * The interval in which a new election would get triggered if no leader is found
      */
     /**
      * The interval in which a new election would get triggered if no leader is found
      */
-    private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
+    private static final long ELECTION_TIME_INTERVAL =
+        HEART_BEAT_INTERVAL.toMillis() * 2;
 
     /**
      *
 
     /**
      *
@@ -79,6 +91,71 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState);
 
     protected abstract RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries, RaftState suggestedState);
 
+
+    protected RaftState appendEntries(ActorRef sender,
+        AppendEntries appendEntries, RaftState raftState){
+
+        // 1. Reply false if term < currentTerm (§5.1)
+        if(appendEntries.getTerm() < currentTerm()){
+            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+            return state();
+        }
+
+        // 2. Reply false if log doesn’t contain an entry at prevLogIndex
+        // whose term matches prevLogTerm (§5.3)
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+            .get(appendEntries.getPrevLogIndex());
+
+        if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
+            sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
+            return state();
+        }
+
+        if(appendEntries.getEntries() != null) {
+            // 3. If an existing entry conflicts with a new one (same index
+            // but different terms), delete the existing entry and all that
+            // follow it (§5.3)
+            int addEntriesFrom = 0;
+            for (int i = 0;
+                 i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                ReplicatedLogEntry newEntry = context.getReplicatedLog()
+                    .get(i + 1);
+
+                if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
+                    break;
+                }
+                if (newEntry != null && newEntry.getTerm() != appendEntries
+                    .getEntries().get(i).getTerm()) {
+                    context.getReplicatedLog().removeFrom(i + 1);
+                    break;
+                }
+            }
+
+            // 4. Append any new entries not already in the log
+            for (int i = addEntriesFrom;
+                 i < appendEntries.getEntries().size(); i++) {
+                context.getReplicatedLog()
+                    .append(appendEntries.getEntries().get(i));
+            }
+        }
+
+
+        // 5. If leaderCommit > commitIndex, set commitIndex =
+        // min(leaderCommit, index of last new entry)
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
+            context.getReplicatedLog().last().getIndex()));
+
+        // If commitIndex > lastApplied: increment lastApplied, apply
+        // log[lastApplied] to state machine (§5.3)
+        if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
+            applyLogToStateMachine(appendEntries.getLeaderCommit());
+        }
+
+        sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
+
+        return handleAppendEntries(sender, appendEntries, raftState);
+    }
+
     /**
      * Derived classes should not directly handle AppendEntriesReply messages it
      * should let the base class handle it first. Once the base class handles
     /**
      * Derived classes should not directly handle AppendEntriesReply messages it
      * should let the base class handle it first. Once the base class handles
@@ -98,18 +175,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
 
-    protected RaftState handleRequestVote(ActorRef sender,
-        RequestVote requestVote, RaftState suggestedState){
+    protected RaftState requestVote(ActorRef sender,
+        RequestVote requestVote, RaftState suggestedState) {
 
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (§5.1)
 
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (§5.1)
-        if(requestVote.getTerm() < currentTerm()){
+        if (requestVote.getTerm() < currentTerm()) {
             grantVote = false;
 
             grantVote = false;
 
-        // If votedFor is null or candidateId, and candidate’s log is at
-        // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
-        } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
+            // If votedFor is null or candidateId, and candidate’s log is at
+            // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
+        } else if (votedFor() == null || votedFor()
+            .equals(requestVote.getCandidateId())) {
 
             boolean candidateLatest = false;
 
 
             boolean candidateLatest = false;
 
@@ -120,13 +198,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             // the log with the later term is more up-to-date. If the logs
             // end with the same term, then whichever log is longer is
             // more up-to-date.
             // the log with the later term is more up-to-date. If the logs
             // end with the same term, then whichever log is longer is
             // more up-to-date.
-            if(requestVote.getLastLogTerm() > lastTerm()){
+            if (requestVote.getLastLogTerm() > lastTerm()) {
                 candidateLatest = true;
                 candidateLatest = true;
-            } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
+            } else if ((requestVote.getLastLogTerm() == lastTerm())
+                && requestVote.getLastLogIndex() >= lastTerm()) {
                 candidateLatest = true;
             }
 
                 candidateLatest = true;
             }
 
-            if(candidateLatest) {
+            if (candidateLatest) {
                 grantVote = true;
                 context.getTermInformation().update(requestVote.getTerm(),
                     requestVote.getCandidateId());
                 grantVote = true;
                 context.getTermInformation().update(requestVote.getTerm(),
                     requestVote.getCandidateId());
@@ -163,9 +242,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected abstract RaftState state();
 
      */
     protected abstract RaftState state();
 
-    protected FiniteDuration electionDuration(){
+    protected FiniteDuration electionDuration() {
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
+        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
+            TimeUnit.MILLISECONDS);
     }
 
     protected void scheduleElection(FiniteDuration interval) {
     }
 
     protected void scheduleElection(FiniteDuration interval) {
@@ -182,15 +262,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
-    protected long currentTerm(){
+    protected long currentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
         return context.getTermInformation().getCurrentTerm();
     }
 
-    protected String votedFor(){
+    protected String votedFor() {
         return context.getTermInformation().getVotedFor();
     }
 
         return context.getTermInformation().getVotedFor();
     }
 
-    protected ActorRef actor(){
+    protected ActorRef actor() {
         return context.getActor();
     }
 
         return context.getActor();
     }
 
@@ -210,18 +290,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             raftState = applyTerm((RaftRPC) message);
         }
         if (message instanceof AppendEntries) {
             raftState = applyTerm((RaftRPC) message);
         }
         if (message instanceof AppendEntries) {
-            AppendEntries appendEntries = (AppendEntries) message;
-            if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
-                applyLogToStateMachine(appendEntries.getLeaderCommit());
-            }
-            raftState = handleAppendEntries(sender, appendEntries, raftState);
+            raftState = appendEntries(sender, (AppendEntries) message,
+                raftState);
         } else if (message instanceof AppendEntriesReply) {
             raftState =
                 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
                     raftState);
         } else if (message instanceof RequestVote) {
             raftState =
         } else if (message instanceof AppendEntriesReply) {
             raftState =
                 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
                     raftState);
         } else if (message instanceof RequestVote) {
             raftState =
-                handleRequestVote(sender, (RequestVote) message, raftState);
+                requestVote(sender, (RequestVote) message, raftState);
         } else if (message instanceof RequestVoteReply) {
             raftState =
                 handleRequestVoteReply(sender, (RequestVoteReply) message,
         } else if (message instanceof RequestVoteReply) {
             raftState =
                 handleRequestVoteReply(sender, (RequestVoteReply) message,
index 0498d7fc8b17c52f60cd2b5b0cc249853a5daca7..cfefd21c568029d1e4ce9eed3e5c139724fb5498 100644 (file)
@@ -53,15 +53,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Leader extends AbstractRaftActorBehavior {
 
  */
 public class Leader extends AbstractRaftActorBehavior {
 
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     */
-    public static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
 
     private final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
 
     private final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
@@ -70,10 +61,10 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private Cancellable heartbeatCancel = null;
 
 
     private Cancellable heartbeatCancel = null;
 
-    public Leader(RaftActorContext context, List<String> followePaths) {
+    public Leader(RaftActorContext context, List<String> followerPaths) {
         super(context);
 
         super(context);
 
-        for (String followerPath : followePaths) {
+        for (String followerPath : followerPaths) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerPath,
                     new AtomicLong(0),
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerPath,
                     new AtomicLong(0),
index fd4e0a274c6f9122e40809e7b120f6a7a8b936bd..c0e89a8ef41a6c5ecaf2282eb8d4a131d43876c6 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+
 import java.util.List;
 
 /**
 import java.util.List;
 
 /**
@@ -26,13 +28,13 @@ public class AppendEntries extends AbstractRaftRPC {
 
     // log entries to store (empty for heartbeat;
     // may send more than one for efficiency)
 
     // log entries to store (empty for heartbeat;
     // may send more than one for efficiency)
-    private final List<Object> entries;
+    private final List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
     private final long leaderCommit;
 
     public AppendEntries(long term, String leaderId, long prevLogIndex,
 
     // leader's commitIndex
     private final long leaderCommit;
 
     public AppendEntries(long term, String leaderId, long prevLogIndex,
-        long prevLogTerm, List<Object> entries, long leaderCommit) {
+        long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
         super(term);
         this.leaderId = leaderId;
         this.prevLogIndex = prevLogIndex;
@@ -53,7 +55,7 @@ public class AppendEntries extends AbstractRaftRPC {
         return prevLogTerm;
     }
 
         return prevLogTerm;
     }
 
-    public List<Object> getEntries() {
+    public List<ReplicatedLogEntry> getEntries() {
         return entries;
     }
 
         return entries;
     }
 
index 2a1cf9ab877f3a3ae6cdf0148997732b38516f00..3b332e4ec7b1031f110059a8789b165ed11909e4 100644 (file)
@@ -13,6 +13,9 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class MockRaftActorContext implements RaftActorContext {
 
     private String id;
 public class MockRaftActorContext implements RaftActorContext {
 
     private String id;
@@ -104,7 +107,7 @@ public class MockRaftActorContext implements RaftActorContext {
         private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
         private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
 
         private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, "");
         private ReplicatedLogEntry last = new MockReplicatedLogEntry(0,0, "");
 
-        @Override public ReplicatedLogEntry getReplicatedLogEntry(long index) {
+        @Override public ReplicatedLogEntry get(long index) {
             return replicatedLogEntry;
         }
 
             return replicatedLogEntry;
         }
 
@@ -112,6 +115,12 @@ public class MockRaftActorContext implements RaftActorContext {
             return last;
         }
 
             return last;
         }
 
+        @Override public void removeFrom(long index) {
+        }
+
+        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
+        }
+
         public void setReplicatedLogEntry(
             ReplicatedLogEntry replicatedLogEntry) {
             this.replicatedLogEntry = replicatedLogEntry;
         public void setReplicatedLogEntry(
             ReplicatedLogEntry replicatedLogEntry) {
             this.replicatedLogEntry = replicatedLogEntry;
@@ -122,6 +131,28 @@ public class MockRaftActorContext implements RaftActorContext {
         }
     }
 
         }
     }
 
+    public static class SimpleReplicatedLog implements ReplicatedLog {
+        private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
+
+        @Override public ReplicatedLogEntry get(long index) {
+            return log.get((int) index);
+        }
+
+        @Override public ReplicatedLogEntry last() {
+            return log.get(log.size()-1);
+        }
+
+        @Override public void removeFrom(long index) {
+            for(int i=(int) index ; i < log.size() ; i++) {
+                log.remove(i);
+            }
+        }
+
+        @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
+            log.add(replicatedLogEntry);
+        }
+    }
+
     public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
 
         private final long term;
     public static class MockReplicatedLogEntry implements ReplicatedLogEntry {
 
         private final long term;
index 1e7aa6a1d5fe40e2fd57e3088e80dc0dcbe1d653..3cd373adf4319e36f5101c418e330adc90c7931d 100644 (file)
@@ -8,6 +8,7 @@ import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
@@ -15,17 +16,25 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 
-public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
+public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
     private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
         DoNothingActor.class));
 
 
     private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
         DoNothingActor.class));
 
-   @Test
-    public void testHandlingOfRaftRPCWithNewerTerm() throws Exception {
+    /**
+     * This test checks that when a new Raft RPC message is received with a newer
+     * term the RaftActor gets into the Follower state.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleRaftRPCWithNewerTerm() throws Exception {
         new JavaTestKit(getSystem()) {{
 
             assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
         new JavaTestKit(getSystem()) {{
 
             assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
@@ -44,28 +53,339 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
         }};
     }
 
         }};
     }
 
+    /**
+     * This test verifies that when an AppendEntries RPC is received by a RaftActor
+     * with a commitIndex that is greater than what has been applied to the
+     * state machine of the RaftActor, the RaftActor applies the state and
+     * sets it current applied state to the commitIndex of the sender.
+     *
+     * @throws Exception
+     */
     @Test
     @Test
-    public void testHandlingOfAppendEntriesWithNewerCommitIndex() throws Exception{
+    public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
 
             RaftActorContext context =
                 createActorContext();
 
         new JavaTestKit(getSystem()) {{
 
             RaftActorContext context =
                 createActorContext();
 
-            ((MockRaftActorContext) context).setLastApplied(100);
+            context.setLastApplied(100);
+            setLastLogEntry((MockRaftActorContext) context, 0, 0, "");
 
 
+            // The new commitIndex is 101
             AppendEntries appendEntries =
                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
             AppendEntries appendEntries =
                 new AppendEntries(100, "leader-1", 0, 0, null, 101);
 
             RaftState raftState =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
-            assertEquals(new AtomicLong(101).get(), context.getLastApplied());
+            assertEquals(101L, context.getLastApplied());
+
+        }};
+    }
+
+    /**
+     * This test verifies that when an AppendEntries is received with a term that
+     * is less that the currentTerm of the RaftActor then the RaftActor does not
+     * change it's state and it responds back with a failure
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesSenderTermLessThanReceiverTerm()
+        throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to a high number (1000)
+            context.getTermInformation().update(1000, "test");
+
+            AppendEntries appendEntries =
+                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(false, out);
+
+
+        }};
+    }
+
+    /**
+     * This test verifies that when an AppendEntries is received a specific prevLogTerm
+     * which does not match the term that is in RaftActors log entry at prevLogIndex
+     * then the RaftActor does not change it's state and it returns a failure.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
+        throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(95, "test");
+
+            // Set the last log entry term for the receiver to be greater than
+            // what we will be sending as the prevLogTerm in AppendEntries
+            MockRaftActorContext.MockReplicatedLog mockReplicatedLog =
+                setLastLogEntry(context, 20, 0, "");
+
+            // Also set the entry at index 0 with term 20 which will be greater
+            // than the prevLogTerm sent by the sender
+            mockReplicatedLog.setReplicatedLogEntry(
+                new MockRaftActorContext.MockReplicatedLogEntry(20, 0, ""));
+
+            // AppendEntries is now sent with a bigger term
+            // this will set the receivers term to be the same as the sender's term
+            AppendEntries appendEntries =
+                new AppendEntries(100, "leader-1", 0, 0, null, 101);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(false, out);
+
+
+        }};
+    }
+
+    /**
+     * This test verifies that when a new AppendEntries message is received with
+     * new entries and the logs of the sender and receiver match that the new
+     * entries get added to the log and the log is incremented by the number of
+     * entries received in appendEntries
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesAddNewEntries() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(1, "test");
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three"));
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four"));
+
+            // Send appendEntries with the same term as was set on the receiver
+            // before the new behavior was created (1 in this case)
+            // This will not work for a Candidate because as soon as a Candidate
+            // is created it increments the term
+            AppendEntries appendEntries =
+                new AppendEntries(1, "leader-1", 2, 1, entries, 101);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+                // Resetting the Candidates term to make sure it will match
+                // the term sent by AppendEntries. If this was not done then
+                // the test will fail because the Candidate will assume that
+                // the message was sent to it from a lower term peer and will
+                // thus respond with a failure
+                context.getTermInformation().update(1, "test");
+            }
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+            assertEquals(5, log.last().getIndex() + 1);
+            assertNotNull(log.get(3));
+            assertNotNull(log.get(4));
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
+
+        }};
+    }
+
+    /**
+     * This test verifies that when a new AppendEntries message is received with
+     * new entries and the logs of the sender and receiver are out-of-sync that
+     * the log is first corrected by removing the out of sync entries from the
+     * log and then adding in the new entries sent with the AppendEntries message
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandleAppendEntriesCorrectReceiverLogEntries()
+        throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                createActorContext();
+
+            // First set the receivers term to lower number
+            context.getTermInformation().update(2, "test");
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one"));
+            log.append(
+                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two"));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1"));
+            entries.add(
+                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three"));
+
+            // Send appendEntries with the same term as was set on the receiver
+            // before the new behavior was created (1 in this case)
+            // This will not work for a Candidate because as soon as a Candidate
+            // is created it increments the term
+            AppendEntries appendEntries =
+                new AppendEntries(2, "leader-1", 1, 1, entries, 101);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
+                // Resetting the Candidates term to make sure it will match
+                // the term sent by AppendEntries. If this was not done then
+                // the test will fail because the Candidate will assume that
+                // the message was sent to it from a lower term peer and will
+                // thus respond with a failure
+                context.getTermInformation().update(2, "test");
+            }
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftState raftState =
+                behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftState);
+
+            // The entry at index 2 will be found out-of-sync with the leader
+            // and will be removed
+            // Then the two new entries will be added to the log
+            // Thus making the log to have 4 entries
+            assertEquals(4, log.last().getIndex() + 1);
+            assertNotNull(log.get(2));
+
+            // Check that the entry at index 2 has the new data
+            assertEquals("two-1", log.get(2).getData());
+            assertNotNull(log.get(3));
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
 
         }};
     }
 
 
         }};
     }
 
+    /**
+     * This test verifies that when a RequestVote is received by the RaftActor
+     * with a term which is greater than the RaftActors' currentTerm and the
+     * senders' log is more upto date than the receiver that the receiver grants
+     * the vote to the sender
+     */
     @Test
     @Test
-    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate(){
+    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
@@ -74,19 +394,23 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
                     RaftActorBehavior follower = createBehavior(
                         createActorContext(behaviorActor));
 
                     RaftActorBehavior follower = createBehavior(
                         createActorContext(behaviorActor));
 
-                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
+                    follower.handleMessage(getTestActor(),
+                        new RequestVote(1000, "test", 10000, 999));
+
+                    final Boolean out =
+                        new ExpectMsg<Boolean>(duration("1 seconds"),
+                            "RequestVoteReply") {
+                            // do not put code outside this method, will run afterwards
+                            protected Boolean match(Object in) {
+                                if (in instanceof RequestVoteReply) {
+                                    RequestVoteReply reply =
+                                        (RequestVoteReply) in;
+                                    return reply.isVoteGranted();
+                                } else {
+                                    throw noMatch();
+                                }
                             }
                             }
-                        }
-                    }.get();
+                        }.get();
 
                     assertEquals(true, out);
                 }
 
                     assertEquals(true, out);
                 }
@@ -94,8 +418,13 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
         }};
     }
 
         }};
     }
 
+    /**
+     * This test verifies that when a RaftActor receives a RequestVote message
+     * with a term that is greater than it's currentTerm but a less up-to-date
+     * log then the receiving RaftActor will not grant the vote to the sender
+     */
     @Test
     @Test
-    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate(){
+    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
@@ -104,29 +433,37 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
                     RaftActorContext actorContext =
                         createActorContext(behaviorActor);
 
                     RaftActorContext actorContext =
                         createActorContext(behaviorActor);
 
-                    MockRaftActorContext.MockReplicatedLog log = new MockRaftActorContext.MockReplicatedLog();
-                    log.setReplicatedLogEntry(new MockRaftActorContext.MockReplicatedLogEntry(20000, 1000000, ""));
-                    log.setLast(
+                    MockRaftActorContext.MockReplicatedLog
+                        log = new MockRaftActorContext.MockReplicatedLog();
+                    log.setReplicatedLogEntry(
                         new MockRaftActorContext.MockReplicatedLogEntry(20000,
                             1000000, ""));
                         new MockRaftActorContext.MockReplicatedLogEntry(20000,
                             1000000, ""));
+                    log.setLast(
+                        new MockRaftActorContext.MockReplicatedLogEntry(20000,
+                            1000000, "")
+                    );
 
                     ((MockRaftActorContext) actorContext).setReplicatedLog(log);
 
                     RaftActorBehavior follower = createBehavior(actorContext);
 
 
                     ((MockRaftActorContext) actorContext).setReplicatedLog(log);
 
                     RaftActorBehavior follower = createBehavior(actorContext);
 
-                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
+                    follower.handleMessage(getTestActor(),
+                        new RequestVote(1000, "test", 10000, 999));
+
+                    final Boolean out =
+                        new ExpectMsg<Boolean>(duration("1 seconds"),
+                            "RequestVoteReply") {
+                            // do not put code outside this method, will run afterwards
+                            protected Boolean match(Object in) {
+                                if (in instanceof RequestVoteReply) {
+                                    RequestVoteReply reply =
+                                        (RequestVoteReply) in;
+                                    return reply.isVoteGranted();
+                                } else {
+                                    throw noMatch();
+                                }
                             }
                             }
-                        }
-                    }.get();
+                        }.get();
 
                     assertEquals(false, out);
                 }
 
                     assertEquals(false, out);
                 }
@@ -136,33 +473,42 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
 
 
 
 
 
 
-
+    /**
+     * This test verifies that the receiving RaftActor will not grant a vote
+     * to a sender if the sender's term is lesser than the currentTerm of the
+     * recipient RaftActor
+     */
     @Test
     @Test
-    public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm(){
+    public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    RaftActorContext context = createActorContext(behaviorActor);
+                    RaftActorContext context =
+                        createActorContext(behaviorActor);
 
                     context.getTermInformation().update(1000, null);
 
                     RaftActorBehavior follower = createBehavior(context);
 
 
                     context.getTermInformation().update(1000, null);
 
                     RaftActorBehavior follower = createBehavior(context);
 
-                    follower.handleMessage(getTestActor(), new RequestVote(999, "test", 10000, 999));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
+                    follower.handleMessage(getTestActor(),
+                        new RequestVote(999, "test", 10000, 999));
+
+                    final Boolean out =
+                        new ExpectMsg<Boolean>(duration("1 seconds"),
+                            "RequestVoteReply") {
+                            // do not put code outside this method, will run afterwards
+                            protected Boolean match(Object in) {
+                                if (in instanceof RequestVoteReply) {
+                                    RequestVoteReply reply =
+                                        (RequestVoteReply) in;
+                                    return reply.isVoteGranted();
+                                } else {
+                                    throw noMatch();
+                                }
                             }
                             }
-                        }
-                    }.get();
+                        }.get();
 
                     assertEquals(false, out);
                 }
 
                     assertEquals(false, out);
                 }
@@ -171,20 +517,43 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
     }
 
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
     }
 
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
-        ActorRef actorRef, RaftRPC rpc){
-        RaftState raftState = createBehavior()
+        ActorRef actorRef, RaftRPC rpc) {
+
+        RaftActorContext actorContext = createActorContext();
+        setLastLogEntry(
+            (MockRaftActorContext) actorContext, 0, 0, "");
+
+        RaftState raftState = createBehavior(actorContext)
             .handleMessage(actorRef, rpc);
 
         assertEquals(RaftState.Follower, raftState);
     }
 
             .handleMessage(actorRef, rpc);
 
         assertEquals(RaftState.Follower, raftState);
     }
 
-    protected abstract RaftActorBehavior createBehavior(RaftActorContext actorContext);
+    protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
+        MockRaftActorContext actorContext, long term, long index, Object data) {
+        return setLastLogEntry(actorContext,
+            new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
+    }
+
+    protected MockRaftActorContext.MockReplicatedLog setLastLogEntry(
+        MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) {
+        MockRaftActorContext.MockReplicatedLog
+            log = new MockRaftActorContext.MockReplicatedLog();
+        // By default MockReplicateLog has last entry set to (1,1,"")
+        log.setLast(logEntry);
+        actorContext.setReplicatedLog(log);
+
+        return log;
+    }
+
+    protected abstract RaftActorBehavior createBehavior(
+        RaftActorContext actorContext);
 
 
-    protected RaftActorBehavior createBehavior(){
+    protected RaftActorBehavior createBehavior() {
         return createBehavior(createActorContext());
     }
 
         return createBehavior(createActorContext());
     }
 
-    protected RaftActorContext createActorContext(){
+    protected RaftActorContext createActorContext() {
         return new MockRaftActorContext();
     }
 
         return new MockRaftActorContext();
     }
 
@@ -192,19 +561,19 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest{
         return new MockRaftActorContext("test", getSystem(), actor);
     }
 
         return new MockRaftActorContext("test", getSystem(), actor);
     }
 
-    protected AppendEntries createAppendEntriesWithNewerTerm(){
+    protected AppendEntries createAppendEntriesWithNewerTerm() {
         return new AppendEntries(100, "leader-1", 0, 0, null, 1);
     }
 
         return new AppendEntries(100, "leader-1", 0, 0, null, 1);
     }
 
-    protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm(){
+    protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() {
         return new AppendEntriesReply(100, false);
     }
 
         return new AppendEntriesReply(100, false);
     }
 
-    protected RequestVote createRequestVoteWithNewerTerm(){
+    protected RequestVote createRequestVoteWithNewerTerm() {
         return new RequestVote(100, "candidate-1", 10, 100);
     }
 
         return new RequestVote(100, "candidate-1", 10, 100);
     }
 
-    protected RequestVoteReply createRequestVoteReplyWithNewerTerm(){
+    protected RequestVoteReply createRequestVoteReplyWithNewerTerm() {
         return new RequestVoteReply(100, false);
     }
 
         return new RequestVoteReply(100, false);
     }