X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=efece88e28e406a2b587e51a76c424e2a1219f32;hb=refs%2Fchanges%2F64%2F42564%2F7;hp=02b5d7e72cb3da238c5aae0b91382a3dd1d8fb35;hpb=ad860a3b51c31b740aabb297727e15aa45756777;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 02b5d7e72c..efece88e28 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -20,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -50,6 +53,7 @@ public class Follower extends AbstractRaftActorBehavior { } }; + private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted(); private SnapshotTracker snapshotTracker = null; private String leaderId; private short leaderPayloadVersion; @@ -65,12 +69,10 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); - if(canStartElection()) { - if (context.getPeerIds().isEmpty() && getLeaderId() == null) { - actor().tell(ElectionTimeout.INSTANCE, actor()); - } else { - scheduleElection(electionDuration()); - } + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { + actor().tell(TimeoutNow.INSTANCE, actor()); + } else { + scheduleElection(electionDuration()); } } @@ -80,8 +82,8 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderId(final String leaderId) { - this.leaderId = Preconditions.checkNotNull(leaderId); + protected final void setLeaderId(@Nullable final String leaderId) { + this.leaderId = leaderId; } @Override @@ -94,31 +96,22 @@ public class Follower extends AbstractRaftActorBehavior { this.leaderPayloadVersion = leaderPayloadVersion; } - private boolean isLogEntryPresent(long index){ - if(context.getReplicatedLog().isInSnapshot(index)) { - return true; + private void restartLastLeaderMessageTimer() { + if (lastLeaderMessageTimer.isRunning()) { + lastLeaderMessageTimer.reset(); } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - return previousEntry != null; - + lastLeaderMessageTimer.start(); } - private long getLogEntryTerm(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ - return context.getReplicatedLog().getSnapshotTerm(); + private boolean isLogEntryPresent(long index){ + if(context.getReplicatedLog().isInSnapshot(index)) { + return true; } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + return entry != null; - if(previousEntry != null){ - return previousEntry.getTerm(); - } - - return -1; } private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ @@ -186,14 +179,20 @@ public class Follower extends AbstractRaftActorBehavior { // Find the entry up until the one that is not in the follower's log for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); - if (newEntry == null) { - //newEntry not found in the log + if(!isLogEntryPresent(matchEntry.getIndex())) { + // newEntry not found in the log break; } - if (newEntry.getTerm() == matchEntry.getTerm()) { + long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex()); + + LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, + existingEntryTerm); + + // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know + // what the term was so we'll assume it matches. + if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { continue; } @@ -203,7 +202,18 @@ public class Follower extends AbstractRaftActorBehavior { matchEntry.getIndex()); // Entries do not match so remove all subsequent entries - context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); + if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { + // Could not remove the entries - this means the matchEntry index must be in the + // snapshot and not the log. In this case the prior entries are part of the state + // so we must send back a reply to force a snapshot to completely re-sync the + // follower's log and state. + + LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion(), true), actor()); + return this; + } + break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, @@ -214,8 +224,8 @@ public class Follower extends AbstractRaftActorBehavior { } lastIndex = lastIndex(); - LOG.debug("{}: After cleanup entries to be added from = {}", logName(), - (addEntriesFrom + lastIndex)); + LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), + lastIndex, addEntriesFrom); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { @@ -345,13 +355,8 @@ public class Follower extends AbstractRaftActorBehavior { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { - if (originalMessage instanceof ElectionTimeout) { - if (canStartElection()) { - LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); - return internalSwitchBehavior(RaftState.Candidate); - } else { - return this; - } + if (originalMessage instanceof ElectionTimeout || originalMessage instanceof TimeoutNow) { + return handleElectionTimeout(originalMessage); } final Object message = fromSerializableMessage(originalMessage); @@ -372,19 +377,50 @@ public class Follower extends AbstractRaftActorBehavior { } if (rpc instanceof InstallSnapshot) { - InstallSnapshot installSnapshot = (InstallSnapshot) rpc; - handleInstallSnapshot(sender, installSnapshot); + handleInstallSnapshot(sender, (InstallSnapshot) rpc); + restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); return this; } if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) { + restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); } return super.handleMessage(sender, rpc); } + private RaftActorBehavior handleElectionTimeout(Object message) { + // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader + // during the election timeout interval. It may that the election timer expired b/c this actor + // was busy and messages got delayed, in which case leader messages would be backed up in the + // queue but would be processed before the ElectionTimeout message and thus would restart the + // lastLeaderMessageTimer. + long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS); + boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >= + context.getConfigParams().getElectionTimeOutInterval().toMillis(); + + if(canStartElection()) { + if(message instanceof TimeoutNow || noLeaderMessageReceived) { + LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName()); + return internalSwitchBehavior(RaftState.Candidate); + } else { + LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout", + logName(), lastLeaderMessageInterval); + scheduleElection(electionDuration()); + } + } else if(message instanceof ElectionTimeout) { + if(noLeaderMessageReceived) { + setLeaderId(null); + } + + scheduleElection(electionDuration()); + } + + return this; + } + private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);