X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=e4d42661de3a9612d558e29685c55bf195d9a331;hb=refs%2Fchanges%2F35%2F41535%2F2;hp=a9d1b8233e5aa2301998375652fea36fe48442ee;hpb=23b10ec4ddfdd9348c2abe7dbcfbed3b49db3dc6;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index a9d1b8233e..e4d42661de 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,8 +11,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.ArrayList; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -65,12 +65,10 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); - if(canStartElection()) { - if (context.getPeerIds().isEmpty() && getLeaderId() == null) { - actor().tell(ElectionTimeout.INSTANCE, actor()); - } else { - scheduleElection(electionDuration()); - } + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { + actor().tell(ElectionTimeout.INSTANCE, actor()); + } else { + scheduleElection(electionDuration()); } } @@ -80,8 +78,8 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderId(final String leaderId) { - this.leaderId = Preconditions.checkNotNull(leaderId); + protected final void setLeaderId(@Nullable final String leaderId) { + this.leaderId = leaderId; } @Override @@ -99,28 +97,11 @@ public class Follower extends AbstractRaftActorBehavior { return true; } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - return previousEntry != null; + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + return entry != null; } - private long getLogEntryTerm(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ - return context.getReplicatedLog().getSnapshotTerm(); - } - - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - if(previousEntry != null){ - return previousEntry.getTerm(); - } - - return -1; - } - private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); } @@ -186,14 +167,20 @@ public class Follower extends AbstractRaftActorBehavior { // Find the entry up until the one that is not in the follower's log for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); - if (newEntry == null) { - //newEntry not found in the log + if(!isLogEntryPresent(matchEntry.getIndex())) { + // newEntry not found in the log break; } - if (newEntry.getTerm() == matchEntry.getTerm()) { + long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex()); + + LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, + existingEntryTerm); + + // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know + // what the term was so we'll assume it matches. + if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { continue; } @@ -203,7 +190,18 @@ public class Follower extends AbstractRaftActorBehavior { matchEntry.getIndex()); // Entries do not match so remove all subsequent entries - context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); + if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { + // Could not remove the entries - this means the matchEntry index must be in the + // snapshot and not the log. In this case the prior entries are part of the state + // so we must send back a reply to force a snapshot to completely re-sync the + // follower's log and state. + + LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion(), true), actor()); + return this; + } + break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, @@ -214,8 +212,8 @@ public class Follower extends AbstractRaftActorBehavior { } lastIndex = lastIndex(); - LOG.debug("{}: After cleanup entries to be added from = {}", logName(), - (addEntriesFrom + lastIndex)); + LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), + lastIndex, addEntriesFrom); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { @@ -350,6 +348,8 @@ public class Follower extends AbstractRaftActorBehavior { LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); return internalSwitchBehavior(RaftState.Candidate); } else { + setLeaderId(null); + scheduleElection(electionDuration()); return this; } } @@ -411,7 +411,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo(true)); + installSnapshot.getServerConfig().orNull()); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override