package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
-
- private SnapshotTracker snapshotTracker = null;
+ private static final int SYNC_THRESHOLD = 10;
private final SyncStatusTracker initialSyncStatusTracker;
- private static final int SYNC_THRESHOLD = 10;
+ private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry logEntry) {
+ context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+ }
+ };
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
+ this(context, null);
+ }
+
+ public Follower(RaftActorContext context, String initialLeaderId) {
super(context, RaftState.Follower);
+ leaderId = initialLeaderId;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(context.getRaftPolicy().automaticElectionsEnabled()) {
- if (context.getPeerIds().isEmpty()) {
+ if(canStartElection()) {
+ if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
scheduleElection(electionDuration());
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
- context.getReplicatedLog().appendAndPersist(entry);
+ context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
if(entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
- logName(), appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
+ logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
} else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
- && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
- !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+ && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
}
if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
+ if(canStartElection()) {
+ LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ return this;
+ }
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ leaderId = installSnapshot.getLeaderId();
+
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
}
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
- context.getTermInformation().getVotedFor());
+ context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo(true));
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override