From cbf101a2dd94747d62a3078d6e50433096cccd4a Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Fri, 18 Jul 2014 05:03:48 -0700 Subject: [PATCH] Fix issue when adding new peers - There a couple of cases that were not being handled correctly in Follower#handleAppendEntries this had to be fixed - ReplicatedLogImpl#size was not properly reported Change-Id: I8af214cb46a5bae844d1e20e8788502255ffeab5 Signed-off-by: Moiz Raja --- .../cluster/example/ExampleActor.java | 4 +- .../cluster/example/messages/KeyValue.java | 7 +++ .../controller/cluster/raft/RaftActor.java | 20 +++++-- .../cluster/raft/behaviors/Follower.java | 55 +++++++++++++------ 4 files changed, 64 insertions(+), 22 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 914af5d9eb..8d4d5e48c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -50,7 +50,9 @@ public class ExampleActor extends RaftActor { String persistId = Long.toString(persistIdentifier++); persistData(getSender(), persistId, message); } else { - getLeader().forward(message, getContext()); + if(getLeader() != null) { + getLeader().forward(message, getContext()); + } } } else if (message instanceof PrintState) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java index 05e9ba7eab..00cc09ae30 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java @@ -26,4 +26,11 @@ public class KeyValue implements Serializable{ public String getValue() { return value; } + + @Override public String toString() { + return "KeyValue{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + '}'; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index dd9572c9a7..9b7940838d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -131,8 +131,9 @@ public abstract class RaftActor extends UntypedPersistentActor { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; - LOG.debug("Applying state for log index {}", - applyState.getReplicatedLogEntry().getIndex()); + LOG.debug("Applying state for log index {} data {}", + applyState.getReplicatedLogEntry().getIndex(), + applyState.getReplicatedLogEntry().getData()); applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); @@ -183,11 +184,13 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected void persistData(ActorRef clientActor, String identifier, Object data) { - LOG.debug("Persist data " + identifier); + ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); + LOG.debug("Persist data {}", replicatedLogEntry); + replicatedLog .appendAndPersist(clientActor, identifier, replicatedLogEntry); } @@ -407,7 +410,7 @@ public abstract class RaftActor extends UntypedPersistentActor { final String identifier, final ReplicatedLogEntry replicatedLogEntry) { context.getLogger().debug( - "Append log entry and persist " + replicatedLogEntry.getIndex()); + "Append log entry and persist " + replicatedLogEntry); // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs journal.add(replicatedLogEntry); @@ -448,7 +451,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } @Override public long size() { - return journal.size() + snapshotIndex; + return journal.size() + snapshotIndex + 1; } @Override public boolean isPresent(long index) { @@ -510,6 +513,13 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public long getIndex() { return index; } + + @Override public String toString() { + return "Entry{" + + "index=" + index + + ", term=" + term + + '}'; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 1cfc2e0eb9..dd2f19b137 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -21,14 +21,13 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; /** * The behavior of a RaftActor in the Follower state - * + *

*

- * */ public class Follower extends AbstractRaftActorBehavior { public Follower(RaftActorContext context) { @@ -49,16 +48,36 @@ public class Follower extends AbstractRaftActorBehavior { .get(appendEntries.getPrevLogIndex()); - if (lastIndex() > -1 && previousEntry != null - && previousEntry.getTerm() != appendEntries - .getPrevLogTerm()) { + boolean noMatchingTerms = true; + + if (lastIndex() == -1 + && appendEntries.getPrevLogIndex() != -1) { + + context.getLogger().debug( + "The followers log is empty and the senders prevLogIndex is {}", + appendEntries.getPrevLogIndex()); + + } else if (lastIndex() > -1 + && appendEntries.getPrevLogIndex() != -1 + && previousEntry == null) { + + context.getLogger().debug( + "The log is not empty but the prevLogIndex {} was not found in it", + appendEntries.getPrevLogIndex()); + + } else if (lastIndex() > -1 + && previousEntry != null + && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) { context.getLogger().debug( - "Cannot append entries because previous entry term " - + previousEntry.getTerm() - + " is not equal to append entries prevLogTerm " - + appendEntries.getPrevLogTerm()); + "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}" + , previousEntry.getTerm() + , appendEntries.getPrevLogTerm()); + } else { + noMatchingTerms = false; + } + if (noMatchingTerms) { sender.tell( new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm()), actor() @@ -70,7 +89,8 @@ public class Follower extends AbstractRaftActorBehavior { && appendEntries.getEntries().size() > 0) { context.getLogger().debug( "Number of entries to be appended = " + appendEntries - .getEntries().size()); + .getEntries().size() + ); // 3. If an existing entry conflicts with a new one (same index // but different terms), delete the existing entry and all that @@ -98,7 +118,8 @@ public class Follower extends AbstractRaftActorBehavior { .getTerm()) { context.getLogger().debug( "Removing entries from log starting at " - + matchEntry.getIndex()); + + matchEntry.getIndex() + ); context.getReplicatedLog() .removeFrom(matchEntry.getIndex()); break; @@ -108,14 +129,16 @@ public class Follower extends AbstractRaftActorBehavior { context.getLogger().debug( "After cleanup entries to be added from = " + (addEntriesFrom - + lastIndex())); + + lastIndex()) + ); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { context.getLogger().debug( - "Append entry to log " + appendEntries.getEntries().get(i) - .toString()); + "Append entry to log " + appendEntries.getEntries().get(i).getData() + .toString() + ); context.getReplicatedLog() .appendAndPersist(appendEntries.getEntries().get(i)); } @@ -165,9 +188,9 @@ public class Follower extends AbstractRaftActorBehavior { } @Override public RaftState handleMessage(ActorRef sender, Object message) { - if(message instanceof ElectionTimeout){ + if (message instanceof ElectionTimeout) { return RaftState.Candidate; - } else if(message instanceof InstallSnapshot){ + } else if (message instanceof InstallSnapshot) { InstallSnapshot snapshot = (InstallSnapshot) message; actor().tell(new ApplySnapshot(snapshot), actor()); } -- 2.36.6