import akka.actor.ActorRef;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(canStartElection()) {
- if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
- actor().tell(ElectionTimeout.INSTANCE, actor());
- } else {
- scheduleElection(electionDuration());
- }
+ if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
+ actor().tell(ElectionTimeout.INSTANCE, actor());
+ } else {
+ scheduleElection(electionDuration());
}
}
}
@VisibleForTesting
- protected final void setLeaderId(final String leaderId) {
- this.leaderId = Preconditions.checkNotNull(leaderId);
+ protected final void setLeaderId(@Nullable final String leaderId) {
+ this.leaderId = leaderId;
}
@Override
}
private boolean isLogEntryPresent(long index){
- if(index == context.getReplicatedLog().getSnapshotIndex()){
+ if(context.getReplicatedLog().isInSnapshot(index)) {
return true;
}
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
- LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
- logName(), appendEntries.getPrevLogIndex());
+ LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it - lastIndex: {}, snapshotIndex: {}",
+ logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
} else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
- && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
- && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
- && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
return internalSwitchBehavior(RaftState.Candidate);
} else {
+ setLeaderId(null);
+ scheduleElection(electionDuration());
return this;
}
}
installSnapshot.getLastIncludedTerm(),
context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor(),
- context.getPeerServerInfo(true));
+ installSnapshot.getServerConfig().orNull());
ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
@Override