Fix issue when adding new peers 45/9145/4
authorMoiz Raja <moraja@cisco.com>
Fri, 18 Jul 2014 12:03:48 +0000 (05:03 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:44:46 +0000 (15:44 -0700)
- There a couple of cases that were not being handled correctly
in Follower#handleAppendEntries this had to be fixed
- ReplicatedLogImpl#size was not properly reported

Change-Id: I8af214cb46a5bae844d1e20e8788502255ffeab5
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java

index 914af5d..8d4d5e4 100644 (file)
@@ -50,7 +50,9 @@ public class ExampleActor extends RaftActor {
                 String persistId = Long.toString(persistIdentifier++);
                 persistData(getSender(), persistId, message);
             } else {
-                getLeader().forward(message, getContext());
+                if(getLeader() != null) {
+                    getLeader().forward(message, getContext());
+                }
             }
 
         } else if (message instanceof PrintState) {
index 05e9ba7..00cc09a 100644 (file)
@@ -26,4 +26,11 @@ public class KeyValue implements Serializable{
     public String getValue() {
         return value;
     }
+
+    @Override public String toString() {
+        return "KeyValue{" +
+            "key='" + key + '\'' +
+            ", value='" + value + '\'' +
+            '}';
+    }
 }
index dd9572c..9b79408 100644 (file)
@@ -131,8 +131,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
-            LOG.debug("Applying state for log index {}",
-                applyState.getReplicatedLogEntry().getIndex());
+            LOG.debug("Applying state for log index {} data {}",
+                applyState.getReplicatedLogEntry().getIndex(),
+                applyState.getReplicatedLogEntry().getData());
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
@@ -183,11 +184,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected void persistData(ActorRef clientActor, String identifier,
         Object data) {
-        LOG.debug("Persist data " + identifier);
+
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
 
+        LOG.debug("Persist data {}", replicatedLogEntry);
+
         replicatedLog
             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
     }
@@ -407,7 +410,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
             context.getLogger().debug(
-                "Append log entry and persist " + replicatedLogEntry.getIndex());
+                "Append log entry and persist " + replicatedLogEntry);
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
@@ -448,7 +451,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public long size() {
-            return journal.size() + snapshotIndex;
+            return journal.size() + snapshotIndex + 1;
         }
 
         @Override public boolean isPresent(long index) {
@@ -510,6 +513,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
         @Override public long getIndex() {
             return index;
         }
+
+        @Override public String toString() {
+            return "Entry{" +
+                "index=" + index +
+                ", term=" + term +
+                '}';
+        }
     }
 
 
index 1cfc2e0..dd2f19b 100644 (file)
@@ -21,14 +21,13 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
 /**
  * The behavior of a RaftActor in the Follower state
- *
+ * <p/>
  * <ul>
  * <li> Respond to RPCs from candidates and leaders
  * <li> If election timeout elapses without receiving AppendEntries
  * RPC from current leader or granting vote to candidate:
  * convert to candidate
  * </ul>
- *
  */
 public class Follower extends AbstractRaftActorBehavior {
     public Follower(RaftActorContext context) {
@@ -49,16 +48,36 @@ public class Follower extends AbstractRaftActorBehavior {
             .get(appendEntries.getPrevLogIndex());
 
 
-        if (lastIndex() > -1 && previousEntry != null
-            && previousEntry.getTerm() != appendEntries
-            .getPrevLogTerm()) {
+        boolean noMatchingTerms = true;
+
+        if (lastIndex() == -1
+            && appendEntries.getPrevLogIndex() != -1) {
+
+            context.getLogger().debug(
+                "The followers log is empty and the senders prevLogIndex is {}",
+                appendEntries.getPrevLogIndex());
+
+        } else if (lastIndex() > -1
+            && appendEntries.getPrevLogIndex() != -1
+            && previousEntry == null) {
+
+            context.getLogger().debug(
+                "The log is not empty but the prevLogIndex {} was not found in it",
+                appendEntries.getPrevLogIndex());
+
+        } else if (lastIndex() > -1
+            && previousEntry != null
+            && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
 
             context.getLogger().debug(
-                "Cannot append entries because previous entry term "
-                    + previousEntry.getTerm()
-                    + " is not equal to append entries prevLogTerm "
-                    + appendEntries.getPrevLogTerm());
+                "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                , previousEntry.getTerm()
+                , appendEntries.getPrevLogTerm());
+        } else {
+            noMatchingTerms = false;
+        }
 
+        if (noMatchingTerms) {
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
@@ -70,7 +89,8 @@ public class Follower extends AbstractRaftActorBehavior {
             && appendEntries.getEntries().size() > 0) {
             context.getLogger().debug(
                 "Number of entries to be appended = " + appendEntries
-                    .getEntries().size());
+                    .getEntries().size()
+            );
 
             // 3. If an existing entry conflicts with a new one (same index
             // but different terms), delete the existing entry and all that
@@ -98,7 +118,8 @@ public class Follower extends AbstractRaftActorBehavior {
                         .getTerm()) {
                         context.getLogger().debug(
                             "Removing entries from log starting at "
-                                + matchEntry.getIndex());
+                                + matchEntry.getIndex()
+                        );
                         context.getReplicatedLog()
                             .removeFrom(matchEntry.getIndex());
                         break;
@@ -108,14 +129,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
             context.getLogger().debug(
                 "After cleanup entries to be added from = " + (addEntriesFrom
-                    + lastIndex()));
+                    + lastIndex())
+            );
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
                 context.getLogger().debug(
-                    "Append entry to log " + appendEntries.getEntries().get(i)
-                        .toString());
+                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+                        .toString()
+                );
                 context.getReplicatedLog()
                     .appendAndPersist(appendEntries.getEntries().get(i));
             }
@@ -165,9 +188,9 @@ public class Follower extends AbstractRaftActorBehavior {
     }
 
     @Override public RaftState handleMessage(ActorRef sender, Object message) {
-        if(message instanceof ElectionTimeout){
+        if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
-        } else if(message instanceof InstallSnapshot){
+        } else if (message instanceof InstallSnapshot) {
             InstallSnapshot snapshot = (InstallSnapshot) message;
             actor().tell(new ApplySnapshot(snapshot), actor());
         }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.