X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollower.java;h=efece88e28e406a2b587e51a76c424e2a1219f32;hb=refs%2Fchanges%2F64%2F42564%2F7;hp=da10f41ab3ae60d8cdff60a722e43e7b8e286b88;hpb=4816cd33c7b70012b11dd4efc6dbb01218f8cef0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index da10f41ab3..efece88e28 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -20,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -50,6 +53,7 @@ public class Follower extends AbstractRaftActorBehavior { } }; + private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted(); private SnapshotTracker snapshotTracker = null; private String leaderId; private short leaderPayloadVersion; @@ -65,12 +69,10 @@ public class Follower extends AbstractRaftActorBehavior { initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD); - if(canStartElection()) { - if (context.getPeerIds().isEmpty() && getLeaderId() == null) { - actor().tell(ElectionTimeout.INSTANCE, actor()); - } else { - scheduleElection(electionDuration()); - } + if (context.getPeerIds().isEmpty() && getLeaderId() == null) { + actor().tell(TimeoutNow.INSTANCE, actor()); + } else { + scheduleElection(electionDuration()); } } @@ -80,8 +82,8 @@ public class Follower extends AbstractRaftActorBehavior { } @VisibleForTesting - protected final void setLeaderId(final String leaderId) { - this.leaderId = Preconditions.checkNotNull(leaderId); + protected final void setLeaderId(@Nullable final String leaderId) { + this.leaderId = leaderId; } @Override @@ -94,31 +96,22 @@ public class Follower extends AbstractRaftActorBehavior { this.leaderPayloadVersion = leaderPayloadVersion; } - private boolean isLogEntryPresent(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ - return true; + private void restartLastLeaderMessageTimer() { + if (lastLeaderMessageTimer.isRunning()) { + lastLeaderMessageTimer.reset(); } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - return previousEntry != null; - + lastLeaderMessageTimer.start(); } - private long getLogEntryTerm(long index){ - if(index == context.getReplicatedLog().getSnapshotIndex()){ - return context.getReplicatedLog().getSnapshotTerm(); + private boolean isLogEntryPresent(long index){ + if(context.getReplicatedLog().isInSnapshot(index)) { + return true; } - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(index); - - if(previousEntry != null){ - return previousEntry.getTerm(); - } + ReplicatedLogEntry entry = context.getReplicatedLog().get(index); + return entry != null; - return -1; } private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ @@ -186,14 +179,20 @@ public class Follower extends AbstractRaftActorBehavior { // Find the entry up until the one that is not in the follower's log for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); - if (newEntry == null) { - //newEntry not found in the log + if(!isLogEntryPresent(matchEntry.getIndex())) { + // newEntry not found in the log break; } - if (newEntry.getTerm() == matchEntry.getTerm()) { + long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex()); + + LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry, + existingEntryTerm); + + // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know + // what the term was so we'll assume it matches. + if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) { continue; } @@ -203,7 +202,18 @@ public class Follower extends AbstractRaftActorBehavior { matchEntry.getIndex()); // Entries do not match so remove all subsequent entries - context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex()); + if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) { + // Could not remove the entries - this means the matchEntry index must be in the + // snapshot and not the log. In this case the prior entries are part of the state + // so we must send back a reply to force a snapshot to completely re-sync the + // follower's log and state. + + LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName()); + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, + lastTerm(), context.getPayloadVersion(), true), actor()); + return this; + } + break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, @@ -214,8 +224,8 @@ public class Follower extends AbstractRaftActorBehavior { } lastIndex = lastIndex(); - LOG.debug("{}: After cleanup entries to be added from = {}", logName(), - (addEntriesFrom + lastIndex)); + LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(), + lastIndex, addEntriesFrom); // 4. Append any new entries not already in the log for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) { @@ -312,8 +322,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), prevLogTerm, appendEntries.getPrevLogTerm()); } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 - && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex()) - && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) { + && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) { // This append entry comes from a leader who has it's log aggressively trimmed and so does not have // the previous entry in it's in-memory journal @@ -322,8 +331,7 @@ public class Follower extends AbstractRaftActorBehavior { logName(), appendEntries.getReplicatedToAllIndex()); } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1 && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 - && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1) - && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) { + && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) { LOG.debug( "{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal", logName(), appendEntries.getEntries().get(0).getIndex() - 1); @@ -347,13 +355,8 @@ public class Follower extends AbstractRaftActorBehavior { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { - if (originalMessage instanceof ElectionTimeout) { - if (canStartElection()) { - LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName()); - return internalSwitchBehavior(RaftState.Candidate); - } else { - return this; - } + if (originalMessage instanceof ElectionTimeout || originalMessage instanceof TimeoutNow) { + return handleElectionTimeout(originalMessage); } final Object message = fromSerializableMessage(originalMessage); @@ -374,19 +377,50 @@ public class Follower extends AbstractRaftActorBehavior { } if (rpc instanceof InstallSnapshot) { - InstallSnapshot installSnapshot = (InstallSnapshot) rpc; - handleInstallSnapshot(sender, installSnapshot); + handleInstallSnapshot(sender, (InstallSnapshot) rpc); + restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); return this; } if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) { + restartLastLeaderMessageTimer(); scheduleElection(electionDuration()); } return super.handleMessage(sender, rpc); } + private RaftActorBehavior handleElectionTimeout(Object message) { + // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader + // during the election timeout interval. It may that the election timer expired b/c this actor + // was busy and messages got delayed, in which case leader messages would be backed up in the + // queue but would be processed before the ElectionTimeout message and thus would restart the + // lastLeaderMessageTimer. + long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS); + boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >= + context.getConfigParams().getElectionTimeOutInterval().toMillis(); + + if(canStartElection()) { + if(message instanceof TimeoutNow || noLeaderMessageReceived) { + LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName()); + return internalSwitchBehavior(RaftState.Candidate); + } else { + LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout", + logName(), lastLeaderMessageInterval); + scheduleElection(electionDuration()); + } + } else if(message instanceof ElectionTimeout) { + if(noLeaderMessageReceived) { + setLeaderId(null); + } + + scheduleElection(electionDuration()); + } + + return this; + } + private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) { LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot); @@ -413,7 +447,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo(true)); + installSnapshot.getServerConfig().orNull()); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override