Enhance code documentation and add TODOs 04/9304/4
authorMoiz Raja <moraja@cisco.com>
Fri, 25 Jul 2014 00:47:41 +0000 (17:47 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 23:15:42 +0000 (16:15 -0700)
This commit is based on a code walk through done with Colin Dixon and
Abhishek Kumar.

TODOs and FIXMEs have been added to guide further development.

The major things that need to be completed are,
1. Installing snapshots
2. Adding and removing a new peer with consensus
3. Optimizing AppendEntries (faster synchronization)

Change-Id: Ic788e050fe8fa591176a927906004fd2277e29fa
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 9c0a4fb..9f0d02e 100644 (file)
@@ -30,9 +30,8 @@ public interface ElectionTerm {
     String getVotedFor();
 
     /**
-     * Called when we need to update the current term either because we received
-     * a message from someone with a more uptodate term or because we just voted
-     * for someone
+     * To be called mainly when we are recovering in-memory election state from
+     * persistent storage
      *
      * @param currentTerm
      * @param votedFor
@@ -40,6 +39,13 @@ public interface ElectionTerm {
     void update(long currentTerm, String votedFor);
 
     /**
+     * To be called when we need to update the current term either because we
+     * received a message from someone with a more up-to-date term or because we
+     * just voted for someone
+     * <p>
+     * This information needs to be persisted so that on recovery the replica
+     * can start itself in the right term and know if it has already voted in
+     * that term or not
      *
      * @param currentTerm
      * @param votedFor
index 7814bad..0ff2341 100644 (file)
@@ -99,7 +99,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
-        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
             id, new ElectionTermImpl(),
@@ -157,20 +156,26 @@ public abstract class RaftActor extends UntypedPersistentActor {
             trimPersistentData(success.metadata().sequenceNr());
 
         } else if (message instanceof SaveSnapshotFailure) {
+
             // TODO: Handle failure in saving the snapshot
-        } else if (message instanceof FindLeader){
-            getSender().tell(new FindLeaderReply(
-                context.getPeerAddress(currentBehavior.getLeaderId())),
-                getSelf());
+            // Maybe do retries on failure
 
         } else if (message instanceof AddRaftPeer){
+
+            // FIXME : Do not add raft peers like this.
+            // When adding a new Peer we have to ensure that the a majority of
+            // the peers know about the new Peer. Doing it this way may cause
+            // a situation where multiple Leaders may emerge
             AddRaftPeer arp = (AddRaftPeer)message;
            context.addToPeers(arp.getName(), arp.getAddress());
 
         } else if (message instanceof RemoveRaftPeer){
+
             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
             context.removePeer(rrp.getName());
+
         } else {
+
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
             currentBehavior = switchBehavior(state);
index 29d6a45..b7c8955 100644 (file)
@@ -44,7 +44,10 @@ public interface ReplicatedLog {
     long lastTerm();
 
     /**
-     * Remove all the entries from the logs >= index
+     * To be called when we need to remove entries from the in-memory log.
+     * This method will remove all entries >= index. This method should be used
+     * during recovery to appropriately trim the log based on persisted
+     * information
      *
      * @param index the index of the log entry
      */
@@ -52,10 +55,14 @@ public interface ReplicatedLog {
 
 
     /**
-     * Remove all entries starting from the specified entry and persist the
-     * information to disk
+     * To be called when we need to remove entries from the in-memory log and we
+     * need that information persisted to disk. This method will remove all
+     * entries >= index.
+     * <p>
+     * The persisted information would then be used during recovery to properly
+     * reconstruct the state of the in-memory replicated log
      *
-     * @param index
+     * @param index the index of the log entry
      */
     void removeFromAndPersist(long index);
 
index 0c23db4..ecd4901 100644 (file)
@@ -158,6 +158,8 @@ public class Candidate extends AbstractRaftActorBehavior {
         context.getLogger().debug("Starting new term " + (currentTerm+1));
 
         // Request for a vote
+        // TODO: Retry request for vote if replies do not arrive in a reasonable
+        // amount of time TBD
         for (ActorSelection peerActor : peerToActor.values()) {
             peerActor.tell(new RequestVote(
                     context.getTermInformation().getCurrentTerm(),
index 74069a1..532201b 100644 (file)
@@ -40,20 +40,33 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        // TODO : Refactor this method into a bunch of smaller methods
+        // to make it easier to read. Before refactoring ensure tests
+        // cover the code properly
+
+        // 1. Reply false if term < currentTerm (§5.1)
+        // This is handled in the appendEntries method of the base class
+
         // If we got here then we do appear to be talking to the leader
         leaderId = appendEntries.getLeaderId();
 
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
+
         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
             .get(appendEntries.getPrevLogIndex());
 
 
-        boolean noMatchingTerms = true;
+        boolean outOfSync = true;
 
+        // First check if the logs are in sync or not
         if (lastIndex() == -1
             && appendEntries.getPrevLogIndex() != -1) {
 
+            // The follower's log is out of sync because the leader does have
+            // an entry at prevLogIndex and this follower has no entries in
+            // it's log.
+
             context.getLogger().debug(
                 "The followers log is empty and the senders prevLogIndex is {}",
                 appendEntries.getPrevLogIndex());
@@ -62,6 +75,9 @@ public class Follower extends AbstractRaftActorBehavior {
             && appendEntries.getPrevLogIndex() != -1
             && previousEntry == null) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry was not found in it's log
+
             context.getLogger().debug(
                 "The log is not empty but the prevLogIndex {} was not found in it",
                 appendEntries.getPrevLogIndex());
@@ -70,15 +86,21 @@ public class Follower extends AbstractRaftActorBehavior {
             && previousEntry != null
             && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
 
+            // The follower's log is out of sync because the Leader's
+            // prevLogIndex entry does exist in the follower's log but it has
+            // a different term in it
+
             context.getLogger().debug(
                 "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
                 , previousEntry.getTerm()
                 , appendEntries.getPrevLogTerm());
         } else {
-            noMatchingTerms = false;
+            outOfSync = false;
         }
 
-        if (noMatchingTerms) {
+        if (outOfSync) {
+            // We found that the log was out of sync so just send a negative
+            // reply and return
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -98,6 +120,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // follow it (§5.3)
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
+
+                // Find the entry up until which the one that is not in the
+                // follower's log
                 for (int i = 0;
                      i < appendEntries.getEntries()
                          .size(); i++, addEntriesFrom++) {
@@ -111,20 +136,20 @@ public class Follower extends AbstractRaftActorBehavior {
                         break;
                     }
 
-                    if (newEntry != null && newEntry.getTerm() == matchEntry
+                    if (newEntry.getTerm() == matchEntry
                         .getTerm()) {
                         continue;
                     }
-                    if (newEntry != null && newEntry.getTerm() != matchEntry
-                        .getTerm()) {
-                        context.getLogger().debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
-                        );
-                        context.getReplicatedLog()
-                            .removeFromAndPersist(matchEntry.getIndex());
-                        break;
-                    }
+
+                    context.getLogger().debug(
+                        "Removing entries from log starting at "
+                            + matchEntry.getIndex()
+                    );
+
+                    // Entries do not match so remove all subsequent entries
+                    context.getReplicatedLog()
+                        .removeFromAndPersist(matchEntry.getIndex());
+                    break;
                 }
             }
 
@@ -136,6 +161,7 @@ public class Follower extends AbstractRaftActorBehavior {
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
+
                 context.getLogger().debug(
                     "Append entry to log " + appendEntries.getEntries().get(i).getData()
                         .toString()
index a9882a6..fb8be8b 100644 (file)
@@ -137,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior {
             followerLogInformation
                 .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
         } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
             followerLogInformation.decrNextIndex();
         }
 
@@ -215,7 +222,6 @@ public class Leader extends AbstractRaftActorBehavior {
             } else if (message instanceof Replicate) {
                 replicate((Replicate) message);
             } else if (message instanceof InstallSnapshotReply){
-                // FIXME : Should I be checking the term here too?
                 handleInstallSnapshotReply(
                     (InstallSnapshotReply) message);
             }
@@ -281,6 +287,12 @@ public class Leader extends AbstractRaftActorBehavior {
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
             if(context.getReplicatedLog().isPresent(nextIndex)){
+                // TODO: Instead of sending all entries from nextIndex
+                // only send a fixed number of entries to each follower
+                // This is to avoid the situation where there are a lot of
+                // entries to install for a fresh follower or to a follower
+                // that has fallen too far behind with the log but yet is not
+                // eligible to receive a snapshot
                 entries =
                     context.getReplicatedLog().getFrom(nextIndex);
             }
@@ -295,6 +307,11 @@ public class Leader extends AbstractRaftActorBehavior {
         }
     }
 
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     */
     private void installSnapshotIfNeeded(){
         for (String followerId : followerToActor.keySet()) {
             ActorSelection followerActor =
index 1cf178b..b7c371d 100644 (file)
@@ -378,9 +378,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals(4, log.last().getIndex() + 1);
             assertNotNull(log.get(2));
 
-            // Check that the entry at index 2 has the new data
+
             assertEquals("one", log.get(1).getData());
+
+            // Check that the entry at index 2 has the new data
             assertEquals("two-1", log.get(2).getData());
+
             assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.