X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=e4d42661de3a9612d558e29685c55bf195d9a331;hb=refs%2Fchanges%2F35%2F41535%2F2;hp=da10f41ab3ae60d8cdff60a722e43e7b8e286b88;hpb=4816cd33c7b70012b11dd4efc6dbb01218f8cef0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index da10f41ab3..e4d42661de 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.ArrayList; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -65,12 +65,10 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); - if(canStartElection()) { - if (context.getPeerIds().isEmpty() && getLeaderId() == null) { - actor().tell(ElectionTimeout.INSTANCE, actor()); - } else { - scheduleElection(electionDuration()); - } + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { + actor().tell(ElectionTimeout.INSTANCE, actor()); + } else { + scheduleElection(electionDuration()); } } @@ -80,8 +78,8 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderId(final String leaderId) { - this.leaderId = Preconditions.checkNotNull(leaderId); + protected final void setLeaderId(@Nullable final String leaderId) { + this.leaderId = leaderId; } @Override @@ -95,32 +93,15 @@ public class Follower extends AbstractRaftActorBehavior { } private boolean isLogEntryPresent(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ + if(context.getReplicatedLog().isInSnapshot(index)) { return true; } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - return previousEntry != null; + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + return entry != null; } - private long getLogEntryTerm(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ - return context.getReplicatedLog().getSnapshotTerm(); - } - - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - if(previousEntry != null){ - return previousEntry.getTerm(); - } - - return -1; - } - private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); } @@ -186,14 +167,20 @@ public class Follower extends AbstractRaftActorBehavior { // Find the entry up until the one that is not in the follower's log for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); - if (newEntry == null) { - //newEntry not found in the log + if(!isLogEntryPresent(matchEntry.getIndex())) { + // newEntry not found in the log break; } - if (newEntry.getTerm() == matchEntry.getTerm()) { + long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex()); + + LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, + existingEntryTerm); + + // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know + // what the term was so we'll assume it matches. + if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { continue; } @@ -203,7 +190,18 @@ public class Follower extends AbstractRaftActorBehavior { matchEntry.getIndex()); // Entries do not match so remove all subsequent entries - context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); + if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { + // Could not remove the entries - this means the matchEntry index must be in the + // snapshot and not the log. In this case the prior entries are part of the state + // so we must send back a reply to force a snapshot to completely re-sync the + // follower's log and state. + + LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion(), true), actor()); + return this; + } + break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, @@ -214,8 +212,8 @@ public class Follower extends AbstractRaftActorBehavior { } lastIndex = lastIndex(); - LOG.debug("{}: After cleanup entries to be added from = {}", logName(), - (addEntriesFrom + lastIndex)); + LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), + lastIndex, addEntriesFrom); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { @@ -312,8 +310,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), prevLogTerm, appendEntries.getPrevLogTerm()); } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 - && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex()) - && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) { + && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) { // This append entry comes from a leader who has it's log aggressively trimmed and so does not have // the previous entry in it's in-memory journal @@ -322,8 +319,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), appendEntries.getReplicatedToAllIndex()); } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 - && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1) - && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) { + && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) { LOG.debug( "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1); @@ -352,6 +348,8 @@ public class Follower extends AbstractRaftActorBehavior { LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); } else { + setLeaderId(null); + scheduleElection(electionDuration()); return this; } } @@ -413,7 +411,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo(true)); + installSnapshot.getServerConfig().orNull()); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override