Enhance code documentation and add TODOs 04/9304/4
author Moiz Raja <moraja@cisco.com>
Fri, 25 Jul 2014 00:47:41 +0000 (17:47 -0700)
committer Moiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 23:15:42 +0000 (16:15 -0700)
This commit is based on a code walk through done with Colin Dixon and
Abhishek Kumar.

TODOs and FIXMEs have been added to guide further development.

The major things that need to be completed are:
1. Installing snapshots
2. Adding and removing a new peer with consensus
3. Optimizing AppendEntries (faster synchronization)

Change-Id: Ic788e050fe8fa591176a927906004fd2277e29fa
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 9c0a4fbffd5a03bf8e645e4d5aab9f72e66b507c..9f0d02edb9d5aa111b841f0972c0d82ca07a7df8 100644 (file)
@@ -30,9 +30,8 @@ public interface ElectionTerm {
     String getVotedFor();
 
     /**
     String getVotedFor();
 
     /**
-     * Called when we need to update the current term either because we received
-     * a message from someone with a more uptodate term or because we just voted
-     * for someone
+     * To be called mainly when we are recovering in-memory election state from
+     * persistent storage
      *
      * @param currentTerm
      * @param votedFor
      *
      * @param currentTerm
      * @param votedFor
@@ -40,6 +39,13 @@ public interface ElectionTerm {
     void update(long currentTerm, String votedFor);
 
     /**
     void update(long currentTerm, String votedFor);
 
     /**
+     * To be called when we need to update the current term either because we
+     * received a message from someone with a more up-to-date term or because we
+     * just voted for someone
+     * <p>
+     * This information needs to be persisted so that on recovery the replica
+     * can start itself in the right term and know if it has already voted in
+     * that term or not
      *
      * @param currentTerm
      * @param votedFor
      *
      * @param currentTerm
      * @param votedFor
index 7814bad00b4d1423cea9847b06882d36138e0238..0ff2341c708431d8da540d625638c9d70bb3dd11 100644 (file)
@@ -99,7 +99,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
-        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(),
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(),
@@ -157,20 +156,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+
             // TODO: Handle failure in saving the snapshot
             // TODO: Handle failure in saving the snapshot
-        } else if (message instanceof FindLeader){
-            getSender().tell(new FindLeaderReply(
-                context.getPeerAddress(currentBehavior.getLeaderId())),
-                getSelf());
+            // Maybe do retries on failure
 
         } else if (message instanceof AddRaftPeer){
 
         } else if (message instanceof AddRaftPeer){
+
+            // FIXME : Do not add raft peers like this.
+            // When adding a new Peer we have to ensure that a majority of
+            // the peers know about the new Peer. Doing it this way may cause
+            // a situation where multiple Leaders may emerge
             AddRaftPeer arp = (AddRaftPeer)message;
            context.addToPeers(arp.getName(), arp.getAddress());
 
         } else if (message instanceof RemoveRaftPeer){
             AddRaftPeer arp = (AddRaftPeer)message;
            context.addToPeers(arp.getName(), arp.getAddress());
 
         } else if (message instanceof RemoveRaftPeer){
+
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
+
         } else {
         } else {
+
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
             currentBehavior = switchBehavior(state);
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
             currentBehavior = switchBehavior(state);
index 29d6a4557db8dde765accbfe767b9b4f8debd290..b7c8955aad982873ee02fff78f629b7f7bc1f1b5 100644 (file)
@@ -44,7 +44,10 @@ public interface ReplicatedLog {
     long lastTerm();
 
     /**
     long lastTerm();
 
     /**
-     * Remove all the entries from the logs >= index
+     * To be called when we need to remove entries from the in-memory log.
+     * This method will remove all entries >= index. This method should be used
+     * during recovery to appropriately trim the log based on persisted
+     * information
      *
      * @param index the index of the log entry
      */
      *
      * @param index the index of the log entry
      */
@@ -52,10 +55,14 @@ public interface ReplicatedLog {
 
 
     /**
 
 
     /**
-     * Remove all entries starting from the specified entry and persist the
-     * information to disk
+     * To be called when we need to remove entries from the in-memory log and we
+     * need that information persisted to disk. This method will remove all
+     * entries >= index.
+     * <p>
+     * The persisted information would then be used during recovery to properly
+     * reconstruct the state of the in-memory replicated log
      *
      *
-     * @param index
+     * @param index the index of the log entry
      */
     void removeFromAndPersist(long index);
 
      */
     void removeFromAndPersist(long index);
 
index 0c23db4c8a0184af45461ddfd3ec214ce117c460..ecd49012461a7b1ee76f4678c7451a4fa1edbfb1 100644 (file)
@@ -158,6 +158,8 @@ public class Candidate extends AbstractRaftActorBehavior {
         context.getLogger().debug("Starting new term " + (currentTerm+1));
 
         // Request for a vote
         context.getLogger().debug("Starting new term " + (currentTerm+1));
 
         // Request for a vote
+        // TODO: Retry request for vote if replies do not arrive in a reasonable
+        // amount of time TBD
         for (ActorSelection peerActor : peerToActor.values()) {
             peerActor.tell(new RequestVote(
                     context.getTermInformation().getCurrentTerm(),
         for (ActorSelection peerActor : peerToActor.values()) {
             peerActor.tell(new RequestVote(
                     context.getTermInformation().getCurrentTerm(),
index 74069a18e1f879e71040a171eef8497889fd0f8c..532201b26e8e6b64915c2a187d37b60a397fe2ca 100644 (file)
@@ -40,20 +40,33 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        // TODO : Refactor this method into a bunch of smaller methods
+        // to make it easier to read. Before refactoring ensure tests
+        // cover the code properly
+
+        // 1. Reply false if term < currentTerm (§5.1)
+        // This is handled in the appendEntries method of the base class
+
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
 
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
 
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
+
         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
             .get(appendEntries.getPrevLogIndex());
 
 
         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
             .get(appendEntries.getPrevLogIndex());
 
 
-        boolean noMatchingTerms = true;
+        boolean outOfSync = true;
 
 
+        // First check if the logs are in sync or not
         if (lastIndex() == -1
             && appendEntries.getPrevLogIndex() != -1) {
 
         if (lastIndex() == -1
             && appendEntries.getPrevLogIndex() != -1) {
 
+            // The follower's log is out of sync because the leader does have
+            // an entry at prevLogIndex and this follower has no entries in
+            // its log.
+
             context.getLogger().debug(
                 "The followers log is empty and the senders prevLogIndex is {}",
                 appendEntries.getPrevLogIndex());
             context.getLogger().debug(
                 "The followers log is empty and the senders prevLogIndex is {}",
                 appendEntries.getPrevLogIndex());
@@ -62,6 +75,9 @@ public class Follower extends AbstractRaftActorBehavior {
             && appendEntries.getPrevLogIndex() != -1
             && previousEntry == null) {
 
             && appendEntries.getPrevLogIndex() != -1
             && previousEntry == null) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry was not found in its log
+
             context.getLogger().debug(
                 "The log is not empty but the prevLogIndex {} was not found in it",
                 appendEntries.getPrevLogIndex());
             context.getLogger().debug(
                 "The log is not empty but the prevLogIndex {} was not found in it",
                 appendEntries.getPrevLogIndex());
@@ -70,15 +86,21 @@ public class Follower extends AbstractRaftActorBehavior {
             && previousEntry != null
             && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
 
             && previousEntry != null
             && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry does exist in the follower's log but it has
+            // a different term in it
+
             context.getLogger().debug(
                 "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
                 , previousEntry.getTerm()
                 , appendEntries.getPrevLogTerm());
         } else {
             context.getLogger().debug(
                 "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
                 , previousEntry.getTerm()
                 , appendEntries.getPrevLogTerm());
         } else {
-            noMatchingTerms = false;
+            outOfSync = false;
         }
 
         }
 
-        if (noMatchingTerms) {
+        if (outOfSync) {
+            // We found that the log was out of sync so just send a negative
+            // reply and return
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -98,6 +120,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // follow it (§5.3)
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
             // follow it (§5.3)
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
+
+                // Find the first entry in the request that is not already in
+                // the follower's log
                 for (int i = 0;
                      i < appendEntries.getEntries()
                          .size(); i++, addEntriesFrom++) {
                 for (int i = 0;
                      i < appendEntries.getEntries()
                          .size(); i++, addEntriesFrom++) {
@@ -111,20 +136,20 @@ public class Follower extends AbstractRaftActorBehavior {
                         break;
                     }
 
                         break;
                     }
 
-                    if (newEntry != null && newEntry.getTerm() == matchEntry
+                    if (newEntry.getTerm() == matchEntry
                         .getTerm()) {
                         continue;
                     }
                         .getTerm()) {
                         continue;
                     }
-                    if (newEntry != null && newEntry.getTerm() != matchEntry
-                        .getTerm()) {
-                        context.getLogger().debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
-                        );
-                        context.getReplicatedLog()
-                            .removeFromAndPersist(matchEntry.getIndex());
-                        break;
-                    }
+
+                    context.getLogger().debug(
+                        "Removing entries from log starting at "
+                            + matchEntry.getIndex()
+                    );
+
+                    // Entries do not match so remove all subsequent entries
+                    context.getReplicatedLog()
+                        .removeFromAndPersist(matchEntry.getIndex());
+                    break;
                 }
             }
 
                 }
             }
 
@@ -136,6 +161,7 @@ public class Follower extends AbstractRaftActorBehavior {
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
+
                 context.getLogger().debug(
                     "Append entry to log " + appendEntries.getEntries().get(i).getData()
                         .toString()
                 context.getLogger().debug(
                     "Append entry to log " + appendEntries.getEntries().get(i).getData()
                         .toString()
index a9882a664767ad726c651611c6cff8ac078f6d1a..fb8be8b891a315b420c6778f1691ba4712a069d7 100644 (file)
@@ -137,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior {
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that follower's next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
             followerLogInformation.decrNextIndex();
         }
 
             followerLogInformation.decrNextIndex();
         }
 
@@ -215,7 +222,6 @@ public class Leader extends AbstractRaftActorBehavior {
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
-                // FIXME : Should I be checking the term here too?
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
@@ -281,6 +287,12 @@ public class Leader extends AbstractRaftActorBehavior {
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if(context.getReplicatedLog().isPresent(nextIndex)){
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if(context.getReplicatedLog().isPresent(nextIndex)){
+                // TODO: Instead of sending all entries from nextIndex
+                // only send a fixed number of entries to each follower
+                // This is to avoid the situation where there are a lot of
+                // entries to install for a fresh follower or to a follower
+                // that has fallen too far behind with the log but yet is not
+                // eligible to receive a snapshot
                 entries =
                     context.getReplicatedLog().getFrom(nextIndex);
             }
                 entries =
                     context.getReplicatedLog().getFrom(nextIndex);
             }
@@ -295,6 +307,11 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
         }
     }
 
+    /**
+     * An installSnapshot is scheduled at an interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     */
     private void installSnapshotIfNeeded(){
         for (String followerId : followerToActor.keySet()) {
             ActorSelection followerActor =
     private void installSnapshotIfNeeded(){
         for (String followerId : followerToActor.keySet()) {
             ActorSelection followerActor =
index 1cf178bb006ad37d4c4bd84fa950dd81d7dece36..b7c371dd39722986b7601bec8cce1e0127ec71d4 100644 (file)
@@ -378,9 +378,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals(4, log.last().getIndex() + 1);
             assertNotNull(log.get(2));
 
             assertEquals(4, log.last().getIndex() + 1);
             assertNotNull(log.get(2));
 
-            // Check that the entry at index 2 has the new data
+
             assertEquals("one", log.get(1).getData());
             assertEquals("one", log.get(1).getData());
+
+            // Check that the entry at index 2 has the new data
             assertEquals("two-1", log.get(2).getData());
             assertEquals("two-1", log.get(2).getData());
+
             assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));
 
             assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));