From d04b6dc0d4f1eb7e53b95048d41bd11ff35a3fa9 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Thu, 24 Jul 2014 17:47:41 -0700 Subject: [PATCH] Enhance code documentation and add TODOs This commit is based on a code walk through done with Colin Dixon and Abhishek Kumar. TODOs and FIXMEs have been added to guide further development. The major things that need to be completed are, 1. Installing snapshots 2. Adding and removing a new peer with consensus 3. Optimizing AppendEntries (faster synchronization) Change-Id: Ic788e050fe8fa591176a927906004fd2277e29fa Signed-off-by: Moiz Raja --- .../controller/cluster/raft/ElectionTerm.java | 12 +++-- .../controller/cluster/raft/RaftActor.java | 15 ++++-- .../cluster/raft/ReplicatedLog.java | 15 ++++-- .../cluster/raft/behaviors/Candidate.java | 2 + .../cluster/raft/behaviors/Follower.java | 54 ++++++++++++++----- .../cluster/raft/behaviors/Leader.java | 19 ++++++- .../cluster/raft/behaviors/FollowerTest.java | 5 +- 7 files changed, 94 insertions(+), 28 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java index 9c0a4fbffd..9f0d02edb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java @@ -30,9 +30,8 @@ public interface ElectionTerm { String getVotedFor(); /** - * Called when we need to update the current term either because we received - * a message from someone with a more uptodate term or because we just voted - * for someone + * To be called mainly when we are recovering in-memory election state from + * persistent storage * * @param currentTerm * @param votedFor @@ -40,6 +39,13 @@ public interface ElectionTerm { void update(long currentTerm, String votedFor); /** + * To be called when we need to update the current term either because we + * received a message from someone with a more up-to-date term or because we + * just voted for someone + *

+ * This information needs to be persisted so that on recovery the replica + * can start itself in the right term and know if it has already voted in + * that term or not * * @param currentTerm * @param votedFor diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 7814bad00b..0ff2341c70 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -99,7 +99,6 @@ public abstract class RaftActor extends UntypedPersistentActor { public RaftActor(String id, Map peerAddresses) { - final String id1 = getSelf().path().toString(); context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(), @@ -157,20 +156,26 @@ public abstract class RaftActor extends UntypedPersistentActor { trimPersistentData(success.metadata().sequenceNr()); } else if (message instanceof SaveSnapshotFailure) { + // TODO: Handle failure in saving the snapshot - } else if (message instanceof FindLeader){ - getSender().tell(new FindLeaderReply( - context.getPeerAddress(currentBehavior.getLeaderId())), - getSelf()); + // Maybe do retries on failure } else if (message instanceof AddRaftPeer){ + + // FIXME : Do not add raft peers like this. + // When adding a new Peer we have to ensure that the a majority of + // the peers know about the new Peer. Doing it this way may cause + // a situation where multiple Leaders may emerge AddRaftPeer arp = (AddRaftPeer)message; context.addToPeers(arp.getName(), arp.getAddress()); } else if (message instanceof RemoveRaftPeer){ + RemoveRaftPeer rrp = (RemoveRaftPeer)message; context.removePeer(rrp.getName()); + } else { + RaftState state = currentBehavior.handleMessage(getSender(), message); currentBehavior = switchBehavior(state); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 29d6a4557d..b7c8955aad 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -44,7 +44,10 @@ public interface ReplicatedLog { long lastTerm(); /** - * Remove all the entries from the logs >= index + * To be called when we need to remove entries from the in-memory log. + * This method will remove all entries >= index. This method should be used + * during recovery to appropriately trim the log based on persisted + * information * * @param index the index of the log entry */ @@ -52,10 +55,14 @@ public interface ReplicatedLog { /** - * Remove all entries starting from the specified entry and persist the - * information to disk + * To be called when we need to remove entries from the in-memory log and we + * need that information persisted to disk. This method will remove all + * entries >= index. + *

+ * The persisted information would then be used during recovery to properly + * reconstruct the state of the in-memory replicated log * - * @param index + * @param index the index of the log entry */ void removeFromAndPersist(long index); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 0c23db4c8a..ecd4901246 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -158,6 +158,8 @@ public class Candidate extends AbstractRaftActorBehavior { context.getLogger().debug("Starting new term " + (currentTerm+1)); // Request for a vote + // TODO: Retry request for vote if replies do not arrive in a reasonable + // amount of time TBD for (ActorSelection peerActor : peerToActor.values()) { peerActor.tell(new RequestVote( context.getTermInformation().getCurrentTerm(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 74069a18e1..532201b26e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -40,20 +40,33 @@ public class Follower extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { + // TODO : Refactor this method into a bunch of smaller methods + // to make it easier to read. Before refactoring ensure tests + // cover the code properly + + // 1. Reply false if term < currentTerm (§5.1) + // This is handled in the appendEntries method of the base class + // If we got here then we do appear to be talking to the leader leaderId = appendEntries.getLeaderId(); // 2. Reply false if log doesn’t contain an entry at prevLogIndex // whose term matches prevLogTerm (§5.3) + ReplicatedLogEntry previousEntry = context.getReplicatedLog() .get(appendEntries.getPrevLogIndex()); - boolean noMatchingTerms = true; + boolean outOfSync = true; + // First check if the logs are in sync or not if (lastIndex() == -1 && appendEntries.getPrevLogIndex() != -1) { + // The follower's log is out of sync because the leader does have + // an entry at prevLogIndex and this follower has no entries in + // it's log. + context.getLogger().debug( "The followers log is empty and the senders prevLogIndex is {}", appendEntries.getPrevLogIndex()); @@ -62,6 +75,9 @@ public class Follower extends AbstractRaftActorBehavior { && appendEntries.getPrevLogIndex() != -1 && previousEntry == null) { + // The follower's log is out of sync because the Leader's + // prevLogIndex entry was not found in it's log + context.getLogger().debug( "The log is not empty but the prevLogIndex {} was not found in it", appendEntries.getPrevLogIndex()); @@ -70,15 +86,21 @@ public class Follower extends AbstractRaftActorBehavior { && previousEntry != null && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) { + // The follower's log is out of sync because the Leader's + // prevLogIndex entry does exist in the follower's log but it has + // a different term in it + context.getLogger().debug( "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" , previousEntry.getTerm() , appendEntries.getPrevLogTerm()); } else { - noMatchingTerms = false; + outOfSync = false; } - if (noMatchingTerms) { + if (outOfSync) { + // We found that the log was out of sync so just send a negative + // reply and return sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() @@ -98,6 +120,9 @@ public class Follower extends AbstractRaftActorBehavior { // follow it (§5.3) int addEntriesFrom = 0; if (context.getReplicatedLog().size() > 0) { + + // Find the entry up until which the one that is not in the + // follower's log for (int i = 0; i < appendEntries.getEntries() .size(); i++, addEntriesFrom++) { @@ -111,20 +136,20 @@ public class Follower extends AbstractRaftActorBehavior { break; } - if (newEntry != null && newEntry.getTerm() == matchEntry + if (newEntry.getTerm() == matchEntry .getTerm()) { continue; } - if (newEntry != null && newEntry.getTerm() != matchEntry - .getTerm()) { - context.getLogger().debug( - "Removing entries from log starting at " - + matchEntry.getIndex() - ); - context.getReplicatedLog() - .removeFromAndPersist(matchEntry.getIndex()); - break; - } + + context.getLogger().debug( + "Removing entries from log starting at " + + matchEntry.getIndex() + ); + + // Entries do not match so remove all subsequent entries + context.getReplicatedLog() + .removeFromAndPersist(matchEntry.getIndex()); + break; } } @@ -136,6 +161,7 @@ public class Follower extends AbstractRaftActorBehavior { // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { + context.getLogger().debug( "Append entry to log " + appendEntries.getEntries().get(i).getData() .toString() diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index a9882a6647..fb8be8b891 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -137,6 +137,13 @@ public class Leader extends AbstractRaftActorBehavior { followerLogInformation .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); } else { + + // TODO: When we find that the follower is out of sync with the + // Leader we simply decrement that followers next index by 1. + // Would it be possible to do better than this? The RAFT spec + // does not explicitly deal with it but may be something for us to + // think about + followerLogInformation.decrNextIndex(); } @@ -215,7 +222,6 @@ public class Leader extends AbstractRaftActorBehavior { } else if (message instanceof Replicate) { replicate((Replicate) message); } else if (message instanceof InstallSnapshotReply){ - // FIXME : Should I be checking the term here too? handleInstallSnapshotReply( (InstallSnapshotReply) message); } @@ -281,6 +287,12 @@ public class Leader extends AbstractRaftActorBehavior { List entries = Collections.emptyList(); if(context.getReplicatedLog().isPresent(nextIndex)){ + // TODO: Instead of sending all entries from nextIndex + // only send a fixed number of entries to each follower + // This is to avoid the situation where there are a lot of + // entries to install for a fresh follower or to a follower + // that has fallen too far behind with the log but yet is not + // eligible to receive a snapshot entries = context.getReplicatedLog().getFrom(nextIndex); } @@ -295,6 +307,11 @@ public class Leader extends AbstractRaftActorBehavior { } } + /** + * An installSnapshot is scheduled at a interval that is a multiple of + * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing + * snapshots at every heartbeat. + */ private void installSnapshotIfNeeded(){ for (String followerId : followerToActor.keySet()) { ActorSelection followerActor = diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 1cf178bb00..b7c371dd39 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -378,9 +378,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals(4, log.last().getIndex() + 1); assertNotNull(log.get(2)); - // Check that the entry at index 2 has the new data + assertEquals("one", log.get(1).getData()); + + // Check that the entry at index 2 has the new data assertEquals("two-1", log.get(2).getData()); + assertEquals("three", log.get(3).getData()); assertNotNull(log.get(3)); -- 2.36.6